In this post I’ll explain in detail a demo that I presented at TechReady 9 and at TechEd 2009 in Berlin.
Consider the following scenario:
Your BizTalk application receives an XML request message through a two-way Receive Location (e.g. WCF, SOAP, HTTP). The inbound document contains multiple items. For each item in the request message, the application needs to synchronously invoke a downstream WCF service that returns a response message. The application must process all the items in the request document as a unit of work and aggregate the results of the individual calls into a single response message that is returned to the original caller.
Let’s look at an example to better scope the problem. Assume that our BizTalk application is exposed via a two-way WCF Receive Location and receives a request message, like the one shown below, that contains multiple Operation elements.
CalculatorRequest Message
<CalculatorRequest xmlns="http://Microsoft.BizTalk.CAT.Samples.CalculatorRequest">
  <Method>XmlDocumentOrchestration</Method>
  <Operations>
    <Operation>
      <Operator>+</Operator>
      <Operand1>82</Operand1>
      <Operand2>18</Operand2>
    </Operation>
    <Operation>
      <Operator>-</Operator>
      <Operand1>30</Operand1>
      <Operand2>12</Operand2>
    </Operation>
    <Operation>
      <Operator>*</Operator>
      <Operand1>25</Operand1>
      <Operand2>8</Operand2>
    </Operation>
    <Operation>
      <Operator>/</Operator>
      <Operand1>100</Operand1>
      <Operand2>25</Operand2>
    </Operation>
    <Operation>
      <Operator>+</Operator>
      <Operand1>100</Operand1>
      <Operand2>32</Operand2>
    </Operation>
  </Operations>
</CalculatorRequest>
For each Operation element contained in the inbound XML document, the application has to read the child Operator element and invoke the corresponding method (Add, Subtract, Multiply, or Divide) exposed by the downstream WCF service. The service is a variation of the CalculatorService that can be found among the WCF samples on MSDN. The return values of the individual calls to the CalculatorService must be aggregated into a single response message, like the one shown below, which is finally returned to the original caller.
CalculatorResponse Message
<CalculatorResponse xmlns="http://Microsoft.BizTalk.CAT.Samples.CalculatorResponse">
  <Status>Ok</Status>
  <Results>
    <Result>
      <Value>100</Value>
      <Error>None</Error>
    </Result>
    <Result>
      <Value>18</Value>
      <Error>None</Error>
    </Result>
    <Result>
      <Value>200</Value>
      <Error>None</Error>
    </Result>
    <Result>
      <Value>4</Value>
      <Error>None</Error>
    </Result>
    <Result>
      <Value>132</Value>
      <Error>None</Error>
    </Result>
  </Results>
</CalculatorResponse>
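The request-to-response mapping described above can be sketched in a few lines of plain C#. This is only an illustration under stated assumptions: the `ScatterGatherSketch` class and its `Process` method are hypothetical names that are not part of the sample, the four delegates stand in for the calls to the downstream CalculatorService, and the sketch stops at the first unknown operator.

```csharp
using System;
using System.Collections.Generic;
using System.Xml.Linq;

// Hypothetical helper (not part of the original sample): in-process delegates
// stand in for the Add/Subtract/Multiply/Divide calls to the WCF service.
public static class ScatterGatherSketch
{
    static readonly XNamespace RequestNs = "http://Microsoft.BizTalk.CAT.Samples.CalculatorRequest";
    static readonly XNamespace ResponseNs = "http://Microsoft.BizTalk.CAT.Samples.CalculatorResponse";

    // Maps each Operator value to the service method it represents.
    static readonly Dictionary<string, Func<double, double, double>> Methods =
        new Dictionary<string, Func<double, double, double>>
        {
            { "+", (a, b) => a + b },   // Add
            { "-", (a, b) => a - b },   // Subtract
            { "*", (a, b) => a * b },   // Multiply
            { "/", (a, b) => a / b }    // Divide
        };

    public static XElement Process(XElement request)
    {
        var results = new List<XElement>();
        string status = "Ok";
        int index = 0;

        // Scatter: one call per Operation element.
        foreach (XElement operation in request.Descendants(RequestNs + "Operation"))
        {
            index++;
            string op = (string)operation.Element(RequestNs + "Operator");
            double operand1 = (double)operation.Element(RequestNs + "Operand1");
            double operand2 = (double)operation.Element(RequestNs + "Operand2");

            double value = 0;
            string error = "None";
            Func<double, double, double> method;
            if (op != null && Methods.TryGetValue(op, out method))
            {
                value = method(operand1, operand2);
            }
            else
            {
                error = string.Format("The operation failed because the operator {0} is unknown.", op);
                status = string.Format("The operation number {0} returned an error. {1}", index, error);
            }

            results.Add(new XElement(ResponseNs + "Result",
                new XElement(ResponseNs + "Value", value),
                new XElement(ResponseNs + "Error", error)));

            if (error != "None")
            {
                break; // stop at the first failed operation
            }
        }

        // Gather: aggregate the individual results into a single response.
        return new XElement(ResponseNs + "CalculatorResponse",
            new XElement(ResponseNs + "Status", status),
            new XElement(ResponseNs + "Results", results));
    }
}
```

Calling `ScatterGatherSketch.Process(XElement.Parse(requestXml))` on a request like the one shown earlier would yield a CalculatorResponse element with one Result per Operation. The three solutions discussed in this post differ in where this loop runs and in how each individual call reaches the service, not in the mapping itself.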
The easiest way to solve the above problem is to create a BizTalk application that uses an orchestration to implement the Scatter-Gather design pattern. The orchestration in question should perform the following steps:
The following diagram depicts the architecture of the solution described above. In the remainder of this post, I’ll refer to this solution as LogicalPortsOrchestration.
Message Flow:
The solution described above works fine but, as you can easily see, it requires 4 distinct roundtrips to the MessageBox for each individual call to the underlying WCF service. This architectural approach is therefore not adequate when the performance goals in terms of latency and throughput are very aggressive. More generally, whenever possible you should minimize the use of orchestrations and favor messaging-only patterns to increase the overall throughput and reduce the latency of your business processes. However, Scatter-Gather and Service Composition are 2 scenarios that are easily implemented using an orchestration, so the question is: how can I solve the same problem using a different pattern?
An alternative solution is to use the Inline Send design pattern: in this case, to invoke the underlying CalculatorService, our orchestration would not use a Logical Port bound to a Physical WCF Send Port, but rather a custom WCF proxy component within an Expression shape that replaces the Send and Receive shapes. Note that without Adapters and Physical Send Ports, the application would not benefit from their functional capabilities such as batching, retries, correlation set initialization, declarative configuration and secondary transports. The advantage of the Inline Send technique is that, for each Solicit-Response call made by the orchestration, it eliminates 4 roundtrips to the MessageBox:
The following diagram depicts the architecture of the solution based on the Inline Send pattern. In the remainder of this post, I’ll refer to this solution as InlineSendOrchestration.
In February 2009, the BizTalk Customer Advisory Team conducted a performance lab to determine and compare the latency and throughput of the LogicalPortsOrchestration and InlineSendOrchestration patterns. For this purpose, we used a lab rig composed of 2 BizTalk Server 2009 (Beta version) nodes (2 Dual-Core CPUs, 8 GB RAM) and a SQL Server 2008 machine hosting the MessageBox. The following table reports the results obtained using a request message that contained 5 distinct Operations:
As you can see from the numbers reported in the table above, the InlineSendOrchestration implementation provides better performance than the LogicalPortsOrchestration solution, both in terms of latency and throughput. Therefore, a question arises naturally: should I give up the capabilities provided by Adapters and apply the Inline Send pattern to debatch an incoming message and implement the Scatter-Gather pattern in a performant way? Fortunately, the answer is no, as there’s another way to solve the problem: a messaging-only approach that does not require any orchestration.
The following picture depicts the architecture of the third solution, which I’ll refer to as MessagingOnly. Unfortunately, this solution was developed after the performance lab, so no comparative tests were conducted to compare its latency and throughput with the 2 preceding implementations. Nevertheless, at the end of this article I provide a link to the code of the 3 implementations, so you can run your own tests and customize the solution to fit your needs.
As I noted in my previous article, WCF Bindings provide a mechanism for configuring channel stacks. In other words, a binding defines a precise recipe for building a channel stack using a transport channel, a message encoder, and a set of protocol channels. WCF ships with several built-in bindings that target common communication scenarios, such as the BasicHttpBinding, WSHttpBinding and NetTcpBinding, and BizTalk Server provides a full range of WCF Adapters that correspond 1:1 to the most commonly used WCF bindings:
However, if you need maximum flexibility and you need to use one or more custom protocol channels at runtime, you can use the CustomBinding, which lets you control which binding elements compose your binding. Likewise, in a BizTalk application, if you want to extend the default behavior of the WCF Adapters with a custom component (Service Behavior, Endpoint Behavior, Custom Binding, Custom Binding Element), you have to use the WCF-Custom and/or WCF-CustomIsolated Adapters. In fact, these two adapters offer you complete control over the channel stack and the configuration of behaviors and, as a consequence, they are the only WCF Adapters you really need. Compared with the other WCF Adapters, they are the only ones that make it possible to:
Before looking at the code of the custom channel, let’s complicate the scenario as follows: let’s assume that the underlying WCF Calculator service invoked by the BizTalk application is hosted by IIS 7.0 and exposes an endpoint that uses a CustomBinding composed of the following binding elements, in this order: transactionFlow (configured for the WSAtomicTransactionOctober2004 protocol), textMessageEncoding, and httpTransport.
The following listing shows the web.config used by the Calculator service:
Web.config
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
  <connectionStrings>
    <add name="LogDB"
         providerName="System.Data.SqlClient"
         connectionString="Data Source=.;Initial Catalog=LogDB;Integrated Security=SSPI;" />
  </connectionStrings>
  <system.diagnostics>
    <sources>
      <source name="System.ServiceModel.MessageLogging" switchValue="Warning, ActivityTracing">
        <listeners>
          <add type="System.Diagnostics.DefaultTraceListener" name="Default">
            <filter type="" />
          </add>
          <add name="ServiceModelMessageLoggingListener">
            <filter type="" />
          </add>
        </listeners>
      </source>
    </sources>
    <sharedListeners>
      <add initializeData="C:\calculatorservicetxn.svclog"
           type="System.Diagnostics.XmlWriterTraceListener, System, Version=2.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
           name="ServiceModelMessageLoggingListener"
           traceOutputOptions="Timestamp">
        <filter type="" />
      </add>
    </sharedListeners>
  </system.diagnostics>
  <system.serviceModel>
    <diagnostics performanceCounters="All">
      <messageLogging logEntireMessage="false"
                      logMalformedMessages="false"
                      logMessagesAtServiceLevel="false"
                      logMessagesAtTransportLevel="false"
                      maxSizeOfMessageToLog="1000000" />
    </diagnostics>
    <!-- Service Endpoints -->
    <services>
      <service behaviorConfiguration="CalculatorServiceBehavior"
               name="Microsoft.ServiceModel.Samples.CalculatorService">
        <endpoint address=""
                  binding="customBinding"
                  bindingConfiguration="customBinding"
                  name="customBinding"
                  contract="Microsoft.ServiceModel.Samples.ICalculator" />
      </service>
    </services>
    <!-- Binding Configuration -->
    <bindings>
      <customBinding>
        <binding name="customBinding">
          <transactionFlow transactionProtocol="WSAtomicTransactionOctober2004" />
          <textMessageEncoding />
          <httpTransport />
        </binding>
      </customBinding>
    </bindings>
    <!-- Service Behaviors -->
    <behaviors>
      <serviceBehaviors>
        <behavior name="CalculatorServiceBehavior">
          <serviceMetadata httpGetEnabled="true" />
          <serviceDebug includeExceptionDetailInFaults="true" />
          <serviceThrottling maxConcurrentCalls="200"
                             maxConcurrentSessions="200"
                             maxConcurrentInstances="200" />
        </behavior>
      </serviceBehaviors>
    </behaviors>
  </system.serviceModel>
  <system.web>
    <customErrors mode="RemoteOnly"/>
    <trust level="Full" />
  </system.web>
</configuration>
The Calculator service requires the client application to create and flow a transaction using the WS-AtomicTransaction protocol. In particular, the WCF service uses this client-initiated transaction to log the data of the invoked operation to a SQL Server database. To implement this behavior, all the service operations exposed by the ICalculator contract interface have been decorated with the TransactionFlowAttribute. The TransactionFlowOption.Mandatory value is passed to the attribute constructor to indicate that the client application must initiate and transmit a transaction; otherwise, the call fails immediately with an exception.
In the CalculatorService class, all the service operations have been decorated with the OperationBehaviorAttribute, with both the TransactionScopeRequired and TransactionAutoComplete properties set to true. This indicates that the service operations require a transaction scope for their execution and that the transaction automatically completes if the method returns successfully without unhandled exceptions. All the service operations invoke the RecordToLog method, which logs the current operation data and the transaction id to the LogDB database on SQL Server. This sample on MSDN demonstrates various aspects of creating a transactional service and the use of a client-initiated transaction to coordinate service operations.
Note in particular that the RecordToLog method reads the Transaction.Current.TransactionInformation.DistributedIdentifier property to retrieve the unique identifier of the client-initiated distributed transaction.
CalculatorService Class
#region Using Directives
using System;
using System.Diagnostics;
using System.ServiceModel;
using System.Transactions;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;
#endregion

namespace Microsoft.ServiceModel.Samples
{
    // Define a service contract.
    [ServiceContract(Namespace = "http://Microsoft.ServiceModel.Samples")]
    public interface ICalculator
    {
        #region Contract Operations
        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Add(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Subtract(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Multiply(double n1, double n2);

        [OperationContract]
        [TransactionFlow(TransactionFlowOption.Mandatory)]
        double Divide(double n1, double n2);
        #endregion
    }

    // Service class which implements the service contract.
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
                     ConcurrencyMode = ConcurrencyMode.Single)]
    public class CalculatorService : ICalculator
    {
        #region Private Constants
        private const string AttemptedToDivideByZero = "Attempted to divide by zero.";
        private const string TransactionIdFormat = "[CalculatorService] TransactionId: {0}";
        private const string CallCompletedFormat = "[CalculatorService] Message [{0}] stored to LogDB.";
        private const string MessageFormat = "[CalculatorService] {0}";
        private const string InsertLogEntrySP = "usp_InsertLogEntry";
        private const string IdParameter = "@Id";
        private const string MessageParameter = "@Message";
        private const string TypeParameter = "@Type";
        private const string SourceParameter = "@Source";
        private const string TransactionIdParameter = "@TransactionID";
        private const string Information = "Information";
        private const string Source = "CalculatorService";
        private const string None = "None";
        private const string AddFormat = "{0} + {1} = {2}";
        private const string SubtractFormat = "{0} - {1} = {2}";
        private const string MultiplyFormat = "{0} * {1} = {2}";
        private const string DivideFormat = "{0} / {1} = {2}";
        #endregion

        #region Private Static Fields
        private static string connectionString = null;
        #endregion

        #region Static Constructor
        static CalculatorService()
        {
            try
            {
                if (ConfigurationManager.ConnectionStrings != null &&
                    ConfigurationManager.ConnectionStrings.Count > 0)
                {
                    connectionString = ConfigurationManager.ConnectionStrings["LogDB"].ConnectionString;
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format(MessageFormat, ex.Message));
            }
        }
        #endregion

        #region Public Methods
        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Add(double n1, double n2)
        {
            double n3 = n1 + n2;
            RecordToLog(String.Format(AddFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Subtract(double n1, double n2)
        {
            double n3 = n1 - n2;
            RecordToLog(String.Format(SubtractFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Multiply(double n1, double n2)
        {
            double n3 = n1 * n2;
            RecordToLog(String.Format(MultiplyFormat, n1, n2, n3));
            return n3;
        }

        [OperationBehavior(TransactionScopeRequired = true, TransactionAutoComplete = true)]
        public double Divide(double n1, double n2)
        {
            if (n2 == 0)
            {
                Trace.WriteLine(AttemptedToDivideByZero);
                throw new DivideByZeroException();
            }
            double n3 = n1 / n2;
            RecordToLog(String.Format(DivideFormat, n1, n2, n3));
            return n3;
        }
        #endregion

        #region Private Static Methods
        // Writes the operation and the id of the flowed transaction to the LogDB database.
        private static void RecordToLog(string message)
        {
            try
            {
                if (!string.IsNullOrEmpty(connectionString))
                {
                    string transactionId = None;
                    if (Transaction.Current != null &&
                        Transaction.Current.TransactionInformation != null)
                    {
                        transactionId = Transaction.Current.TransactionInformation.DistributedIdentifier.ToString();
                    }
                    Trace.WriteLine(string.Format(TransactionIdFormat, transactionId));
                    using (SqlConnection sqlConnection = new SqlConnection(connectionString))
                    {
                        sqlConnection.Open();
                        SqlParameter sqlParameter = null;
                        using (SqlCommand sqlCommand = new SqlCommand(InsertLogEntrySP, sqlConnection))
                        {
                            sqlParameter = new SqlParameter(IdParameter, SqlDbType.UniqueIdentifier);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Guid.NewGuid();
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(MessageParameter, SqlDbType.VarChar, 512);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = message.Length <= 512 ? message : message.Substring(0, 512);
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(TypeParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Information;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(SourceParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = Source;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlParameter = new SqlParameter(TransactionIdParameter, SqlDbType.VarChar, 64);
                            sqlParameter.Direction = ParameterDirection.Input;
                            sqlParameter.Value = transactionId;
                            sqlCommand.Parameters.Add(sqlParameter);

                            sqlCommand.CommandType = CommandType.StoredProcedure;
                            sqlCommand.ExecuteNonQuery();
                            Trace.WriteLine(string.Format(CallCompletedFormat, message));
                        }
                    }
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format(MessageFormat, ex.Message));
            }
        }
        #endregion
    }
}
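To make the client side of this contract more concrete, the following minimal sketch shows how calling code can create the ambient transaction that a WCF proxy would then flow to an operation marked with TransactionFlowOption.Mandatory. It is an assumption-laden illustration, not code from the sample: the `AmbientTransactionSketch` class is a hypothetical name, no service call is made, and in the BizTalk scenario the transaction is created by the WCF Adapter rather than by your own code.

```csharp
using System;
using System.Transactions;

// Hypothetical helper (not part of the original sample).
public static class AmbientTransactionSketch
{
    // Opens a transaction scope and returns the identifiers that code running
    // inside the scope can read from the ambient transaction.
    public static Tuple<string, Guid> Describe()
    {
        using (var scope = new TransactionScope(TransactionScopeOption.Required))
        {
            // Inside the scope, Transaction.Current is the ambient transaction.
            // A WCF client call made here over a binding with transaction flow
            // enabled would carry this transaction to the service.
            TransactionInformation info = Transaction.Current.TransactionInformation;
            var ids = Tuple.Create(info.LocalIdentifier, info.DistributedIdentifier);

            // Vote to commit; without this call the transaction rolls back on Dispose.
            scope.Complete();
            return ids;
        }
    }
}
```

In a purely local run like this one, I would expect the distributed identifier to still be `Guid.Empty`, because the transaction has not been promoted to a distributed one. It is only once the transaction is flowed to the service and escalated that RecordToLog can log a meaningful DistributedIdentifier.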
Now let’s assume that all the operations contained in a CalculatorRequest document must be processed as a unit of work within the same distributed transaction. In this case, the BizTalk application will have to perform the following steps:
The following diagram depicts the architecture of the described solution.
In the following picture, the left-hand side shows the custom binding used by the WCF-Custom Send Port, while the right-hand side describes in more detail the individual steps performed by the WCF-Custom Send Port.
Let’s look at the code. To create a custom protocol channel that debatches the request message at runtime within the WCF Send Port, I built a Class Library called WCFExtensionLibrary composed of multiple components. As we noted in the first part of this article, on the sending side a binding is used to build an IChannelFactory, which in turn builds a channel stack and returns a reference to the top channel in the stack. The application can then use this channel to send messages. A binding consists of an ordered set of binding elements that inherit from the BindingElement class. At runtime, each binding element creates an IChannelFactory, which in turn creates a channel that processes the message at a given point of the channel stack. BizTalk developers can think of the protocol channels that compose a channel stack as the pipeline components within a pipeline on a Receive Location or Send Port, and of the transport channel as the transport Adapter used to receive or send a message. The main difference between the 2 approaches is that a custom pipeline component requires a single class, while a custom channel requires multiple classes. Let’s look in detail at the components that I created to register, configure, and execute my custom channel within the WCF-Custom Send Port.
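The layering described above can be illustrated without any WCF types. The sketch below is a deliberately simplified model: `ISimpleRequestChannel`, `EchoTransportChannel`, `DebatchingChannel` and `SimpleChannelStack` are hypothetical names invented for this illustration, and messages are plain strings with items separated by semicolons. It only shows the shape of the idea: a protocol channel wraps an inner channel, calls it once per item, and merges the replies.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-in for a request channel.
public interface ISimpleRequestChannel
{
    string Request(string message);
}

// Plays the role of the transport channel at the bottom of the stack.
public sealed class EchoTransportChannel : ISimpleRequestChannel
{
    public string Request(string message)
    {
        return "reply(" + message + ")";
    }
}

// Plays the role of a custom protocol channel: it splits the batch, sends one
// request per item through the inner channel and aggregates the replies.
public sealed class DebatchingChannel : ISimpleRequestChannel
{
    private readonly ISimpleRequestChannel inner;

    public DebatchingChannel(ISimpleRequestChannel inner)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        this.inner = inner;
    }

    public string Request(string message)
    {
        var replies = new List<string>();
        foreach (string item in message.Split(';'))
        {
            replies.Add(inner.Request(item));
        }
        return string.Join(";", replies);
    }
}

// Plays the role of the channel factory: it builds the stack from the bottom
// up and returns the top channel to the caller.
public static class SimpleChannelStack
{
    public static ISimpleRequestChannel Build()
    {
        return new DebatchingChannel(new EchoTransportChannel());
    }
}
```

With this model, `SimpleChannelStack.Build().Request("a;b")` returns `reply(a);reply(b)`. The real components differ in that WCF splits these responsibilities across a binding element, a channel factory, and the channel itself, which is why several classes are needed.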
The CalculatorServiceBindingElement class inherits from the BindingElement class. This custom binding element exposes the following properties: TraceEnabled, MaxBufferSize, and MessageVersion.
The BuildChannelFactory method initializes the CalculatorServiceChannelFactory that is responsible for creating an instance of the CalculatorServiceRequestChannel at runtime.
CalculatorServiceBindingElement Class
#region Using Directives
using System;
using System.ServiceModel.Channels;
using System.ServiceModel;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    public class CalculatorServiceBindingElement : BindingElement
    {
        #region Private Fields
        private bool traceEnabled = true;
        private int maxBufferSize = 2097152;
        private MessageVersionEnum messageVersion = MessageVersionEnum.Default;
        #endregion

        #region Public Constructors
        public CalculatorServiceBindingElement()
        {
        }

        // Copy constructor used by Clone.
        protected CalculatorServiceBindingElement(CalculatorServiceBindingElement other)
            : base(other)
        {
            this.traceEnabled = other.traceEnabled;
            this.maxBufferSize = other.maxBufferSize;
            this.messageVersion = other.messageVersion;
        }
        #endregion

        #region Public Methods
        public override BindingElement Clone()
        {
            return new CalculatorServiceBindingElement(this);
        }

        public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
        {
            return context.CanBuildInnerChannelFactory<TChannel>();
        }

        public override bool CanBuildChannelListener<TChannel>(BindingContext context)
        {
            return context.CanBuildInnerChannelListener<TChannel>();
        }

        public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
        {
            return new CalculatorServiceChannelFactory<TChannel>(context, traceEnabled, maxBufferSize, messageVersion);
        }

        public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
        {
            return new CalculatorServiceChannelListener<TChannel>(context, traceEnabled, maxBufferSize, messageVersion);
        }

        public override T GetProperty<T>(BindingContext context)
        {
            return context.GetInnerProperty<T>();
        }
        #endregion

        #region Public Properties
        public bool TraceEnabled
        {
            get { return this.traceEnabled; }
            set { this.traceEnabled = value; }
        }

        public int MaxBufferSize
        {
            get { return this.maxBufferSize; }
            set { this.maxBufferSize = value; }
        }

        public MessageVersionEnum MessageVersion
        {
            get { return this.messageVersion; }
            set { this.messageVersion = value; }
        }
        #endregion
    }
}
The CalculatorServiceBindingExtensionElement class inherits from the BindingElementExtensionElement class. In general, a BindingElementExtensionElement enables the use of a custom BindingElement implementation from a machine or application configuration file. In our case, the CalculatorServiceBindingExtensionElement has been created to achieve two results:
The CalculatorServiceBindingExtensionElement class exposes the same properties as the CalculatorServiceBindingElement. The CreateBindingElement method creates an instance of the CalculatorServiceBindingElement class and assigns values to its properties. In practice, the CalculatorServiceBindingExtensionElement, CalculatorServiceBindingElement, CalculatorServiceChannelFactory, and CalculatorServiceRequestChannel classes all expose the same properties.
As we’ll see later on, these properties can be set when defining a WCF-Custom/WCF-CustomIsolated Receive Location or a WCF-Custom Send Port, and at runtime they are propagated from the custom BindingElementExtensionElement to the custom channel as follows:
CalculatorServiceBindingExtensionElement -> CalculatorServiceBindingElement -> CalculatorServiceChannelFactory -> CalculatorServiceRequestChannel
CalculatorServiceBindingExtensionElement Class
#region Using Directives
using System;
using System.Configuration;
using System.ServiceModel.Channels;
using System.ServiceModel.Configuration;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    public enum MessageVersionEnum
    {
        Default,
        None,
        Soap11,
        Soap11WSAddressing10,
        Soap11WSAddressingAugust2004,
        Soap12,
        Soap12WSAddressing10,
        Soap12WSAddressingAugust2004
    }

    public class CalculatorServiceBindingExtensionElement : BindingElementExtensionElement
    {
        #region Private Constants
        private const string TraceEnabledProperty = "TraceEnabled";
        private const string MaxBufferSizeProperty = "MaxBufferSize";
        private const string MessageVersionProperty = "MessageVersion";
        #endregion

        #region Private Fields
        private CalculatorServiceBindingElement calculatorServiceBindingElement;
        #endregion

        #region Public Constructors
        public CalculatorServiceBindingExtensionElement()
        {
        }
        #endregion

        #region Public Properties
        public override Type BindingElementType
        {
            get { return typeof(CalculatorServiceBindingElement); }
        }
        #endregion

        #region Public Methods
        public override void ApplyConfiguration(BindingElement bindingElement)
        {
            base.ApplyConfiguration(bindingElement);
            calculatorServiceBindingElement = (CalculatorServiceBindingElement)bindingElement;
        }
        #endregion

        #region Protected Methods
        // Creates the binding element and copies the configured values onto it.
        protected override BindingElement CreateBindingElement()
        {
            calculatorServiceBindingElement = new CalculatorServiceBindingElement();
            calculatorServiceBindingElement.TraceEnabled = this.TraceEnabled;
            calculatorServiceBindingElement.MaxBufferSize = this.MaxBufferSize;
            calculatorServiceBindingElement.MessageVersion = this.MessageVersion;
            this.ApplyConfiguration(calculatorServiceBindingElement);
            return calculatorServiceBindingElement;
        }
        #endregion

        #region Configuration Properties
        [ConfigurationProperty(TraceEnabledProperty, DefaultValue = true)]
        public bool TraceEnabled
        {
            get { return (bool)base[TraceEnabledProperty]; }
            set { base[TraceEnabledProperty] = value; }
        }

        [ConfigurationProperty(MaxBufferSizeProperty, DefaultValue = 2097152)]
        [IntegerValidator(MinValue = 0)]
        public int MaxBufferSize
        {
            get { return (int)base[MaxBufferSizeProperty]; }
            set { base[MaxBufferSizeProperty] = value; }
        }

        [ConfigurationProperty(MessageVersionProperty, DefaultValue = MessageVersionEnum.Default)]
        public MessageVersionEnum MessageVersion
        {
            get { return (MessageVersionEnum)base[MessageVersionProperty]; }
            set { base[MessageVersionProperty] = value; }
        }
        #endregion
    }
}
The CalculatorServiceChannelFactory inherits from ChannelFactoryBase which, as the name suggests, provides a common base implementation for client-side channel factories that create channels of a specified type connected to a specified address. At runtime, the OnCreateChannel method creates and returns a new instance of the CalculatorServiceRequestChannel class. Before doing that, the method uses the reference to the inner channel factory to create an instance of the inner channel, which in our case is the HTTP transport request channel, and then passes this reference to the constructor of the CalculatorServiceRequestChannel. The custom channel uses the reference to the HTTP transport channel to send multiple messages to the CalculatorService, one for each Operation element contained in the inbound request message.
The CalculatorServiceRequestChannel is the class responsible for processing the CalculatorRequest message at runtime. The Request method uses an XmlDictionaryReader object to read through the inbound XML message and extract the Operation elements it contains; for each of them, it creates and submits a separate message to the underlying CalculatorService. The CreateOperationMessage method copies the headers of the CalculatorRequest message to each individual message sent to the WCF service. One of these headers is the CoordinationContext used to coordinate the distributed transaction. To invoke the WCF service, the custom channel invokes the Request method exposed by the inner channel. The responses of the individual calls are stored in a collection called responseList. Finally, the Request method invokes the CreateResponseMessage method to create and return the response message that aggregates the results of the individual calls.
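The forward-only reading technique used by the Request method can be tried in isolation with a plain XmlReader. The sketch below is an assumption-based illustration, not the channel code: `OperationReaderSketch` and its `Operation` struct are hypothetical names, the input is a string rather than a WCF Message, and the reader is configured to ignore whitespace so that each typed read starts on an element even when the document is indented.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Xml;

// Hypothetical helper (not part of the original sample).
public static class OperationReaderSketch
{
    public struct Operation
    {
        public string Operator;
        public double Operand1;
        public double Operand2;
    }

    public static List<Operation> ReadOperations(string xml)
    {
        var operations = new List<Operation>();

        // Assumption: skipping insignificant whitespace keeps the reader positioned
        // on an element after each ReadElementContentAs* call.
        var settings = new XmlReaderSettings { IgnoreWhitespace = true };

        using (XmlReader reader = XmlReader.Create(new StringReader(xml), settings))
        {
            // Move forward to each Operator element, then read its two siblings.
            while (reader.ReadToFollowing("Operator"))
            {
                Operation operation;
                operation.Operator = reader.ReadElementContentAsString();
                operation.Operand1 = reader.ReadElementContentAsDouble();
                operation.Operand2 = reader.ReadElementContentAsDouble();
                operations.Add(operation);
            }
        }
        return operations;
    }
}
```

Because the reader never builds a tree, memory use stays flat regardless of how many Operation elements the request contains, which is presumably why a streaming reader suits a channel that sits in the send path. The sketch relies on the Operator, Operand1, Operand2 ordering shown in the sample request.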
CalculatorServiceChannelFactory and CalculatorServiceRequestChannel Classes
#region Using Directives
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.ServiceModel;
using System.ServiceModel.Channels;
using System.Transactions;
using System.Xml;
using System.Text;
#endregion

namespace Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
{
    /// <summary>
    /// ChannelFactory that performs message inspection
    /// </summary>
    class CalculatorServiceChannelFactory<TChannel> : ChannelFactoryBase<TChannel>
    {
        #region Private Constants
        private const string ErrorMessageFormat = "<CalculatorResponse xmlns=\"http://Microsoft.BizTalk.CAT.Samples.Schemas.CalculatorResponse\"><Status>{0}</Status></CalculatorResponse>";
        private const string CalculatorResponseNamespace = "http://Microsoft.BizTalk.CAT.Samples.Schemas.CalculatorResponse";
        private const string GenericErrorMessage = "An error occurred while processing the request.";
        private const string OperationUnknownErrorMessageFormat = "The operation failed because the operator {0} is unknown.";
        private const string OperationFailedErrorMessageFormat = "The operation number {0} returned an error. {1}";
        private const string OperationFormat = "[CalculatorServiceRequestChannel] {0} {1} {2} = {3}";
        private const string OperationFailed = "[CalculatorServiceRequestChannel] {0} {1} {2} Failed: {3}";
        private const string MessageReceived = "[CalculatorServiceRequestChannel] Request message received.";
        private const string MessageSuccessfullyProcessed = "[CalculatorServiceRequestChannel] Response message successfully processed.";
        private const string TransactionIdFormat = "[CalculatorServiceRequestChannel] TransactionId: {0}";
        #endregion

        #region Private Fields
        IChannelFactory<TChannel> innerChannelFactory;
        private bool traceEnabled = true;
        private int maxBufferSize = 2097152;
        private MessageVersionEnum messageVersion = MessageVersionEnum.Default;
        #endregion

        #region Public Constructors
        public CalculatorServiceChannelFactory(BindingContext context,
                                               bool traceEnabled,
                                               int maxBufferSize,
                                               MessageVersionEnum messageVersion)
        {
            this.innerChannelFactory = context.BuildInnerChannelFactory<TChannel>();
            this.traceEnabled = traceEnabled;
            this.maxBufferSize = maxBufferSize;
            this.messageVersion = messageVersion;
            if (this.innerChannelFactory == null)
            {
                throw new InvalidOperationException("CalculatorServiceChannelFactory requires an inner IChannelFactory.");
            }
        }
        #endregion

        #region Public Methods
        public override T GetProperty<T>()
        {
            T baseProperty = base.GetProperty<T>();
            if (baseProperty != null)
            {
                return baseProperty;
            }
            return this.innerChannelFactory.GetProperty<T>();
        }
        #endregion

        #region Protected Methods
        protected override void OnOpen(TimeSpan timeout)
        {
            this.innerChannelFactory.Open(timeout);
        }

        protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelFactory.BeginOpen(timeout, callback, state);
        }

        protected override void OnEndOpen(IAsyncResult result)
        {
            this.innerChannelFactory.EndOpen(result);
        }

        protected override void OnClose(TimeSpan timeout)
        {
            this.innerChannelFactory.Close(timeout);
        }

        protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
        {
            return this.innerChannelFactory.BeginClose(timeout, callback, state);
        }

        protected override void OnEndClose(IAsyncResult result)
        {
            this.innerChannelFactory.EndClose(result);
        }

        // Wraps the inner (transport) channel in the custom debatching channel.
        protected override TChannel OnCreateChannel(EndpointAddress to, Uri via)
        {
            TChannel innerChannel = this.innerChannelFactory.CreateChannel(to, via);
            if (typeof(TChannel) == typeof(IRequestChannel))
            {
                return (TChannel)(object)new CalculatorServiceRequestChannel(this, (IRequestChannel)innerChannel, traceEnabled, maxBufferSize, messageVersion);
            }
            throw new InvalidOperationException();
        }
        #endregion

        class CalculatorServiceRequestChannel : CalculatorServiceChannelBase<IRequestChannel>, IRequestChannel
        {
            #region Private Fields
            private bool traceEnabled = true;
            private int maxBufferSize = 2097152;
            private MessageVersion messageVersion = MessageVersion.Default;
            #endregion

            #region Public Constructors
            public CalculatorServiceRequestChannel(CalculatorServiceChannelFactory<TChannel> factory,
                                                   IRequestChannel innerChannel,
                                                   bool traceEnabled,
                                                   int maxBufferSize,
                                                   MessageVersionEnum messageVersion)
                : base(factory, innerChannel)
            {
                this.traceEnabled = traceEnabled;
                this.maxBufferSize = maxBufferSize;
                // Map the configuration enum to the corresponding WCF MessageVersion.
                switch (messageVersion)
                {
                    case MessageVersionEnum.Default:
                        this.messageVersion = MessageVersion.Default;
                        break;
                    case MessageVersionEnum.None:
                        this.messageVersion = MessageVersion.None;
                        break;
                    case MessageVersionEnum.Soap11:
                        this.messageVersion = MessageVersion.Soap11;
                        break;
                    case MessageVersionEnum.Soap11WSAddressing10:
                        this.messageVersion = MessageVersion.Soap11WSAddressing10;
                        break;
                    case MessageVersionEnum.Soap11WSAddressingAugust2004:
                        this.messageVersion = MessageVersion.Soap11WSAddressingAugust2004;
                        break;
                    case MessageVersionEnum.Soap12:
                        this.messageVersion = MessageVersion.Soap12;
                        break;
                    case MessageVersionEnum.Soap12WSAddressing10:
                        this.messageVersion = MessageVersion.Soap12WSAddressing10;
                        break;
                    case MessageVersionEnum.Soap12WSAddressingAugust2004:
                        this.messageVersion = MessageVersion.Soap12WSAddressingAugust2004;
                        break;
                    default:
                        this.messageVersion = MessageVersion.Default;
                        break;
                }
            }
            #endregion

            #region Public Properties
            public EndpointAddress RemoteAddress
            {
                get { return this.InnerChannel.RemoteAddress; }
            }

            public Uri Via
            {
                get { return this.InnerChannel.Via; }
            }
            #endregion

            #region Public Methods
            public IAsyncResult BeginRequest(Message message, AsyncCallback callback, object state)
            {
                return this.BeginRequest(message, this.DefaultSendTimeout, callback, state);
            }

            public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
            {
                // The request is processed synchronously and wrapped in an already completed async result.
                Message reply = this.Request(message, timeout);
                return new TypedCompletedAsyncResult<Message>(reply, callback, state);
            }

            public Message EndRequest(IAsyncResult result)
            {
                TypedCompletedAsyncResult<Message> reply = (TypedCompletedAsyncResult<Message>)result;
                return reply.Data;
            }

            public Message Request(Message message)
            {
                return this.Request(message, this.DefaultSendTimeout);
            }

            public Message Request(Message message, TimeSpan timeout)
            {
                Message reply = null;
                if (message != null)
                {
                    Message operationMessage;
                    List<Response> responseList = new List<Response>();
                    string op = null;
                    StringBuilder statusBuilder = new StringBuilder();
                    string error = null;
                    double operand1 = 0;
                    double operand2 = 0;
                    bool ok = true;
                    int i = 0;
                    try
                    {
                        Trace.WriteLineIf(traceEnabled, MessageReceived);
                        if (traceEnabled)
                        {
                            string transactionId = "None";
                            if (Transaction.Current != null &&
                                Transaction.Current.TransactionInformation != null)
                            {
                                transactionId = Transaction.Current.TransactionInformation.DistributedIdentifier.ToString();
                            }
                            Trace.WriteLine(string.Format(TransactionIdFormat, transactionId));
                        }
                        using (XmlDictionaryReader reader = message.GetReaderAtBodyContents())
                        {
                            double result;
                            while (reader.Read() && ok)
                            {
                                if (reader.NodeType == XmlNodeType.Element &&
                                    reader.LocalName == "Operator")
                                {
                                    // Read the Operator, Operand1 and Operand2 elements of the current Operation.
                                    error = "None";
                                    op = reader.ReadElementContentAsString();
                                    operand1 = reader.ReadElementContentAsDouble();
                                    operand2 = reader.ReadElementContentAsDouble();
                                    reply = null;
                                    i++;
                                    try
                                    {
                                        // Send one message per Operation through the inner channel.
                                        switch (op)
                                        {
                                            case "+":
                                                operationMessage = CreateOperationMessage("Add", operand1, operand2, message);
                                                reply = this.InnerChannel.Request(operationMessage);
                                                break;
                                            case "-":
                                                operationMessage = CreateOperationMessage("Subtract", operand1, operand2, message);
                                                reply = this.InnerChannel.Request(operationMessage);
                                                break;
                                            case "*":
                                                operationMessage = CreateOperationMessage("Multiply", operand1, operand2, message);
                                                reply = this.InnerChannel.Request(operationMessage);
                                                break;
                                            case "/":
                                                operationMessage = CreateOperationMessage("Divide", operand1, operand2, message);
                                                reply = this.InnerChannel.Request(operationMessage);
                                                break;
                                            default:
                                                // The status is recorded once, in the failure branch below.
                                                error = string.Format(OperationUnknownErrorMessageFormat, op);
                                                ok = false;
                                                break;
                                        }
                                    }
                                    catch (Exception innerException)
                                    {
                                        // The status is recorded once, in the failure branch below.
                                        error = innerException.Message;
                                        ok = false;
                                    }
                                    if (reply != null && !reply.IsFault)
                                    {
                                        result = GetValue(reply);
                                        Trace.WriteLineIf(traceEnabled, string.Format(OperationFormat, operand1, op, operand2, result));
                                    }
                                    else
                                    {
                                        if (reply != null && reply.IsFault)
                                        {
                                            MessageFault fault = MessageFault.CreateFault(reply, maxBufferSize);
                                            error = fault.Reason.ToString();
                                        }
                                        Trace.WriteLineIf(traceEnabled, string.Format(OperationFailed, operand1, op, operand2, error));
                                        SetStatus(statusBuilder, string.Format(OperationFailedErrorMessageFormat, i, error));
                                        result = 0;
                                    }
                                    responseList.Add(new Response(error, result));
                                }
                            }
                        }
                        string status = statusBuilder.ToString();
                        if (string.IsNullOrEmpty(status))
                        {
                            status = "Ok";
                        }
                        reply = CreateResponseMessage(status, responseList, null);
                        Trace.WriteLineIf(traceEnabled, MessageSuccessfullyProcessed);
                        return reply;
                    }
                    catch (Exception ex)
                    {
                        // Unexpected failures are only traced; no response message is returned.
                        Trace.WriteLineIf(traceEnabled, ex.Message);
                        SetStatus(statusBuilder, ex.Message);
                        return null;
                    }
                }
                else
                {
                    // The request is null in this branch, so it has no headers to trace;
                    // forward it unchanged and let the inner channel validate it.
                    reply = this.InnerChannel.Request(message);
                    Trace.WriteLineIf(traceEnabled, String.Format("Reply is: {0}", reply));
                    return reply;
                }
            }
            #endregion

            #region Private Methods
            // Appends a message to the status, separating entries with " - ".
            private void SetStatus(StringBuilder statusBuilder, string message)
            {
                if (statusBuilder.Length > 0)
                {
                    statusBuilder.Append(" - ");
                }
                statusBuilder.Append(message);
            }

            // Builds the message for a single operation, copying the headers (including the
            // transaction coordination context) and properties of the original request.
            private Message CreateOperationMessage(string op, double operand1, double operand2, Message message)
            {
                string action = string.Format("http://Microsoft.ServiceModel.Samples/ICalculator/{0}", op);
                Message operationMessage = Message.CreateMessage(messageVersion, action, new OperationBodyWriter(op, operand1, operand2));
                if (message != null)
                {
                    if (message.Headers != null && message.Headers.Count > 0)
                    {
                        operationMessage.Headers.Clear();
                        operationMessage.Headers.CopyHeadersFrom(message);
                        operationMessage.Headers.Action = action;
                    }
                    if (message.Properties != null && message.Properties.Count > 0)
                    {
                        operationMessage.Properties.CopyProperties(message.Properties);
                    }
                }
                if (messageVersion != MessageVersion.Soap11)
                {
                    operationMessage.Headers.MessageId = new UniqueId();
                }
                return operationMessage;
            }

            // Builds the aggregated response message returned to the caller.
            private Message CreateResponseMessage(string status, List<Response> responseList, Message message)
            {
                Message reply = Message.CreateMessage(MessageVersion.Default, "*", new ResponseBodyWriter(status, responseList));
                if (message != null)
                {
                    if (message.Properties != null && message.Properties.Count > 0)
                    {
                        reply.Properties.CopyProperties(message.Properties);
                    }
                }
                reply.Headers.MessageId = new UniqueId();
                return reply;
            }

            private double GetValue(Message message)
            {
                XmlDictionaryReader reader = message.GetReaderAtBodyContents();
                while (reader.Read())
                {
                    if (reader.Name == "AddResult" || reader.Name == "SubtractResult" || reader.Name ==
"MultiplyResult" || reader.Name == "DivideResult") { return reader.ReadElementContentAsDouble(); } } return 0; } #endregion } class OperationBodyWriter : BodyWriter { #region Private Fields private string op; private double operand1; private double operand2; #endregion #region Public Constructors public OperationBodyWriter(string op, double operand1, double operand2) : base(false) { this.op = op; this.operand1 = operand1; this.operand2 = operand2; } #endregion #region Protected Methods protected override void OnWriteBodyContents(XmlDictionaryWriter writer) { writer.WriteStartElement(op, "http://Microsoft.ServiceModel.Samples"); writer.WriteStartElement("n1"); writer.WriteValue(operand1); writer.WriteEndElement(); writer.WriteStartElement("n2"); writer.WriteValue(operand2); writer.WriteEndElement(); writer.WriteEndElement(); } #endregion } class ResponseBodyWriter : BodyWriter { #region Private Fields private string status; private List<Response> responseList; private Dictionary<int, Response> responseDictionary; #endregion #region Public Constructors public ResponseBodyWriter(string status, List<Response> responseList) : base(false) { this.status = status; this.responseList = responseList; } public ResponseBodyWriter(string status, Dictionary<int, Response> responseDictionary) : base(false) { this.status = status; this.responseDictionary = responseDictionary; } #endregion #region Protected Methods protected override void OnWriteBodyContents(XmlDictionaryWriter writer) { if (responseList != null) { writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace); writer.WriteStartElement("Status", CalculatorResponseNamespace); writer.WriteString(status); writer.WriteEndElement(); writer.WriteStartElement("Results", CalculatorResponseNamespace); for (int i = 0; i < responseList.Count; i++) { writer.WriteStartElement("Result", CalculatorResponseNamespace); writer.WriteStartElement("Value", CalculatorResponseNamespace); 
writer.WriteString(responseList[i].Value.ToString()); writer.WriteEndElement(); writer.WriteStartElement("Error", CalculatorResponseNamespace); writer.WriteString(responseList[i].Error); writer.WriteEndElement(); writer.WriteEndElement(); } writer.WriteEndElement(); writer.WriteEndElement(); } if (responseDictionary != null) { writer.WriteStartElement("CalculatorResponse", CalculatorResponseNamespace); writer.WriteStartElement("Status", CalculatorResponseNamespace); writer.WriteString(status); writer.WriteEndElement(); writer.WriteStartElement("Results", CalculatorResponseNamespace); for (int i = 0; i < responseDictionary.Count; i++) { writer.WriteStartElement("Result", CalculatorResponseNamespace); writer.WriteStartElement("Value", CalculatorResponseNamespace); writer.WriteString(responseList[i].Value.ToString()); writer.WriteEndElement(); writer.WriteStartElement("Error", CalculatorResponseNamespace); writer.WriteString(responseDictionary[i].Error); writer.WriteEndElement(); writer.WriteEndElement(); } writer.WriteEndElement(); writer.WriteEndElement(); } } #endregion } public class Response { #region Private Fields private string error; private double value; #endregion #region Public Constructors public Response() { this.error = default(string); this.value = default(double); } public Response(string error, double value) { this.error = error; this.value = value; } #endregion #region Public Properties public string Error { get { return this.error; } set { this.error = value; } } public double Value { get { return this.value; } set { this.value = value; } } #endregion } public class Operation { #region Private Fields private string op; private double operand1; private double operand2; #endregion #region Public Constructors public Operation() { this.op = default(string); this.operand1 = 0; this.operand1 = 0; } public Operation(string op, double operand1, double operand2) { this.op = op; this.operand1 = operand1; this.operand2 = operand2; } #endregion #region Public 
Properties public string Operator { get { return this.op; } set { this.op = value; } } public double Operand1 { get { return this.operand1; } set { this.operand1 = value; } } public double Operand2 { get { return this.operand2; } set { this.operand2 = value; } } #endregion } } }
To register the WCFExtensionLibrary assembly containing my custom channel and use it within my BizTalk application, I have to perform three operations:
This step is pretty straightforward and can be accomplished using the gacutil tool. You can automatically install your assembly in the GAC whenever you build the class library by running gacutil in the post-build event command line of the project, as shown in the following picture:
An easy and handy way to verify that the assembly has been successfully installed in the GAC is to use the following command:
gacutil /lr Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary
To accomplish this task you can proceed as follows:
<add name="calculatorService" type="Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary.CalculatorServiceBindingExtensionElement, Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=d7f63d8d08d8f3a2" />
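For orientation, here is a minimal sketch of where an entry like the one above usually sits in a .NET configuration file. It assumes that CalculatorServiceBindingExtensionElement derives from BindingElementExtensionElement, in which case the entry belongs under bindingElementExtensions; if the class derived from BindingCollectionElement instead, the same entry would go under bindingExtensions. The surrounding elements are standard WCF configuration and only the add element comes from this post.

```xml
<configuration>
  <system.serviceModel>
    <extensions>
      <!-- Assumption: the class is a BindingElementExtensionElement.
           Use <bindingExtensions> instead if it is a BindingCollectionElement. -->
      <bindingElementExtensions>
        <add name="calculatorService"
             type="Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary.CalculatorServiceBindingExtensionElement, Microsoft.BizTalk.CAT.Samples.WCFExtensionLibrary, Version=1.0.0.0, Culture=neutral, PublicKeyToken=d7f63d8d08d8f3a2" />
      </bindingElementExtensions>
    </extensions>
  </system.serviceModel>
</configuration>
```

The value of the name attribute (calculatorService) is the element name under which the extension becomes selectable in a binding configuration.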
Using the BizTalk Server Administration console, you can proceed as follows:
For brevity, I’ll assume you have already deployed and configured all the remaining artifacts that compose the solution (e.g. the WCF Receive Location, the CalculatorService, etc.). To test the application you can proceed as follows:
USE LogDb
GO
SET NOCOUNT ON
GO
DELETE Log
GO
USE LogDb
GO
SET NOCOUNT ON
GO
SELECT * FROM Log ORDER BY DateCreated DESC
GO
To verify that all service calls execute in the context of the same transaction, you can run a negative test. For example, if you submit a message that contains an invalid operation such as 72 / 0, the failing call causes the transaction to abort and, as a result, nothing is written to the Log table.
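A request for such a negative test could look like the sketch below. It reuses the namespace and structure of the CalculatorRequest sample shown at the beginning of the post; the Method value is simply copied from that sample and is an assumption here, so replace it with the value that routes the message to the custom channel scenario in your deployment.

```xml
<CalculatorRequest xmlns="http://Microsoft.BizTalk.CAT.Samples.CalculatorRequest">
  <!-- Assumption: Method value copied from the sample request; adjust as needed. -->
  <Method>XmlDocumentOrchestration</Method>
  <Operations>
    <!-- A valid operation, followed by the invalid one -->
    <Operation>
      <Operator>+</Operator>
      <Operand1>82</Operand1>
      <Operand2>18</Operand2>
    </Operation>
    <Operation>
      <Operator>/</Operator>
      <Operand1>72</Operand1>
      <Operand2>0</Operand2>
    </Operation>
  </Operations>
</CalculatorRequest>
```

After submitting it, the SELECT query above should show no new rows for this request, including for the valid first operation, if all calls really share one transaction.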
Customizing the runtime behavior of the WCF Adapters using WCF extensibility points (service behaviors, endpoint behaviors, custom channels, etc.) is a powerful technique, as explained in my previous posts. Here you can download the PerformanceOptimizationGuide solution, which contains other use cases in addition to the WCF custom channel project described in this post. Feel free to download and customize the code, and please let me know your feedback. :-)