Paolo Salvatori's Blog

Adventures in the magic world of Windows Azure

How to create a custom WCF Channel that debatches an inbound message

How to create a custom WCF Channel that debatches an inbound message

Rate This
  • Comments 2

Introduction

In the following post I’ll explain in detail a demo that I presented at TechReady 9 and TechEd 2009 in Berlin.

The Problem

Consider the following scenario:

Your BizTalk application receives an XML request message through a two-way Receive Location (e.g. WCF, SOAP, HTTP). The inbound document contains multiple elements. For each item contained in the request message, the application needs to synchronously invoke a downstream WCF service that returns a response message. The application needs to process all the elements contained in the request document as a unit of work and aggregate the result of individual calls in a single response message that will be returned to the original caller.

Let’s make an example to better scope the problem. Let’s assume that our BizTalk application is exposed via a two-way WCF Receive Location and receives a request message, as the one shown below, containing multiple Operation elements.

CalculatorRequest Message

<CalculatorRequest xmlns="http://Microsoft.BizTalk.CAT.Samples.CalculatorRequest">
  <Method>XmlDocumentOrchestration</Method>
  <Operations>
    <Operation>
      <Operator>+</Operator>
      <Operand1>82</Operand1>
      <Operand2>18</Operand2>
    </Operation>
    <Operation>
      <Operator>-</Operator>
      <Operand1>30</Operand1>
      <Operand2>12</Operand2>
    </Operation>
    <Operation>
      <Operator>*</Operator>
      <Operand1>25</Operand1>
      <Operand2>8</Operand2>
    </Operation>
    <Operation>
      <Operator>\</Operator>
      <Operand1>100</Operand1>
      <Operand2>25</Operand2>
    </Operation>
      <Operation>
      <Operator>+</Operator>
      <Operand1>100</Operand1>
      <Operand2>32</Operand2>
    </Operation>
  </Operations>
</CalculatorRequest>

 

For each Operation element contained in the inbound XML document the application has to read the child Operator element and accordingly invoke one of the four methods (Add, Subtract, Multiply, Divide) exposed by the downstream WCF service. This latter is a variation of the CalculatorService that can be found on MSDN between the WCF samples. The return value of the individual calls to the CalculatorService must be aggregated in a single response message, as the one shown below. This latter will be finally returned to the original caller.

CalculatorResponse Message

<CalculatorResponse xmlns="http://Microsoft.BizTalk.CAT.Samples.CalculatorResponse">
      <Status>Ok</Status>
      <Results>
            <Result>
                  <Value>100</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>18</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>200</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>4</Value>
                  <Error>None</Error>
            </Result>
            <Result>
                  <Value>132</Value>
                  <Error>None</Error>
            </Result>
      </Results>
</CalculatorResponse>

 

The easier way to solve the above problem is to create a BizTalk application that exploits an orchestration to implement the Scatter-Gather design pattern. The orchestration in question should perform the following steps:

  • Receive the request message.
  • Extract and loop through the individual Operation elements, for example using the XPath function.
  • Use a Decide shape with 4 branches, one for each possible value of the Operator element.
  • Within each branch, apply a transformation map and invoke one of the 4 Operations (Add, Subtract, Multiply, Divide) exposed by a two-way Solicit-Response Logical Port.
  • Receive the response message from the underlying WCF service.
  • At each iteration, save the result in a collection variable.
  • Create and return a response message containing results.

The following graph depicts the architecture of the solution described above. In the remainder of this post, I’ll refer to this solution as LogicalPortsOrchestration.

Image1

Message Flow:

  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from a Test Agent.
  2. The XML disassembler component within the XMLTransmit pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the LogicalPortsOrchestration. This latter uses a Direct Port receive the CalculatorRequest messages with the Method promoted property = “LogicalPortsOrchestration”.
  4. The LogicalPortsOrchestration uses a loop to retrieve operations and for each item it invokes the downstream Calculator WCF service using a Logical Solicit-Response Port bound to a Physical Solicit-Response WCF-BasicHttp Send Port. The request message for the Calculator WCF web service is created using an helper component and published to the (BizTalkMsgBoxDb).
  5. The request message is consumed by a WCF-BasicHttp Send Port.
  6. The WCF-BasicHttp Send Port invokes one of the methods (Add, Subtract, Multiply, Divide) exposed by the Calculator WCF service.
  7. The Calculator WCF service returns a response message.
  8. The response message is published to the MessageBox (BizTalkMsgBoxDb).
  9. The response message is returned to the caller LogicalPortsOrchestration. The orchestration repeats this pattern for each operation within the inbound CalculatorRequest xml document.
  10. The LogicalPortsOrchestration creates and publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  11. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  12. The response message is returned to the Test Agent.

The solution described above works fine but as you can easily note it requires 4 distinct roundtrips to the MessageBox for each individual call to the underlying WCF service and therefore this architectural approach is not adequate when performance goals in terms of latency and throughput are very aggressive. More in general, whenever possible you should minimize the use of orchestrations and privilege messaging only patterns to increase the overall throughput and reduce the latency of the business processes. However, the Gather-Scatter and Service Composition are 2 scenarios that can be easily implemented using an orchestration, so the question is: how can I solve the same problem using a different pattern?

An alternative solution could be using the Inline Send design pattern to solve the problem: in this case to invoke the underlying CalculatorService our orchestration would not use a Logical Port bound to Physical WCF Send Port,  but it rather use a custom WCF proxy component within an Expression shape that would replace the Send and Receive shapes. Note that not using Adapters and Physical Send Ports, the application would not benefit from their functional capabilities such as batching, retries, correlation sets initialization, declarative configuration and secondary transports. The advantage of the Inline Send technique is that for each Solicit Response call made by the orchestration, it allows to eliminate 4 roundtrips to the MessageBox:

  • Orchestration Logical Port --> Request --> MessageBox
  • MessageBox --> Request --> Physical Send Port
  • Physical Send Port --> Response --> MessageBox
  • MessageBox --> Response --> Orchestration Logical Port

The following graph depicts the architecture of the solution based on the Inline Send pattern. In the remainder of this post, I’ll refer to this solution as InlineSendOrchestration.

Image2

Message Flow:

  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from a Test Agent.
  2. The XML disassembler component within the XMLTransmit pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request starts a new instance of the InlineSendOrchestration. This latter uses a Direct Port receive the CalculatorRequest messages with the Method promoted property = “InlineSendOrchestration”.
  4. The InlineSendOrchestration uses a loop to retrieve operations and for each item it invokes the downstream Calculator WCF service using a generic WCF proxy component called InlineSendProxy. The endpoint used by this object must be configured in the BtsNtSvc.exe.config and BtsNtSvc64.exe.config configurations files. See the BtsNtSvc.exe.config file in the Setup folder for a sample of how configuring this component to access the downstream Calculator WCF web service.
  5. The InlineSendProxy component directly invokes one of the methods (Add, Subtract, Multiply, Divide) exposed by the Calculator WCF service.
  6. The Calculator WCF service returns a response message.
  7. The response message is written in the outbound CalculatorResponse document using an XmlWriter and a VirtualStream objects. The orchestration repeats this pattern for each operation within the inbound CalculatorRequest xml document.
  8. The InlineSendOrchestration publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  9. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  10. The response message is returned to the Test Agent.

