API Sample – Lookup Transform

API Sample – Lookup Transform

  • Comments 6

This sample creates a data flow package with an OLEDB Source component feeding into a Lookup Transform. The Lookup transform is set to Full Cache mode, and uses [DimCustomer] as its reference table.

Items of interest:

  • CustomerKey and GeographyKey are used as the index (join) columns. This is configured by using the JoinToReferenceColumn property
  • The FirstName column is being overwritten by the value retrieved by the lookup transform
  • The LastName2 column is being added as a new output column
/// <summary>
/// Builds an SSIS package in memory containing a single Data Flow Task in which an
/// OLE DB Source reading [DimCustomer] feeds a Lookup transform (Full Cache mode)
/// that also uses [DimCustomer] as its reference table.
/// The lookup joins on CustomerKey and GeographyKey, overwrites FirstName with the
/// value returned by the lookup, and adds a new LastName2 output column copied from
/// the reference LastName column.
/// The package is only constructed here; this method does not save or execute it.
/// </summary>
/// <param name="args">Command-line arguments (not used).</param>
static void Main(string[] args)
{
    Package package = new Package();

    // Add Data Flow Task
    Executable dataFlowTask = package.Executables.Add("STOCK:PipelineTask");

    // Direct casts (rather than "as") so that an unexpected type fails right here with an
    // InvalidCastException instead of a NullReferenceException on a later line.
    TaskHost taskHost = (TaskHost)dataFlowTask;

    // Set the name (otherwise it will be a random GUID value)
    taskHost.Name = "Data Flow Task";

    // We need a reference to the InnerObject to add items to the data flow
    MainPipe pipeline = (MainPipe)taskHost.InnerObject;

    //
    // Add connection manager
    //

    ConnectionManager connection = package.Connections.Add("OLEDB");
    connection.Name = "localhost";
    connection.ConnectionString = "Data Source=localhost;Initial Catalog=AdventureWorksDW2008;Provider=SQLNCLI10.1;Integrated Security=SSPI;Auto Translate=False;";

    //
    // Add OLEDB Source
    //

    IDTSComponentMetaData100 srcComponent = pipeline.ComponentMetaDataCollection.New();
    srcComponent.ComponentClassID = "DTSAdapter.OleDbSource";
    srcComponent.ValidateExternalMetadata = true;
    IDTSDesigntimeComponent100 srcDesignTimeComponent = srcComponent.Instantiate();
    srcDesignTimeComponent.ProvideComponentProperties();
    srcComponent.Name = "OleDb Source";

    // Configure it to read from the given table
    srcDesignTimeComponent.SetComponentProperty("AccessMode", 0);
    srcDesignTimeComponent.SetComponentProperty("OpenRowset", "[DimCustomer]");

    // Set the connection manager
    srcComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
    srcComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

    // Retrieve the column metadata
    srcDesignTimeComponent.AcquireConnections(null);
    srcDesignTimeComponent.ReinitializeMetaData();
    srcDesignTimeComponent.ReleaseConnections();

    //
    // Add the Lookup transform
    //

    IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();
    lookupComponent.ComponentClassID = "DTSTransform.Lookup";

    CManagedComponentWrapper lookupWrapper = lookupComponent.Instantiate();
    lookupWrapper.ProvideComponentProperties();

    // Name the component after ProvideComponentProperties, in the same order used for the
    // source component above, so the explicit name is the last one assigned.
    lookupComponent.Name = "Lookup";

    // Connect the source and the transform
    IDTSPath100 path = pipeline.PathCollection.New();
    path.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);

    //
    // Configure the transform
    //

    // Set the connection manager
    lookupComponent.RuntimeConnectionCollection[0].ConnectionManager = DtsConvert.GetExtendedInterface(connection);
    lookupComponent.RuntimeConnectionCollection[0].ConnectionManagerID = connection.ID;

    // Cache Type - Full = 0, Partial = 1, None = 2
    lookupWrapper.SetComponentProperty("CacheType", 0);
    lookupWrapper.SetComponentProperty("SqlCommand", "select * from [DimCustomer]");

    // initialize metadata
    lookupWrapper.AcquireConnections(null);
    lookupWrapper.ReinitializeMetaData();
    lookupWrapper.ReleaseConnections();

    // The virtual input exposes the upstream (source) columns that can be selected
    IDTSInput100 lookupInput = lookupComponent.InputCollection[0];
    IDTSVirtualInput100 lookupVirtualInput = lookupInput.GetVirtualInput();
    IDTSVirtualInputColumnCollection100 lookupVirtualInputColumns = lookupVirtualInput.VirtualInputColumnCollection;

    // We are joining on CustomerKey and GeographyKey
    // Note: join columns should be marked as READONLY
    // The input column and the reference column share a name here only because the source
    // and the reference query both read [DimCustomer]; the JoinToReferenceColumn value is
    // the name of the column in the reference data set.
    var joinColumns = new string[] { "CustomerKey", "GeographyKey" };
    foreach (string columnName in joinColumns)
    {
        IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
        IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);
        lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);
    }

    // Overwrite the existing FirstName column value with the one returned by the Lookup.
    // To do this, we need to flag the column as READWRITE, and set the CopyFromReferenceColumn property on the input
    var overwriteColumns = new string[] { "FirstName" };
    foreach (string columnName in overwriteColumns)
    {
        IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];
        IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READWRITE);

        lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "CopyFromReferenceColumn", columnName);
    }

    // First output is the Match output
    IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];

    // Add a new LastName2 column from the "LastName" column returned by the lookup.
    // Key = reference column to copy from, Value = name of the new output column.
    var newColumns = new Dictionary<string, string>
    {
        { "LastName", "LastName2" }
    };

    // Iterate the pairs directly instead of looking each key up a second time
    foreach (KeyValuePair<string, string> mapping in newColumns)
    {
        string sourceColumn = mapping.Key;
        string newColumnName = mapping.Value;
        string description = string.Format("Copy of {0}", sourceColumn);

        // insert the new column
        IDTSOutputColumn100 outputColumn = lookupWrapper.InsertOutputColumnAt(lookupMatchOutput.ID, 0, newColumnName, description);
        lookupWrapper.SetOutputColumnProperty(lookupMatchOutput.ID, outputColumn.ID, "CopyFromReferenceColumn", sourceColumn);
    }
}
Leave a Comment
  • Please add 1 and 2 and type the answer here:
  • Post
  • This is an index post for the series of posts with examples on how to create packages programmatically.

  • The SQL Server Integration Services team added valuable new caching options (and scalability) to the

  • 这篇文章中我们将继续和大家分享一些使用SSIS API构建SSIS包的示例 1. 构建一个Row Count转换 下面的代码可以构建一个包含OLEDB数据源和Row Count转换(包含在一个数据流任务中)的SSIS包。其中Row

  • This looks to me like you're joining the column to itself.

    foreach (string columnName in joinColumns)

       {

           IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];

           IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

           lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);

       }

    I'm getting it to work by simply looking up the column that's coming from the source and then linking to the column name in my lookup component.

    IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns["col1"];

               IDTSInputColumn100 inputColumn = patlookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

               patlookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", "col2");

  • Hi Matt,

    First, I must say this is a very nice post but unfortunately my code is not working (even though I followed the steps mentioned here).

    My Problem Statement: I want to move data from one table to another and, at the same time, avoid inserting duplicate records into the destination table. To avoid inserting duplicates I am using the Lookup component, but it's not working.

    Line 'IDTSOutput100 lookupMatchOutput = lookupComponent.OutputCollection[0];' doesn't return anything.

    Below is the code written for LookUP transform which basically I am using to ignore duplicate rows to insert in DB again.

               IDTSComponentMetaData100 lookupComponent = pipeline.ComponentMetaDataCollection.New();

               ...

               lookUpPath.AttachPathAndPropagateNotifications(srcComponent.OutputCollection[0], lookupComponent.InputCollection[0]);

               ...

               // Cache Type - Full = 0, Partial = 1, None = 2

               lookupWrapper.SetComponentProperty("CacheType", 0);

               lookupWrapper.SetComponentProperty("NoMatchBehavior", 1);// 1= Redirect rows to No Match output

               lookupWrapper.SetComponentProperty("SqlCommand", "select Name, ProductNumber from [dbo].[TestTableDest]");

               // initialize metadata

               ...

               // Mark the columns we are joining on

              ...

               // Note: join columns should be marked as READONLY

               var joinColumns = new string[] { "Name", "ProductNumber" };

               foreach (string columnName in joinColumns)

               {

                   IDTSVirtualInputColumn100 virtualColumn = lookupVirtualInputColumns[columnName];

                   IDTSInputColumn100 inputColumn = lookupWrapper.SetUsageType(lookupInput.ID, lookupVirtualInput, virtualColumn.LineageID, DTSUsageType.UT_READONLY);

                   lookupWrapper.SetInputColumnProperty(lookupInput.ID, inputColumn.ID, "JoinToReferenceColumn", columnName);

               }

               // Second output is the Un-Match output

               IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];

    But here lookupNoMatchOutput  doesn't return any column. Any help on this would be appreciated.

    Note: '...' means the same code as in your post. I cannot post the entire code because of the content length restriction.

  • Found the problem:

    " IDTSOutput100 lookupNoMatchOutput = lookupComponent.OutputCollection[1];" will not list columns to map with Oledb/ado.net Destination. To do this mapping we have to use source output collection only.

    Below is the final Oledb destination column mapping code I wrote:

    // Attaches the lookup's no-match output to the destination component's first input,
    // then maps each source output column to the destination external metadata column
    // that has the same name.
    IDTSPath100 path = pipeline.PathCollection.New();

               path.AttachPathAndPropagateNotifications(lookupNoMatchOutput, destComponent.InputCollection[0]);

               // Configure the destination

               IDTSInput100 destInput = destComponent.InputCollection[0];

               IDTSVirtualInput100 destVirInput = destInput.GetVirtualInput();

               IDTSInputColumnCollection100 destInputCols = destInput.InputColumnCollection;

               IDTSExternalMetadataColumnCollection100 destExtCols = destInput.ExternalMetadataColumnCollection;

               // Column names are taken from the source component's first output, not from the lookup output
               IDTSOutputColumnCollection100 sourceColumns = srcComponent.OutputCollection[0].OutputColumnCollection;

               // The OLEDB destination requires you to hook up the external columns

               foreach (IDTSOutputColumn100 outputCol in sourceColumns)

               {

                   // Get the external column id
                   // NOTE(review): the name indexer may throw rather than return null when no
                   // external column has this name - confirm before relying on the null check below.

                   IDTSExternalMetadataColumn100 extCol = (IDTSExternalMetadataColumn100)destExtCols[outputCol.Name];

                   if (extCol != null)

                   {

                       // Create an input column from an output col of previous component.
                       // NOTE(review): outputCol.ID is passed where the calls below appear to expect a
                       // lineage ID - confirm that ID and LineageID are equal for these columns, or
                       // use outputCol.LineageID.

                       destVirInput.SetUsageType(outputCol.ID, DTSUsageType.UT_READONLY);

                       IDTSInputColumn100 inputCol = destInputCols.GetInputColumnByLineageID(outputCol.ID);

                       if (inputCol != null)

                       {

                           // map the input column with an external metadata column

                           destDesignTimeComponent.MapInputColumn(destInput.ID, inputCol.ID, extCol.ID);

                       }

                   }

               }

    Note: Please excuse any typos in my earlier reply or in this post.

Page 1 of 1 (6 items)