Scenario

In data warehouse projects it is often necessary to import a huge amount of data provided in CSV format into a database, without making essential structural changes to the source data format.  

Possible requirements for such Extract, Transform, Load (ETL) processes are:

-       Adding a BatchId to the imported records in the staging tables

-       Checking the structure of the source file

-       Logging detailed information about the imported data (number of rows imported, time to import the data, etc.)

To achieve this with MS SSIS, it is necessary to build an SSIS package for every source file. This can be very time-consuming because there may be hundreds of source files to import, and it is very cumbersome to repeat all the mapping tasks again and again.

In this article I want to show how it is possible to build these SSIS packages dynamically. I will show how to store the required metadata and how to build all the packages from a template package that provides the desired functionality.

 

The SSIS package

One of the first things to consider is what the packages should look like: which features they should contain, and how the data should be transformed on the way from the source file to the destination table.

For the sake of simplicity I will use a rather simple template package, but I will mention how one can implement more features in the ETL process without breaking the concepts I show in this article.

Basic features of the template package:

-       Signature check – before the input file is processed, the structure of the file is validated

-       BatchId – assign a unique Id to each imported record
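The signature check can be implemented, for example, by comparing the header row of the source file against the expected column list before the import starts. The helper below is a minimal sketch of that idea and is not part of the template package; its name and parameters are my own assumptions:

```csharp
using System;
using System.IO;

public static class SignatureCheck
{
    // Hypothetical helper: returns true if the first line of the source
    // file matches the expected pipe-separated column signature.
    public static bool SignatureIsValid(string sourceFile, string[] expectedColumns)
    {
        using (StreamReader reader = new StreamReader(sourceFile))
        {
            string headerLine = reader.ReadLine();
            if (headerLine == null)
            {
                return false; // empty file has no valid signature
            }
            string[] actualColumns = headerLine.Split('|');
            if (actualColumns.Length != expectedColumns.Length)
            {
                return false;
            }
            for (int i = 0; i < expectedColumns.Length; i++)
            {
                if (!actualColumns[i].Equals(expectedColumns[i], StringComparison.OrdinalIgnoreCase))
                {
                    return false;
                }
            }
            return true;
        }
    }
}
```

In the template package a check like this could live in a Script Task that fails the package when the signature does not match.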

The package I build is shown in the following pictures.

Control Flow

http://cid-e6b2eca5f2789165.skydrive.live.com/self.aspx/DynamicSSIS/DynSSISControlFlow.png

Data Flow

http://cid-e6b2eca5f2789165.skydrive.live.com/self.aspx/DynamicSSIS/DynSSISDataFlow.png

As you see, the package is really simple, and it is no challenge to build a package like this. But as I already mentioned at the beginning of the article, the challenge is to build tens or hundreds of these packages.

Source File

The source files are in CSV format with the column names in the first row. I use the ‘|’ symbol as column separator and {CR}{LF} as line separator.

Example:

ProductID|Name|ProductNumber|MakeFlag|FinishedGoodsFlag|Color|SafetyStockLevel| …
1|Adjustable Race|AR-5381|False|False||1000|750|0|0|||||0||||||1998-06-01 …
2|Bearing Ball|BA-8327|False|False||1000|750|0|0|||||0||||||1998-06-01 00:00:00 …
3|BB Ball Bearing|BE-2349|True|False||800|600|0|0|||||1||||||1998-06-01 00:00:00 …


Destination Table

For this demonstration I use the AdventureWorks2008 database; the template package loads the data from the flat file into the Production.Product table.

The AdventureWorks2008 database can be downloaded from CodePlex. (http://msftdbprodsamples.codeplex.com/)

The SSISFactory

The SSISFactory is a .NET program which reads the template package, modifies the relevant parts, and creates a package for each target table defined in the metadata.

The metadata

All the configuration the SSISFactory needs to create the packages is defined in the metadata. I chose to create two database tables holding this information: the first holds information about the source file/destination table, and the second contains information about the columns belonging to a source file/destination table.

PackageDefinition table

-       PackageDefinitionId – key column

-       PackageName – name of the package to be created

-       TemplatePackage – URI of the SSIS package which should be taken as the template

-       SourceFile – URI of the source data file

-       DestinationTable – name of the destination table

 

PackageColumn table

-       PackageColumnId – key column

-       PackageDefinitionId – foreign-key column referencing the corresponding PackageDefinition row

-       ColumnType

-       ColumnName – name of the column in the target table

-       DataType – data type of the column in the target table

-       MaxLength – maximum length of the data type

-       Precision – precision of the data type

-       Scale – scale of the data type

-       Order – ordering information (must match the column order of the target table)

-       MapToTargetDataType – indicates if an explicit cast to the target data type is necessary

-       ConvertExpression – describes the conversion

-       ConvertFriendlyExpression – friendly description of the conversion
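In the factory code these two tables surface as simple data classes. The following is a sketch of what such classes might look like; the property names follow the table layout above, but the exact types (in particular the nullable ones, suggested by the explicit casts in the factory code later on) are assumptions:

```csharp
// Hypothetical data classes mirroring the two metadata tables.
public class PackageDefinition
{
    public int PackageDefinitionId { get; set; }
    public string PackageName { get; set; }
    public string TemplatePackage { get; set; }   // URI of the template package
    public string SourceFile { get; set; }        // URI of the source data file
    public string DestinationTable { get; set; }
}

public class PackageColumn
{
    public int PackageColumnId { get; set; }
    public int PackageDefinitionId { get; set; }  // FK to PackageDefinition
    public string ColumnType { get; set; }
    public string ColumnName { get; set; }
    public string DataType { get; set; }
    public int? MaxLength { get; set; }           // nullable: only set for string types
    public int? Precision { get; set; }           // nullable: only set for numeric types
    public int? Scale { get; set; }
    public int Order { get; set; }                // must match target table column order
    public bool MapToTargetDataType { get; set; }
    public string ConvertExpression { get; set; }
    public string ConvertFriendlyExpression { get; set; }
}
```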


The package factory

The following steps are implemented in the factory class:

-       Load the package template

-       Analyze the template and store relevant information in member variables (e.g. connections)

-       Update connections

-       Update control flow components

-       Save the modified package
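Put together, the factory's driver might look like the sketch below. Apart from savePackage, which is shown later, the method names are my own assumptions; they simply mirror the step list above:

```csharp
// Hypothetical top-level driver of the factory class; each call
// corresponds to one step of the list above.
public void createPackage()
{
    loadTemplatePackage();              // load the template into _Package
    analyzeTemplatePackage();           // find the source/destination connection managers
    updateSourceConnectionManager();    // point the flat file source at the new file
    updateControlFlowComponents();      // fix up the Execute SQL and data flow tasks
    savePackage(_PackageDefinition.PackageName);
}
```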

Load the template package

Loading the package template is quite easy. All you have to do is to load the file into an XmlDocument instance. With this instance a Microsoft.SqlServer.Dts.Runtime.Package instance can be created with the LoadFromXML method. This method takes the XmlDocument with the package definition as an input parameter. As a second parameter an event handler class to track package errors is required. In my example I do not deal with these errors because the template package should not contain any errors.

       XmlDocument packageXml = new XmlDocument();
       packageXml.Load(_PackageDefinition.TemplatePackage);

       IDTSEvents events = new SSISEventSupport();
       _Package = new Microsoft.SqlServer.Dts.Runtime.Package();
       _Package.LoadFromXML(packageXml, events);

Analyze the template and acquire relevant information

In this step I search the template package for all the information I need later in the package modification steps. In the example I need the source and destination connection managers. As you can see, the names of the connection managers are hard-coded, and therefore the factory class is tightly coupled to the package template.

       foreach (ConnectionManager connectionManager in _Package.Connections)
       {
              if (connectionManager.Name.Equals("SourceFlatFileCM"))
              {
                     _SRCFFConnectionManager = connectionManager;
              }
              else if (connectionManager.Name.Equals("Destination"))
              {
                     _DSTSQLConnectionManager = connectionManager;
              }
       }

Update connections

The source connection manager must be updated with the specific information for the package source file. This information comprises the source file name and location, and the data fields with their names, data types, and length information.

In my example the update is implemented in the procedure ‘updateSourceConnectionManager’.

       private void updateSourceConnectionManager()
       {
              _SRCFFConnectionManager.ConnectionString = _PackageDefinition.SourceFile;
              _SRCFFConnectionManager.Properties["Format"].SetValue(_SRCFFConnectionManager, "Delimited");
              _SRCFFConnectionManager.Properties["ColumnNamesInFirstDataRow"].SetValue(_SRCFFConnectionManager, true);
              _SRCFFConnectionManager.Properties["LocaleID"].SetValue(_SRCFFConnectionManager, "1033");
              _SRCFFConnectionManager.Properties["CodePage"].SetValue(_SRCFFConnectionManager, "1252");
              _SRCFFConnectionManager.Properties["RowDelimiter"].SetValue(_SRCFFConnectionManager, "\n");

              IDTSConnectionManagerFlatFile100 connectionFlatFile
                     = (IDTSConnectionManagerFlatFile100)_SRCFFConnectionManager.InnerObject;
              connectionFlatFile.RowDelimiter = "\n";

              // remove existing columns (copy them to a list first so the
              // collection is not modified while it is being enumerated)
              List<IDTSConnectionManagerFlatFileColumn100> existingColumns
                     = new List<IDTSConnectionManagerFlatFileColumn100>();
              foreach (IDTSConnectionManagerFlatFileColumn100 column in connectionFlatFile.Columns)
              {
                     existingColumns.Add(column);
              }
              foreach (IDTSConnectionManagerFlatFileColumn100 column in existingColumns)
              {
                     connectionFlatFile.Columns.Remove(column);
              }

              // add new columns
              foreach (PackageColumn packageColumn in _PackageColumns)
              {
                     IDTSConnectionManagerFlatFileColumn100 flatFileColumn
                            = (IDTSConnectionManagerFlatFileColumn100)connectionFlatFile.Columns.Add();
                     flatFileColumn.ColumnType = "Delimited";
                     flatFileColumn.ColumnWidth = 0;
                     flatFileColumn.MaximumWidth = (int)packageColumn.MaxLength;
                     if (packageColumn.MapToTargetDataType)
                     {
                            // read the raw value as a string; the data conversion
                            // component casts it to the target type later
                            flatFileColumn.DataType = DataType.DT_STR;
                     }
                     else
                     {
                            flatFileColumn.DataType = mapSSISDataType(packageColumn.DataType);
                     }
                     flatFileColumn.DataPrecision = (int)packageColumn.Precision;
                     flatFileColumn.DataScale = (int)packageColumn.Scale;
                     flatFileColumn.TextQualified = false;
                     IDTSName100 columnName = (IDTSName100)flatFileColumn;
                     columnName.Name = packageColumn.ColumnName;
                     // the last column is terminated by the row delimiter
                     if (packageColumn.Order == _PackageColumns.Count)
                     {
                            flatFileColumn.ColumnDelimiter = "\n";
                     }
                     else
                     {
                            flatFileColumn.ColumnDelimiter = "|";
                     }
              }
       }

The main steps of the procedure are:

1.       Set the connection manager properties (name of the source file, format, header line, code page, …).

2.       Remove all column definitions and add new ones. The column definitions are derived from the information in the metadata database (table PackageColumn).

a.       Remember to set the column delimiter of the last column to the row delimiter ("\n").
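The helper mapSSISDataType used above translates the data type name stored in the PackageColumn metadata into an SSIS DataType enumeration value. Its implementation is not shown in the article; the following is a possible sketch, and the concrete set of mappings is an assumption to be extended to whatever types your target tables use (see the appendix for the full list of SSIS data types):

```csharp
// Hypothetical mapping from SQL Server type names (as stored in the
// PackageColumn metadata) to SSIS pipeline data types.
private DataType mapSSISDataType(string sqlDataType)
{
    switch (sqlDataType.ToLowerInvariant())
    {
        case "int":       return DataType.DT_I4;
        case "bigint":    return DataType.DT_I8;
        case "smallint":  return DataType.DT_I2;
        case "bit":       return DataType.DT_BOOL;
        case "decimal":
        case "numeric":   return DataType.DT_NUMERIC;
        case "datetime":  return DataType.DT_DBTIMESTAMP;
        case "nvarchar":
        case "nchar":     return DataType.DT_WSTR;
        case "varchar":
        case "char":      return DataType.DT_STR;
        default:          return DataType.DT_STR; // fall back to string
    }
}
```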

Update control flow components

In my example I have to update two control flow components: an ‘Execute SQL’ task and a data flow task. This is done in a loop over all control flow components.

       foreach (Executable executable in _Package.Executables)
       {
              if (executable.GetType() == typeof(Microsoft.SqlServer.Dts.Runtime.TaskHost))
              {
                     Microsoft.SqlServer.Dts.Runtime.TaskHost taskHost
                            = (Microsoft.SqlServer.Dts.Runtime.TaskHost)executable;
                     switch (taskHost.Name)
                     {
                            case "DFT Import":
                                   updateDFTImport(taskHost);
                                   break;
                            case "SQL Truncate target table":
                                   updateSQLTruncateTargetTable(taskHost);
                                   break;
                            default:
                                   break;
                     }
              }
       }

SQL Truncate target table

This task empties the target table of our import process before loading. The SQL statement has to be updated so that the proper table is truncated.

       private void updateSQLTruncateTargetTable(Microsoft.SqlServer.Dts.Runtime.TaskHost taskHost)
       {
              Microsoft.SqlServer.Dts.Tasks.ExecuteSQLTask.ExecuteSQLTask executeSQLTask
                     = (Microsoft.SqlServer.Dts.Tasks.ExecuteSQLTask.ExecuteSQLTask)taskHost.InnerObject;
              executeSQLTask.SqlStatementSource = "truncate table " + _PackageDefinition.DestinationTable;
       }

DataFlow task

More work has to be done when modifying the data flow task. The main steps are:

1.       Process all data flow components along the chain of connected tasks.

2.       Update the metadata of all tasks to reflect the changed columns:

a.       Update the data conversion task according to the information in the package metadata.

b.      Update the destination task (column metadata and mapping).

The procedure ‘updateDFTImport’ loops through all data flow components and, for each component that is first in the chain (component.InputCollection.Count == 0), calls the update procedure ‘updateDataFlowComponents’.

       private void updateDFTImport(Microsoft.SqlServer.Dts.Runtime.TaskHost taskHost)
       {
              MainPipe dataFlowTask = (MainPipe)taskHost.InnerObject;

              foreach (IDTSComponentMetaData100 component in dataFlowTask.ComponentMetaDataCollection)
              {
                     if (component.InputCollection.Count == 0)
                     {
                            updateDataFlowComponents(component, null, dataFlowTask);
                     }
              }
       }

Within the ‘updateDataFlowComponents’ procedure, a specialized update procedure is called depending on the type of the data flow component.

       private void updateDataFlowComponents(IDTSComponentMetaData100 component, IDTSComponentMetaData100 predecessorComponent, MainPipe dataFlowTask)
       {
              switch (component.Name)
              {
                     case "FFSRC":
                            updateFlatFileSourceComponent(component);
                            break;
                     case "DCNV":
                            updateDataConversionComponent(component, predecessorComponent, dataFlowTask);
                            break;
                     case "OLEDEST":
                            updateOLEDestinationComponent(component, predecessorComponent, dataFlowTask);
                            break;
                     default:
                            updateDefaultComponent(component, predecessorComponent, dataFlowTask);
                            break;
              }

              // continue processing along the path
              foreach (IDTSPath100 path in dataFlowTask.PathCollection)
              {
                     if (path.StartPoint.Component != null && path.StartPoint.Component.Name.Equals(component.Name))
                     {
                            updateDataFlowComponents(path.EndPoint.Component, component, dataFlowTask);
                     }
              }
       }

Save modified package

Saving the modified package is similar to the load procedure. Errors which are recognized while saving the package are tracked with a class implementing the IDTSEvents interface.

       public void savePackage(string NewPackageName)
       {
              IDTSEvents events = new SSISEventSupport();
              string xmlString;
              _Package.SaveToXML(out xmlString, events);

              if (File.Exists(NewPackageName))
              {
                     File.Delete(NewPackageName);
              }

              using (StreamWriter sw = new StreamWriter(NewPackageName))
              {
                     sw.WriteLine(xmlString);
              }
       }

 

Project source

The source code can be downloaded from my SkyDrive:
 http://cid-e6b2eca5f2789165.skydrive.live.com/self.aspx/DynamicSSIS/DynamicSSIS.zip

Appendix

 Integration Services Data Types

http://msdn.microsoft.com/en-us/library/ms141036.aspx