In February 2009, the BizTalk Customer Advisory Team conducted a performance lab to determine and compare the performance in terms of latency and throughput of the LogicalPortsOrchestration and InlineSendOrchestration patterns. To this purpose, we used a lab rig composed of 2 BizTalk Server 2009 (Beta version) nodes (2 Dual-Core CPUs, 8 GB RAM), a SQL Server 2008 machine to host the MessageBox. The following table reports the results obtained using a request message that contained 5 distinct Operations:

Test Case Concurrent Test Client Users Messages/Second Average Response Time (Sec)
LogicalPortsOrchestration 100 60.64 1.61
InlineSendOrchestration 100 236.96 0.24

 

As you can easily note reviewing the numbers reported in the table above, the InlineSendOrchestration implementation provides better performance than the LogicalPortsOrchestration solution, both in terms of latency and throughput. Therefore, a question arises naturally: should I renounce to the capabilities provided by Adapters and apply the Inline Send pattern to debatch an incoming message and implement the Scatter-Gather pattern in a performant way? Fortunately, the answer is no as there’s another architectural approach to solve the problem using a messaging-only approach that does not require any orchestration.

The following picture depicts the architecture of the third solution that I’ll refer to as MessagingOnly. Unfortunately, this solution was developed after the performance lab, so no comparative tests were conducted to compare its latency and throughput with the 2 preceding implementations. Nevertheless, at the end of this article I provide a link to the code of the 3 implementations, so you can eventually run your own tests and eventually customize the solution to fit your needs.

Image3

Message Flow:

  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from a Test Agent.
  2. The XML disassembler component within the XMLTransmit pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request is consumed by the POG.MessagingOnly.WCF-Custom.SendPort WCF-Custom Send Port. This latter uses a Filter Expression to receive CalculatorRequest messages with the Method promoted property = “MessagingOnly”.
  4. The WCF-Custom Send Port uses a Custom Binding which contains the following binding elements: CalculatorServiceBindingElement, TextMessageEncodingBindingElement, HttpTransportBindingElement. The custom CalculatorServiceBindingElement at runtime creates a ServiceFactoryChannelFactory object that in turn creates a CalculatorServiceRequestChannel object. The Request method of this latter debatches the Operation elements contained in the inbound CalculatorRequest message, and for each of them  makes a separate call to the downstream CalculatorService using the reference to the inner HttpTransportChannel.
  5. The CalculatorService returns a response message. The custom channel executed by the WCF Adapter repeats this pattern for each Operation element within the inbound CalculatorRequest xml document. The custom channel stores the return value of individual calls in a collection variable and finally uses results to generate the CalculatorResponse message (Scatter-Gather pattern) that is finally returned by the WCF-Custom Adapter.
  6. The POG.MessagingOnly.WCF-Custom.SendPort WCF-Custom Send Port publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  7. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  8. The response message is returned to the Test Agent.

As I have noted in my previous article, WCF Bindings provide a mechanism for configuring channel stacks. In other words, a binding defines a precise recipe for building a channel stack using a transport channel, a message encoder, and a set of protocol channels. WCF ships with several built-in bindings that target common communication scenarios, such as the BasicHttpBinding, WsHttpBinding and NetTcpBinding and BizTalk Server provides a full range of WCF Adapters that correspond 1:1 to the most commonly used WCF bindings:

  • WCF-BasicHttp
  • WCF-WSHttp
  • WCF-NetTcp
  • etc.

However, if you need maximum flexibility and you need to use one or multiple custom protocol channels at runtime, you can use the CustomBinding that gives you the possibility to control which binding elements compose your binding. Likewise, in a BizTalk application, if you want to extend the default behavior of the WCF Adapters with a custom component (Service Behavior, Endpoint Behavior, Custom Binding, Custom Binding Element), you have to use the WCF-Custom and/or WCF-CustomIsolated Adapters. In fact, these latter offer you complete control over the channel stack and behaviors configuration, and as a consequence, they are the only WCF adapters you really need. Compared with the other WCF Adapters, they are the only ones to provide the possibility to:

  • Implement and exploit extensibility points.
  • Have full access to properties exposed by bindings/behaviors.
  • Enable the use of the bamInterceptor endpoint behavior.
  • Export/Import the binding configuration.
  • Disable a receive location on failure.
  • Run an http-based RL within an in-process host.
  • Use bindings (e.g. wsDualHttpBinding) for which a WCF Adapter does not exist.

The Code

Before looking at the code of the custom channel, let’s complicate the scenario as follows: let’s assume that the underlying WCF Calculator service invoked by the BizTalk application is hosted by IIS 7.0 and exposes an endpoint that uses a CustomBinding composed of the following binding elements:

  • TransactionFlowBindingElement
  • TextMessageEncodingBindingElement
  • HttpTransportBindingElement

The following picture reports the web.config used by the Calculator service:

Web.config

<?xml version="1.0" encoding="utf-8" ?>
<configuration>
    <connectionStrings>
        <add name="LogDB"
             providerName="System.Data.SqlClient"
             connectionString="Data Source=.;Initial Catalog=LogDB;Integrated Security=SSPI;" />
    </connectionStrings>

    <system.diagnostics>
        <sources>
            <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">
                <listeners>
                    <add type="System.Diagnostics.DefaultTraceListener" name="Default">
                        <filter type="" />
                    </add>
                    <add name="ServiceModelMessageLoggingListener">
                        <filter type="" />
                    </add>
                </listeners>
            </source>
        </sources>
        <sharedListeners>
            <add initializeData="C:\calculatorservicetxn.svclog"
                 type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
                 name="ServiceModelMessageLoggingListener"
                 traceOutputOptions="Timestamp">
                <filter type="" />
            </add>
        </sharedListeners>
    </system.diagnostics>
    
    <system.serviceModel>    
        <diagnostics performanceCounters="All">
            <messageLogging logEntireMessage="false"
                            logMalformedMessages="false"
                            logMessagesAtServiceLevel="false"
                            logMessagesAtTransportLevel="false"
                            maxSizeOfMessageToLog="1000000" />
        </diagnostics>

    <!-- Service Endpoints -->
        <services>
            <service behaviorConfiguration="CalculatorServiceBehavior"
                     name="Microsoft.ServiceModel.Samples.CalculatorService">
                <endpoint address=""
                          binding="customBinding"
                          bindingConfiguration="customBinding"
                          name="customBinding"
                          contract="Microsoft.ServiceModel.Samples.ICalculator" />
            </service>
        </services>
        
    <!-- Binding Condfiguration -->
        <bindings>
            <customBinding>
                <binding name="customBinding">
                    <transactionFlow transactionProtocol="WSAtomicTransactionOctober2004" />
                    <textMessageEncoding />
                    <httpTransport />
                </binding>
            </customBinding>
        </bindings>

        <!-- Service Behaviors -->
        <behaviors>
            <serviceBehaviors>
                <behavior name="CalculatorServiceBehavior">
                    <serviceMetadata httpGetEnabled="true" />
                    <serviceDebug includeExceptionDetailInFaults="true" />
                    <serviceThrottling maxConcurrentCalls="200" 
                                       maxConcurrentSessions="200"
                                       maxConcurrentInstances="200" />
                </behavior>
            </serviceBehaviors>
        </behaviors>
    </system.serviceModel>
    
    <system.web>
        <customErrors mode="RemoteOnly"/>
        <trust level="Full" />
    </system.web>
</configuration>

 

The Calculator service requires the client application to create and flow a transaction using the WS-AtomicTransaction protocol. In particular, the WCF service will use this client-initiated transaction to log the data of the invoked operation to a SQL database. To implement this behavior, all the service operations exposed by the ICalculatorService contract interface have been decorated with the TransactionFlowAttribute. In particular, the TransactionFlowOption.Mandatory value has been used in the constructor of the TransactionFlow attribute to indicate that the client application must initiate and transmit a transaction, otherwise the service operation will immediately throw an exception.

In the CalculatorService class, all the service operations have been decorated with the OperationBehaviorAttribute. In particular, the value of the TransactionScopeRequired and TransactionAutoComplete properties has been set to true. This indicates that service operations require a transaction scope for their execution and this latter automatically completes if no unhandled exceptions occur and the method returns successfully.
All the service operations invoke the RecordToLog method that logs the current operation data and transactionId to the LogDB on SQL Server. This sample on MSDN demonstrates various aspects of creating a transactional service and the use of a client-initiated transaction to coordinate service operations.

Note in particular that the Transaction.Current.TransactionInformation.DistributedIdentifier static property is used by the RecordToLog method to retrieve the unique identifier of the client-initiated distributed transaction.

CalculatorService Class

#region Using Directives
using System;
using System.Diagnostics;
using System.ServiceModel;
using System.Transactions;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
#endregion

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface ICalculator
    {
        #region Contract Operations
        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Add(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Subtract(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Multiply(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Divide(double n1, double n2);
        #endregion
    }

    // Service class which implements the service contract.
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Single)]
    public class CalculatorService : ICalculator
    {
        #region Private Constants
        private const string AttemptedToDivideByZero = "Attempted to divide by zero.";
        private const string TransactionIdFormat = "[CalculatorService] TransactionId: {0}";
        private const string CallCompletedFormat = "[CalculatorService] Message [{0}] stored to LogDB.";
        private const string MessageFormat = "[CalculatorService] {0}";
        private const string InsertLogEntrySP = "usp_InsertLogEntry";
        private const string IdParameter = "@Id";
        private const string MessageParameter = "@Message";
        private const string TypeParameter = "@Type";
        private const string SourceParameter = "@Source";
        private const string TransactionIdParameter = "@TransactionID";
        private const string Information = "Information";
        private const string Source = "CalculatorService";
        private const string None = "None";
        private const string AddFormat = "{0} + {1} = {2}";
        private const string SubtractFormat = "{0} - {1} = {2}";
        private const string MultiplyFormat = "{0} * {1} = {2}";
        private const string DivideFormat = "{0} / {1} = {2}";
        #endregion

        #region Private Static Fields
        private static string connectionString = null;
        #endregion

        #region Static Constructor
        static CalculatorService()
        {
            try
            {
                if (ConfigurationManager.ConnectionStrings != null &&
                    ConfigurationManager.ConnectionStrings.Count > 0)
                {
                    connectionString = ConfigurationManager.ConnectionStrings["LogDB"].ConnectionString;
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format(MessageFormat, ex.Message));
            }
        }
        #endregion

        #region Public Methods
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Add(double n1, double n2)
        {
            double n3 = n1 + n2;
            RecordToLog(String.Format(AddFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Subtract(double n1, double n2)
        {
            double n3 = n1 - n2;
            RecordToLog(String.Format(SubtractFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Multiply(double n1, double n2)
        {
            double n3 = n1 * n2;
            RecordToLog(String.Format(MultiplyFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Divide(double n1, double n2)
        {
            if (n2 == 0)
            {
                Trace.WriteLine(AttemptedToDivideByZero);
                throw new DivideByZeroException();
            }
            double n3 = n1 / n2;
            RecordToLog(String.Format(DivideFormat, n1, n2, n3));
            return n3;
        }

        #endregion

        #region Private Static Methods
        private static void RecordToLog(string message)
        {
            try
            {
                if (!string.IsNullOrEmpty(connectionString))
                {
                    string transactionId = None;
                    if (Transaction.Current != null &&
                        Transaction.Current.TransactionInformation != null)
                    {
                        transactionId = Transaction.Current.TransactionInformation.DistributedIdentifier.ToString();
                    }
                    Trace.WriteLine(string.Format(TransactionIdFormat, transactionId));
                    using (SqlConnection sqlConnection = new SqlConnection(connectionString))
                    {
                        sqlConnection.Open();
                        SqlParameter sqlParameter = null;

                        using (SqlCommand sqlCommand = new SqlCommand(InsertLogEntrySP, sqlConnection))
                        {
                            sqlParameter = new SqlParameter(IdParameter, SqlDbType.UniqueIdentifier);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Guid.NewGuid();
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(MessageParameter, SqlDbType.VarChar, 512);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = message.Length <= 512 ? message : message.Substring(0, 512);
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(TypeParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Information;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(SourceParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Source;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(TransactionIdParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = transactionId;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlCommand.CommandType = CommandType.StoredProcedure;
                            sqlCommand.ExecuteNonQuery();

                            Trace.WriteLine(string.Format(CallCompletedFormat, message));
                        }                        
                    }
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format(MessageFormat, ex.Message));
            }
        }
        #endregion
    }
}

 

Now let’s assume that all the operations contained in a CalculatorRequest document must be processed as a unit of work within the same distributed transaction.  In this case, the BizTalk application will have to perform the following steps:

  • Initiate a MSDTC transaction.
  • Debatch the incoming CalculatorRequest document to extract individual Operation elements.
  • For each Operation, create a request message and invoke the corresponding operation exposed by the CalculatorService, making sure to flow the distributed transaction as part of each service call.

The following graph depicts the architecture of the described solution.

Image4

Message Flow:

  1. A WCF-BasicHttp or WCF-Custom Request-Response Receive Location receives a new CalculatorRequest xml document from a Test Agent.
  2. The XML disassembler component within the XMLTransmit pipeline promotes the Method element inside the CalculatorRequest xml document. The Message Agent submits the incoming message to the MessageBox (BizTalkMsgBoxDb).
  3. The inbound request is consumed by the POG.MessagingOnly.WCF-Custom.SendPort WCF-Custom Send Port. This latter uses a Filter Expression to receive CalculatorRequest messages with the Method promoted property = “MessagingOnly”.
  4. The WCF-Custom Send Port uses a Custom Binding which contains the following binding elements: TransactionFlowBindingElement, CalculatorServiceBindingElement, TextMessageEncodingBindingElement, HttpTransportBindingElement. The TransactionFlowBindingElement at runtime creates a TransactionFlowChannel that creates the CoordinationContext header that is used to coordinate the distributed transaction across BizTalk and the WCF service. The custom CalculatorServiceBindingElement at runtime creates a ServiceFactoryChannelFactory object that in turn creates a CalculatorServiceRequestChannel object. The Request method of this latter debatches the Operation elements contained in the inbound CalculatorRequest message, and for each of them  creates a new WCF message, copies to it the CoordinationContext header from the CalculatorRequest message, and makes a separate call to the downstream CalculatorService using the reference to the inner HttpTransportChannel.
  5. The CalculatorService logs the operation data to the LogDB on SQL Server using the BizTalk-initiated transaction.
  6. The CalculatorService returns a response message. The custom channel executed by the WCF Adapter repeats this pattern for each Operation element within the inbound CalculatorRequest xml document. The custom channel stores the return value of individual calls in a collection variable and finally uses results to generate the CalculatorResponse message (Scatter-Gather pattern) that is finally returned by the WCF-Custom Adapter.
  7. The POG.MessagingOnly.WCF-Custom.SendPort WCF-Custom Send Port publishes the CalculatorResponse message to the MessageBox (BizTalkMsgBoxDb).
  8. The response message is retrieved by the Request-Response WCF-BasicHttp or WCF-Custom Receive Location.
  9. The response message is returned to the Test Agent.

On the left hand side, the following picture shows the custom binding used by the WCF-Custom Send Port, while on the right hand side, it describes more in detail the individual steps performed by the WCF-Custom Send Port.

Image5

Message Flow:

  1. The BizTalk Messaging Runtime passes the CalculatorRequest xml document to the WCF-Custom Adapter.
  2. The WCF-Custom Adapter generates a WCF Message instance from the IBaseMessage object. The WCF Message contains a SOAP Envelope and in particular the Body element contains the CalculatorRequest as payload.
  3. The TransactionRequestChannel creates the CoordinationContext header that is used to coordinate the distributed transaction across BizTalk and the CalculatorService.
  4. The CalculatorServiceRequestChannel debatches the first operation element contained in the inbound CalculatorRequest message, copies the CoordinationContext header from the original batch message to the new message and makes a separate call to the downstream CalculatorService using the reference to the inner HttpTransportChannel.
  5. The CalculatorServiceRequestChannel stores the return value of the first call in a collection.
  6. The CalculatorServiceRequestChannel debatches the second operation element contained in the inbound CalculatorRequest message, copies the CoordinationContext header from the original batch message to the new message and makes a separate call to the downstream CalculatorService using the reference to the inner HttpTransportChannel. The CalculatorServiceRequestChannel  repeats the same pattern for the remaining operation items contained in the inbound batch message.
  7. The CalculatorServiceRequestChannel stores the return value of the second call in a collection.
  8. The CalculatorServiceRequestChannel finally uses the result value of individual calls to generate the CalculatorResponse message (Scatter-Gather pattern) that is finally returned by the WCF-Custom Adapter to the BizTalk Messaging Runtime.

Let’s look at the code. To create a custom protocol channel to debatch the request message at runtime within the WCF Send Port I built a Class Library called WCFExtensionLibrary composed of multiple components. As we noted in the first part of this article, on the sending side, a binding is used to build a IChannelFactory, which in turn builds a channel stack and returns a reference to the top channel in the stack. The application can then use this channel to send messages. A binding consists of an ordered set of binding elements that inherit from the BindingElement class. Each binding element at runtime creates a IChannelFactory, that in turn creates a channel that can be used to process the message in a given point of the channel stack. BizTalk developers can see the protocol channels that compose a channel stack as the pipeline components within a pipeline on a Receive Location or Send Port, and the transport channel as the transport Adapter used to receive or send a message. The only difference between the 2 approaches is that to create a custom pipeline component it’s necessary to create a single class, while to create a custom channel at runtime it’s necessary to create the multiple classes. Let’s see in detail the components that I created to register, configure, and execute my custom channel within the WCF-Custom Send Port.

CalculatorServiceBindingElement Class

This class inherits from the BindingElement class. This custom binding element exposes the following properties:

  • TraceEnabled: Gets or Sets a value indicating whether tracing is enabled.
  • MaxBufferSize: Gets or sets the maximum size for a buffer used the custom channel when creating a WCF message.
  • MessageVersion: Gets or sets the SOAP message and WS-Addressing versions that are expected by the target WCF service. This information is used at runtime by the custom channel to specify the version of request messages to send to the CalculatorService.

The BuildChannelFactory method initializes the CalculatorServiceChannelFactory that is responsible for creating an instance of the CalculatorServiceRequestChannel at runtime.

CalculatorServiceBindingElement Class

#region Using Directives
using System;
using System.ServiceModel.Channels;
using System.ServiceModel; 
#endregion

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    public class CalculatorServiceBindingElement : BindingElement
    {
        #region Private Fields
        private bool traceEnabled = true;
        private int maxBufferSize = 2097152;
        private MessageVersionEnum messageVersion = MessageVersionEnum.Default;
        #endregion

        #region Public Constructors
        public CalculatorServiceBindingElement()
        {
        }

        protected CalculatorServiceBindingElement(CalculatorServiceBindingElement other)
            : base(other)
        {
            this.traceEnabled = other.traceEnabled;
            this.maxBufferSize = other.maxBufferSize;
            this.messageVersion = other.messageVersion;
        } 
        #endregion

        #region Public Methods
        public override BindingElement Clone()
        {
            return new CalculatorServiceBindingElement(this);
        }

        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
        {
            return context.CanBuildInnerChannelFactory<TChannel>();
        }

        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
        {
            return context.CanBuildInnerChannelListener<TChannel>();
        }

        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            return new CalculatorServiceChannelFactory<TChannel>(context, traceEnabled, maxBufferSize, messageVersion);
        }

        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            return new CalculatorServiceChannelListener<TChannel>(context, traceEnabled, maxBufferSize, messageVersion);
        }

        public override T GetProperty<T>(BindingContext context)
        {
            return context.GetInnerProperty<T>();
        } 
        #endregion

        #region Public Properties
        public bool TraceEnabled
        {
            get
            {
                return this.traceEnabled;
            }
            set
            {
                this.traceEnabled = value;
            }
        }

        public int MaxBufferSize
        {
            get
            {
                return this.maxBufferSize;
            }
            set
            {
                this.maxBufferSize = value;
            }
        }

        public MessageVersionEnum MessageVersion
        {
            get
            {
                return this.messageVersion;
            }
            set
            {
                this.messageVersion = value;
            }
        }
        #endregion
    }
}

 

CalculatorServiceBindingExtensionElement Class

This class inherits from the BindingElementExtensionElement class. In general, a BindingElementExtensionElement enables the use of a custom BindingElement implementation from a machine or application configuration file. In our case, the CalculatorServiceBindingExtensionElement has been created to achieve two results:

  • Register the CalculatorServiceBindingElement in the machine.config to make it visible to BizTalk when defining a WCF-Custom/WCF-CustomIsolated Receive Location or a WCF-Custom Send Port.
  • Define the properties that can be set in a declarative way when configuring a WCF-Custom/WCF-CustomIsolated Receive Location or a WCF-Custom Send Port.

The CalculatorServiceBindingExtensionElement class exposes the same properties as the CalculatorServiceBindingElement. The CreateBindingElement method creates an instance of CalculatorServiceBindingElement class and assign a value to its properties. In practice, the following classes: 

  • CalculatorServiceBindingExtensionElement
  • CalculatorServiceBindingElement
  • CalculatorServiceChannelFactory
  • CalculatorServiceRequestChannel

all expose the same properties. As we’ll see later on, these latter can be set when defining a WCF-Custom/WCF-CustomIsolated Receive Location or a WCF-Custom Send Port and at runtime they are propagated from the custom BindingElementExtensionElement to the custom channel as follows:

BindingElementExtensionElement –> CalculatorServiceBindingElement–>CalculatorServiceChannelFactory –> CalculatorServiceRequestChannel

CalculatorServiceBindingExtensionElement Class

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    public enum MessageVersionEnum
    {
        Default,
        None,
        Soap11,
        Soap11WSAddressing10,
        Soap11WSAddressingAugust2004,
        Soap12, 
        Soap12WSAddressing10,
        Soap12WSAddressingAugust2004
    }

    public class CalculatorServiceBindingExtensionElement : BindingElementExtensionElement
    {
        #region Private Constants
        private const string TraceEnabledProperty = "TraceEnabled";
        private const string MaxBufferSizeProperty = "MaxBufferSize";
        private const string MessageVersionProperty = "MessageVersion";
        #endregion

        #region Private Fields
        private CalculatorServiceBindingElement calculatorServiceBindingElement;
        #endregion

        #region Public Constructors
        public CalculatorServiceBindingExtensionElement()
        {
        }
        #endregion

        #region Public Properties
        public override Type BindingElementType
        {
            get
            {
                return typeof(CalculatorServiceBindingElement);
            }
        }
        #endregion

        #region Public Methods
        public override void ApplyConfiguration(BindingElement bindingElement)
        {
            base.ApplyConfiguration(bindingElement);
            calculatorServiceBindingElement = (CalculatorServiceBindingElement)bindingElement;
        }
        #endregion

        #region Protected Methods
        protected override BindingElement CreateBindingElement()
        {
            calculatorServiceBindingElement = new CalculatorServiceBindingElement();
            calculatorServiceBindingElement.TraceEnabled = this.TraceEnabled;
            calculatorServiceBindingElement.MaxBufferSize = this.MaxBufferSize;
            calculatorServiceBindingElement.MessageVersion = this.MessageVersion;
            this.ApplyConfiguration(calculatorServiceBindingElement);
            return calculatorServiceBindingElement;
        }
        #endregion

        #region Configuration Properties
        [ConfigurationProperty(TraceEnabledProperty, DefaultValue = true)]
        public bool TraceEnabled
        {
            get
            {
                return (bool)base[TraceEnabledProperty];
            }
            set
            {
                base[TraceEnabledProperty] = value;
            }
        }

        [ConfigurationProperty(MaxBufferSizeProperty, DefaultValue = 2097152)]
        [IntegerValidator(MinValue = 0)]
        public int MaxBufferSize
        {
            get
            {
                return (int)base[MaxBufferSizeProperty];
            }
            set
            {
                base[MaxBufferSizeProperty] = value;
            }
        }

        [ConfigurationProperty(MessageVersionProperty, DefaultValue = MessageVersionEnum.Default)]
        public MessageVersionEnum MessageVersion
        {
            get
            {
                return (MessageVersionEnum)base[MessageVersionProperty];
            }
            set
            {
                base[MessageVersionProperty] = value;
            }
        }
        #endregion
    }
}

CalculatorServiceChannelFactory and CalculatorServiceRequestChannel Classes

The CalculatorServiceChannelFactory inherits from ChannelFactoryBase that, as the name suggests, provides a common base implementation for channel factories on the client to create channels of a specified type connected to a specified address. In particular, at runtime the OnCreateChannel method creates and returns a new instance of the CalculatorServiceRequestChannel class. Before doing that, the method uses the reference to the inner channel factory to create an instance of the inner channel, that in our case is an instance of the HttpTransportChannel class, and then passes this reference in the constructor of the CalculatorServiceRequestChannel. The custom channel will exploit the reference to the http transport channel to send multiple messages to the CalculatorService, one for each Operation element contained in the inbound request message.

The CalculatorServiceRequestChannel is the class responsible for processing the CalculatorRequest message at runtime. The Request method uses a XmlDictionaryReader object  to read through and disassembles the Operation elements contained within the inbound xml message and for each of them it creates and submits a separate message to the underlying CalculatorService. The The CreateOperationMessage method copies the headers of the CalculatorRequest message to each individual message sent to WCF service. One of these headers is the CoordinationContext used to coordinate the distributed transaction. To invoke the WCF service, the custom channel invokes the Request method exposed by the inner channel. Individual call responses are stored in a collection called  responseList. Finally, the Request method invokes the CreateResponseMessage function to create and return the response message that aggregates the result of individual calls.

CalculatorServiceChannelFactory and CalculatorServiceRequestChannel Classes

#region Using Directives
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;
using System.Xml;
using System.Text;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    /// <summary>
    /// ChannelFactory that performs message inspection
    /// </summary>
    class CalculatorServiceChannelFactory<TChannel>
        : ChannelFactoryBase<TChannel>
    {
        #region Private Constants
        private const string ErrorMessageFormat = 
"<CalculatorResponse xmlns=\"http://Microsoft.BizTalk.CAT.Samples.Schemas.CalculatorResponse\"><Status>{0}
</Status></CalculatorResponse>"
; private const string CalculatorResponseNamespace = "http://Microsoft.BizTalk.CAT.Samples.Schemas.CalculatorResponse"; private const string GenericErrorMessage = "An error occured while processing the request."; private const string OperationUnknownErrorMessageFormat = "The operation failed because the operator {0} is unknown."; private const string OperationFailedErrorMessageFormat = "The operation number {0} returned an error. {1}"; private const string OperationFormat = "[CalculatorServiceRequestChannel] {0} {1} {2} = {3}"; private const string OperationFailed = "[CalculatorServiceRequestChannel] {0} {1} {2} Failed: {3}"; private const string MessageReceived = "[CalculatorServiceRequestChannel] Request message received."; private const string MessageSuccessfullyProcessed =
"[CalculatorServiceRequestChannel] Response message successfully processed."; private const string TransactionIdFormat = "[CalculatorServiceRequestChannel] TransactionId: {0}"; #endregion #region Private Fields IChannelFactory<TChannel> innerChannelFactory; private bool traceEnabled = true; private int maxBufferSize = 2097152; private MessageVersionEnum messageVersion = MessageVersionEnum.Default; #endregion #region Public Constructors public CalculatorServiceChannelFactory(BindingContext context, bool traceEnabled, int maxBufferSize, MessageVersionEnum messageVersion) { this.innerChannelFactory = context.BuildInnerChannelFactory<TChannel>(); this.traceEnabled = traceEnabled; this.maxBufferSize = maxBufferSize; this.messageVersion = messageVersion; if (this.innerChannelFactory == null) { throw new InvalidOperationException("CalculatorServiceChannelFactory requires an inner IChannelFactory."); } } #endregion #region Public Methods public override T GetProperty<T>() { T baseProperty = base.GetProperty<T>(); if (baseProperty != null) { return baseProperty; } return this.innerChannelFactory.GetProperty<T>(); } #endregion #region Protected Methods protected override void OnOpen(TimeSpan timeout) { this.innerChannelFactory.Open(timeout); } protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state) { return this.innerChannelFactory.BeginOpen(timeout, callback, state); } protected override void OnEndOpen(IAsyncResult result) { this.innerChannelFactory.EndOpen(result); } protected override void OnClose(TimeSpan timeout) { this.innerChannelFactory.Close(timeout); } protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state) { return this.innerChannelFactory.BeginClose(timeout, callback, state); } protected override void OnEndClose(IAsyncResult result) { this.innerChannelFactory.EndClose(result); } protected override TChannel OnCreateChannel(EndpointAddress to, Uri via) { TChannel innerChannel = this.innerChannelFactory.CreateChannel(to, via); if (typeof(TChannel) == typeof(IRequestChannel)) { return (TChannel)(object)new CalculatorServiceRequestChannel(this, (IRequestChannel)innerChannel, traceEnabled, maxBufferSize, messageVersion); } throw new InvalidOperationException(); } #endregion class CalculatorServiceRequestChannel : CalculatorServiceChannelBase<IRequestChannel>, IRequestChannel { #region Private Fields private bool traceEnabled = true; private int maxBufferSize = 2097152; private MessageVersion messageVersion = MessageVersion.Default; #endregion #region Public Constructors public CalculatorServiceRequestChannel(CalculatorServiceChannelFactory<TChannel> factory, IRequestChannel innerChannel, bool traceEnabled, int maxBufferSize, MessageVersionEnum messageVersion) : base(factory, innerChannel) { this.traceEnabled = traceEnabled; this.maxBufferSize = maxBufferSize; switch (messageVersion) { case MessageVersionEnum.Default: this.messageVersion = MessageVersion.Default; break; case MessageVersionEnum.None: this.messageVersion = MessageVersion.None; break; case MessageVersionEnum.Soap11: this.messageVersion = MessageVersion.Soap11; break; case MessageVersionEnum.Soap11WSAddressing10: this.messageVersion = MessageVersion.Soap11WSAddressing10; break; case MessageVersionEnum.Soap11WSAddressingAugust2004: this.messageVersion = MessageVersion.Soap11WSAddressingAugust2004; break; case MessageVersionEnum.Soap12: this.messageVersion = MessageVersion.Soap12; break; case MessageVersionEnum.Soap12WSAddressing10: this.messageVersion = MessageVersion.Soap12WSAddressing10; break; case MessageVersionEnum.Soap12WSAddressingAugust2004: this.messageVersion = MessageVersion.Soap12WSAddressingAugust2004; break; default: this.messageVersion = MessageVersion.Default; break; } } #endregion #region Public Properties public EndpointAddress RemoteAddress { get { return this.InnerChannel.RemoteAddress; } } public Uri Via { get { return this.InnerChannel.Via; } } #endregion #region Public Methods public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state) { return this.BeginRequest(message, this.DefaultSendTimeout, callback, state); } public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state) { try { Message reply = this.Request(message); return new TypedCompletedAsyncResult<Message>(reply, callback, state); } catch (Exception e) { throw e; } } public Message EndRequest(IAsyncResult result) { TypedCompletedAsyncResult<Message> reply = (TypedCompletedAsyncResult<Message>)result; return reply.Data; } public Message Request(Message message) { return this.Request(message, this.DefaultSendTimeout); } public Message Request(Message message, TimeSpan timeout) { Message reply = null; if (message != null) { Message operationMessage; List<Response> responseList = new List<Response>(); string op = null; StringBuilder statusBuilder = new StringBuilder(); string error = null; double operand1 = 0; double operand2 = 0; bool ok = true; int i = 0; try { Trace.WriteLineIf(traceEnabled, MessageReceived); if (traceEnabled) { string transactionId = "None"; if (Transaction.Current != null && Transaction.Current.TransactionInformation != null) { transactionId = Transaction.Current.TransactionInformation.DistributedIdentifier.ToString(); } Trace.WriteLine(string.Format(TransactionIdFormat, transactionId)); } using (XmlDictionaryReader reader = message.GetReaderAtBodyContents()) { double result; while (reader.Read() && ok) { if (reader.NodeType == XmlNodeType.Element && reader.LocalName == "Operator") { error = "None"; op = reader.ReadElementContentAsString(); operand1 = reader.ReadElementContentAsDouble(); operand2 = reader.ReadElementContentAsDouble(); reply = null; i++; try { switch (op) { case "+": operationMessage = CreateOperationMessage("Add",
operand1, operand2, message); reply = this.InnerChannel.Request(operationMessage); break; case "-": operationMessage = CreateOperationMessage("Subtract",
operand1, operand2, message); reply = this.InnerChannel.Request(operationMessage); break; case "*": operationMessage = CreateOperationMessage("Multiply",
operand1, operand2, message); reply = this.InnerChannel.Request(operationMessage); break; case "/": operationMessage = CreateOperationMessage("Divide",
operand1, operand2, message); reply = this.InnerChannel.Request(operationMessage); break; default: error = string.Format(OperationUnknownErrorMessageFormat, op); SetStatus(statusBuilder,
string.Format(OperationFailedErrorMessageFormat, i, error)); ok = false; break; } } catch (Exception innerException) { error = innerException.Message; if (statusBuilder.Length > 0) { statusBuilder.Append(" - "); } ok = false; } if (reply != null && !reply.IsFault) { result = GetValue(reply); Trace.WriteLineIf(traceEnabled, string.Format(OperationFormat,
operand1, op, operand2, result)); } else { if (reply != null && reply.IsFault) { MessageFault fault = MessageFault.CreateFault(reply, maxBufferSize); error = fault.Reason.ToString(); } Trace.WriteLineIf(traceEnabled, string.Format(OperationFailed, operand1,
op, operand2, error)); SetStatus(statusBuilder, string.Format(OperationFailedErrorMessageFormat, i, error)); result = 0; } responseList.Add(new Response(error, result)); } } } string status = statusBuilder.ToString(); if (string.IsNullOrEmpty(status)) { status = "Ok"; } return CreateResponseMessage(status, responseList, null); } catch (Exception ex) { Trace.WriteLineIf(traceEnabled, ex.Message); if (statusBuilder.Length > 0) { statusBuilder.Append(" - "); } statusBuilder.Append(ex.Message); return null; } finally { Trace.WriteLineIf(traceEnabled, MessageSuccessfullyProcessed); } } else { Trace.WriteLineIf(traceEnabled, String.Format("Message action {0}", message.Headers.Action)); reply = this.InnerChannel.Request(message); Trace.WriteLineIf(traceEnabled, String.Format("Reply is: {0}", reply)); return reply; } } #endregion #region Private Methods private void SetStatus(StringBuilder statusBuilder, string message) { if (statusBuilder.Length > 0) { statusBuilder.Append(" - "); } statusBuilder.Append(message); } private Message CreateOperationMessage(string op, double operand1, double operand2, Message message) { string action = string.Format("http://Microsoft.ServiceModel.Samples/ICalculator/{0}", op); Message operationMessage = Message.CreateMessage(messageVersion, action, new OperationBodyWriter(op, operand1, operand2)); if (message != null) { if (message.Headers != null && message.Headers.Count > 0) { operationMessage.Headers.Clear(); operationMessage.Headers.CopyHeadersFrom(message); operationMessage.Headers.Action = action; } if (message.Properties != null && message.Properties.Count > 0) { operationMessage.Properties.CopyProperties(message.Properties); } } if (messageVersion != MessageVersion.Soap11) { operationMessage.Headers.MessageId = new UniqueId(); } return operationMessage; } private Message CreateResponseMessage(string status, List<Response> responseList, Message message) { Message reply = Message.CreateMessage(MessageVersion.Default, "*", new ResponseBodyWriter(status, responseList)); if (message != null) { if (message.Properties != null && message.Properties.Count > 0) { reply.Properties.CopyProperties(message.Properties); } } reply.Headers.MessageId = new UniqueId(); return reply; } private double GetValue(Message message) { XmlDictionaryReader reader = message.GetReaderAtBodyContents(); while (reader.Read()) { if (reader.Name == "AddResult" || reader.Name == "SubtractResult" || reader.Name == "MultiplyResult" || reader.Name == "DivideResult") { return reader.ReadElementContentAsDouble(); } } return 0; } #endregion } class OperationBodyWriter : BodyWriter { #region Private Fields private string op; private double operand1; private double operand2; #endregion #region Public Constructors public OperationBodyWriter(string op, double operand1, double operand2) : base(false) { this.op = op; this.operand1 = operand1; this.operand2 = operand2; } #endregion #region Protected Methods protected override void OnWriteBodyContents(XmlDictionaryWriter writer) { writer.WriteStartElement(op, "http://Microsoft.ServiceModel.Samples"); writer.WriteStartElement("n1"); writer.WriteValue(operand1); writer.WriteEndElement(); writer.WriteStartElement("n2"); writer.WriteValue(operand2); writer.WriteEndElement(); writer.WriteEndElement(); } #endregion } class ResponseBodyWriter : BodyWriter { #region Private Fields private string status; private List<Response> responseList; private Dictionary<int, Response> responseDictionary; #endregion #region Public Constructors public ResponseBodyWriter(string status, List<Response> responseList) : base(false) { this.status = status; this.responseList = responseList; } public ResponseBodyWriter(string status, Dictionary<int, Response> responseDictionary) : base(false) { this.status = status; this.responseDictionary = responseDictionary; } #endregion #region Protected Methods protected override void OnWriteBodyContents(XmlDictionaryWriter writer) { if (responseList != null) { writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace); writer.WriteStartElement("Status", CalculatorResponseNamespace); writer.WriteString(status); writer.WriteEndElement(); writer.WriteStartElement("Results", CalculatorResponseNamespace); for (int i = 0; i < responseList.Count; i++) { writer.WriteStartElement("Result", CalculatorResponseNamespace); writer.WriteStartElement("Value", CalculatorResponseNamespace); writer.WriteString(responseList[i].Value.ToString()); writer.WriteEndElement(); writer.WriteStartElement("Error", CalculatorResponseNamespace); writer.WriteString(responseList[i].Error); writer.WriteEndElement(); writer.WriteEndElement(); } writer.WriteEndElement(); writer.WriteEndElement(); } if (responseDictionary != null) { writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace); writer.WriteStartElement("Status", CalculatorResponseNamespace); writer.WriteString(status); writer.WriteEndElement(); writer.WriteStartElement("Results", CalculatorResponseNamespace); for (int i = 0; i < responseDictionary.Count; i++) { writer.WriteStartElement("Result", CalculatorResponseNamespace); writer.WriteStartElement("Value", CalculatorResponseNamespace); writer.WriteString(responseList[i].Value.ToString()); writer.WriteEndElement(); writer.WriteStartElement("Error", CalculatorResponseNamespace); writer.WriteString(responseDictionary[i].Error); writer.WriteEndElement(); writer.WriteEndElement(); } writer.WriteEndElement(); writer.WriteEndElement(); } } #endregion } public class Response { #region Private Fields private string error; private double value; #endregion #region Public Constructors public Response() { this.error = default(string); this.value = default(double); } public Response(string error, double value) { this.error = error; this.value = value; } #endregion #region Public Properties public string Error { get { return this.error; } set { this.error = value; } } public double Value { get { return this.value; } set { this.value = value; } } #endregion } public class Operation { #region Private Fields private string op; private double operand1; private double operand2; #endregion #region Public Constructors public Operation() { this.op = default(string); this.operand1 = 0; this.operand1 = 0; } public Operation(string op, double operand1, double operand2) { this.op = op; this.operand1 = operand1; this.operand2 = operand2; } #endregion #region Public Properties public string Operator { get { return this.op; } set { this.op = value; } } public double Operand1 { get { return this.operand1; } set { this.operand1 = value; } } public double Operand2 { get { return this.operand2; } set { this.operand2 = value; } } #endregion } } }

 

To register the WCFExtensionLibrary assembly containing my custom channel and use this latter within my BizTalk application I have to perform 3 operations:

  • Install the assemblies implementing the WCF extensibility points in the global assembly cache (GAC).
  • Modify the machine.config file on the BizTalk Server computer.
  • Configure the WCF-Custom Send Port by using the BizTalk Server Administration console.

Installing the WCFExtensionLibrary in the GAC

This step is pretty straightforward and can be accomplished using the gacutil tool. You can automatically install your assembly to the GAC whenever you build the class library by including the execution of the gacutil tool in the post-build event command-line of the project, as shown in following picture:

Image12

An easy and handy way to verify that the assembly has been successfully installed in the GAC is to use the following command:

gacutil /lr Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary

How to configure the machine.config file for a WCF binding element extension

To accomplish this task you can proceed as follow:

  • Use a text editor as the Notepad to open the machine.config that be found in the following folder:
    - %windir%\Microsoft.NET\Framework\v2.0.50727\CONFIG on a 32 bit BizTalk Server node.
    - %windir%\Microsoft.NET\Framework64\v2.0.50727\CONFIG on a 32 bit BizTalk Server node.
  • Add the following element containing the Fully-Qualified Name (FQDN) of the CalculatorServiceBindingExtensionElement class to the <configuration><system.serviceModel>\<extensions><bindingElementExtensions> node: 
     
<add name="calculatorService" type="Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary.CalculatorServiceBindingExtensionElement,
Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=d7f63d8d08d8f3a2" />


How to configure the WCF-Custom Send Port by using the BizTalk Admin Console

Using the BizTalk Server Administration console, you can proceed as follows:

  • Create a new Two-Way Solicit-Response Static Send Port and select the WCF-Custom Adapter. Then select the PassThruTransmit as Send Pipeline and the PassThruReceive as Receive Pipeline. They do not contain any pipeline component and do not perform any processing of the message. For this reason, they ensure maximum performance in receiving or sending messages.

Image13

  • Click the Configure button. Enter the URL of the CalculatorService in the Address (URI) textbox:

Image14

  • Select the Binding Tab then choose the customBinding from the Binding Type drop-down list. Then right click the Binding panel on the left hand side and select the Add Extension item from the context menu

Image15

  • Select the transactionFlow and then repeat the same step to select the calculatorService binding element extension element.

Image16

  • Click the transactionFlow element in the Binding panel an set the value of the transactionProtocol property to WSAtomicTransactionOctober2004. If you review the configuration file (web.config) of the CalculatorService you’ll find out that this is the transaction protocol used by the downstream WCF service.

Image17

  • Click the calculatorService element in the Binding panel. As you can see, you can set a value for each of the properties exposed by the custom binding element extension element in the same way as you can configure the properties of a pipeline component. This technique is very powerful and handy as it allows to control the runtime behavior of your custom channel defining in a declarative way.

Image18

  • Click the httpTransport element in the Binding panel. As you can see, you can access and change the value of all the properties (authenticationScheme, transferModemaxReceivedMessageSize, etc.) exposed by binding element extension element and thus control the runtime behavior of the transport channel.

Image19

  • Select the Message Tab and perform the following steps:
    - Select Path in the Inbound BizTalk message body section and enter the following expression in the Body path expression:

    /*[local-name()='Fault']/*[local-name()='Detail']/* | /*[(local-name()='AddResponse')] | /*[(local-name()='CalculatorResponse')]

    The body path expression is evaluated against the immediate child element of the SOAP Body element of an incoming message. This property is valid only for solicit-response ports. This pseudo-XPath expression (its syntax is indeed a subset of the XPath syntax) defines the xml element that will be extracted and returned by the Adapter. I used the Union (|) operator to select the CalculatorResponse element in case of success, while in case of error I specified to extract the Detail element from the SOAP Fault returned by the CalculatorService.

    - Select Xml from the Node encoding drop-down list.
    - Select the Propagate fault message checkbox to route a fault message to a subscribing application (such as another receive port or orchestration schedule).
    - Select Use Transaction and set the Isolation Level to Read Committed. In this way you instruct the WCF-Custom Adapter to initiate a distributed transaction before passing the message to the channel stack. Note: the Isolation Level selected on the Send Port must match the Isolation Level used by the downstream WCF service, otherwise the call will immediately fail.

Image20

Running the Sample

For brevity, I’ll assume you have already deployed and configured all the remaining artifacts that compose the solution (e.g. the WCF Receive Location, the CalculatorService, etc.). To test application you can proceed as follows:

  • Open SQL Server Management Studio and run the following command in a Query Window to clean up the Log table in the LogDB
     
USE LogDb
GO
SET NOCOUNT ON
GO
DELETE Log
GO

 

  • Open the DebugView to capture the trace generated by the CalculatorServiceRequestChannel and  CalculatorService.
  • Open the Calculator Client application, choose MessagingOnly from the Method drop-down list, press the Create button to create a new request document and then the Submit button to send the message to WCF-Custom or WCF-BasicHttp Receive Location exposed by BizTalk Server.

Image21

  • The CalculatorServiceRequestChannel and  CalculatorService produce the following output captured by DebugView:

Image22

  • As you can see above, the TransactionId traced by the CalculatorServiceRequestChannel is the same traced by the CalculatorService during each call. To confirm that all the service calls participated to the same distributed transaction, I ran the following TSQL script within the Query Window: 
     
USE LogDb
GO
SET NOCOUNT ON
GO
SELECT * FROM Log ORDER BY DateCreated DESC 
GO

 

  • The picture below reports the data written by the individual calls. Note that the TransactionId is the same in all rows and matches the one traced down by the DebugView.

Image23

To verify that all service calls execute in the context of the same transaction, you can run negative test. For example,  if you try to submit a message that contains an invalid Operation as 72 / 0, this latter will cause the transaction to abort and as an effect nothing will be traced on the Log table.

Conclusions

Customizing the runtime behavior of WCF Adapters using WCF extensibility points (service behaviors, endpoint behaviors, custom channels, etc.) is a powerful technique as explained in my previous posts. Here you can download the PerformanceOptimizationGuide solution which contains other use cases in addition to the WCF custom channel project described in this post.  Feel free to download and customize the code and please let me know your feedbacks. :-)

  • Hi Paolo, very interesting article and useful. Do you have any idea of how we can aggregate the result of individual messages to multiple files and not only one based on a special code inside the input message. What Imean that I have the input message with several messges, each message has a customer code, I want to group all messages relevant to customer 1 in output file 1, messages relevant tocustomer 2 to output file 2 and so on.

    So if I n messages in my input, then individual elements should be debatched and aggregated in several files. If the n messages represent 10 messages, 2 of which are for cust1, 3 for cust 2, 3 forcust 3, 2 for cust 4, then then 4 output files should be produced by the process.

    Thanks in advance

  • Hi Salam,

    you can implement the behavior you describe using different approaches and this depends on the way you send message to the target endpoint:

    1) you send individual messages to individual target endpoints.

    2) you assemble the messages for a specific customer in a single interchange and send them all together to the target endpoint.

    To implement the expected behavior you have to implement the Splitter and Aggregator design patterns (see http://www.eaipatterns.com/Sequencer.html and http://www.eaipatterns.com/Aggregator.html).

    In order to do that you can adopt one the following strategies to achieve the expected behavior:

    1) You can create a custom disassembler component (assuming that you are processing XML document, your custom class could inherit from the XML disassembler component) that inspect and separate the messages contained in the incoming interchange and create a separate outbound message for each individual customerId. Later on, the Message Agent/EPM will post these messages to the MessageBox and a separate orchestration/send port will be spawned to process each of these documents separately.

    2) You can implement the same approach within an orchestration. This latter should contain a loop to separate messages by customerId (Resequencer pattern, see ) and might use the XLANGPipelineManager + a Send Pipeline + eventually an envelope to aggregate each group of messages in a separate outbound message (one for each target customer). Then you might use a second loop to send outbound messages to individual customers.

    3) use the ESBT (see http://msdn.microsoft.com/en-us/library/ee250076(BTS.10).aspx).

    4) Search over the internet, there are a lot of folks out there that implemented the Splitter, Aggregator, Resequencer patterns. ;-)

    Ciao,

    Paolo

Page 1 of 1 (2 items)
Leave a Comment
  • Please add 3 and 8 and type the answer here:
  • Post
Search Blogs