[T2-401 デモ (1)] WCF の Transport レベルの Custom Channel Sample
こんにちは。
- WCF の Transport レベルの Custom Channel Sample
- Rehosting を使ったエンドユーザへのワークフロー公開
- WF による Rule Base のアプリケーション
- Custom の SharePoint Workflow Editor
製品開発者向けということで実施した Tech Ed の T2-401 セッションですが、お約束通り、デモを添付しておきます。
まずは、最初にお話した、WCF の TransportBindingElement を継承したバインディング要素のカスタムチャネルをみて頂くための BakaChannel というあまりきれいな名前ではないサンプルです。
チャネルのメカニズムを詳しくみていくための簡単なサンプルです。
download sample
「WCF がその内部でどのようなパイプラインで処理されるか ?」、「WCF のチャネルとは何か ?」 については、申し訳ありませんが、ここでは記載しません。(何某かのドキュメントを参照して事前に理解しておいてください)
セッションでもご説明しましたが、上位のチャネル (TransportBindingElement 以外の層の Channel) の構築は仕組みを理解しておけば一般にそれほどむずかしいものではありませんでした。セッションの中でお見せしたように、こうしたチャネルでは、大元で呼び出される BindingElement に渡されてくる BindingContext というオブジェクトを使い、BuildInnerChannelFactory / BuildInnerChannelListener という各メソッドを呼び出すことで、例えば Request-Reply モデルの場合には、IRequestChannel / IReplyChannel 型を受け付ける内部の (基底の) ファクトリー (ChannelFactory) やリスナー (ChannelListener) を取得することができ、さらに、このファクトリーの CreateChannel メソッドや、リスナーの AcceptChannel メソッドで、今後は内部チャネルを取得することができます。乱暴な書き方かもしれませんが、独自な処理が必要でない箇所では、すべてこれらのメソッドをそのまま使ってインタフェース実装や override メソッドを実装すれば、それでカスタムチャネルの完成ということになります。(独自な処理が必要なメソッドだけカスタムコードを記述して、内部の ChannelListener / ChannelFactory / Channel に処理を渡していけば OK です。)
コードの一部を抜粋すると、以下のようなスタイルになるでしょう。
IDuplexSessionChannel innerChannel;
protected override void OnOpen(TimeSpan timeout)
{
innerChannel.Open(timeout);
}
public bool WaitForMessage(TimeSpan timeout)
{
return innerChannel.WaitForMessage(timeout);
}
こうしたサンプルは数多く存在し、例えばダウンロードセンターにある 「Windows Communication Foundation (WCF), Windows Workflow Foundation (WF) and Windows CardSpace Samples」 からも 「ChunkingChannel」 という動くサンプルが入手できます。(ChunkingChannel は、メッセージを特定のバイト数に分割して送信するという Channel のサンプルです)
では、最後のトランスポートレベルのチャネルではどのように処理すれば良いでしょうか ? そのサンプルが、ここで掲載している上記のサンプルになります。
サンプルは、最終的な Message をファイルに書き込んで、サービス側ではそのファイルの内容をウォッチして処理します。(Reply も同様にファイルを使用します。)
大枠の流れは Tech*Ed のセッションでご説明しますので省略しますが、セッションを受けていない方は、ブレークポイント (F9) などを設定して追跡 (F10 / F11) するなどして流れを追うとよいでしょう。(WCF の Message の中身なども見ることができます)
コードは Channel の理解をすることが目的ですので、極力簡単になるように作成しています。そのため、排他の処理などもいっさい配慮していないので注意してください。(このプロジェクト名はそうしたことから名付けています)
以下にソースコードと要旨を簡単に記載しておきます。
==========
まず最初に、サービス側もクライアント側も、すべては BindingElement からはじまります。
今回サンプルとしている単純な Request - Reply 型の通信では、サービスは BindingElement を使って ChannelListener を構築し、クライアントはこの BindingElement を使って ChannelFactory を構築しますので、ここではその構築のための処理を記載しています。
public class BakaBindingElement : TransportBindingElement
{
public override string Scheme
{
get {
return "baka.file";
}
}
public override BindingElement Clone()
{
BakaBindingElement clone = new BakaBindingElement();
return clone;
}
// Request-Reply 型の通信のみサポート
// (Input/Output, Duplex, Session, などはサポートしない !)
public override bool CanBuildChannelFactory<TChannel>(BindingContext context)
{
if ((typeof(TChannel) != typeof(IRequestChannel)) &&
(typeof(TChannel) != typeof(IReplyChannel)))
return false;
return true;
}
// Request-Reply 型の通信のみサポート
// (Input/Output, Duplex, Session, などはサポートしない !)
public override bool CanBuildChannelListener<TChannel>(BindingContext context)
{
if( (typeof(TChannel) != typeof(IRequestChannel)) &&
(typeof(TChannel) != typeof(IReplyChannel)) )
return false;
return true;
}
// SampleClient が Factory をつかいます !
public override IChannelFactory<TChannel> BuildChannelFactory<TChannel>(BindingContext context)
{
BakaChannelFactory factory = new BakaChannelFactory();
return (IChannelFactory<TChannel>)factory;
}
// SampleService が Listener を使います !
public override IChannelListener<TChannel> BuildChannelListener<TChannel>(BindingContext context)
{
BakaChannelListner listener = new BakaChannelListner(new Uri(context.ListenUriBaseAddress, context.ListenUriRelativeAddress));
return (IChannelListener<TChannel>)listener;
}
}
Request - Reply モデルのサービス側では、まず ChannelListener に処理が渡されました。このリスナーでは、別スレッド (BeginInvoke メソッド) により Channel の受け入れ処理 (AcceptChannel) が呼び出されます。よって、この処理の中 (下記の OnAcceptChannel イベント処理の中) で、Channel の構築をおこないます。
class BakaChannelListner : ChannelListenerBase<IReplyChannel>
{
Uri uri;
BakaChannel channel = null;
string fileName;
public BakaChannelListner(Uri uri)
: base()
{
this.uri = uri;
}
// Service が Channel を構築する際に呼びます ! (非常に大事)
protected override IReplyChannel OnAcceptChannel(TimeSpan timeout)
{
if (channel != null)
return default(IReplyChannel);
channel = new BakaChannel(this, new EndpointAddress(uri));
return channel;
}
// 同上
private AsyncAcceptChannel asyncAcceptChannel;
protected override IAsyncResult OnBeginAcceptChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
asyncAcceptChannel = new AsyncAcceptChannel(_AcceptChannel);
return asyncAcceptChannel.BeginInvoke(timeout, callback, state);
}
protected delegate IReplyChannel AsyncAcceptChannel(TimeSpan timeout);
protected IReplyChannel _AcceptChannel(TimeSpan timeout)
{
return AcceptChannel(timeout);
}
// 同上
protected override IReplyChannel OnEndAcceptChannel(IAsyncResult result)
{
return asyncAcceptChannel.EndInvoke(result);
}
public override Uri Uri
{
get
{
return uri;
}
}
// リスナー作成時の初期化処理はここに記述します
protected override void OnOpen(TimeSpan timeout)
{
// 今回は、メッセージ交換用のファイルを作成!
fileName = uri.LocalPath.Trim(new char[] { '/' });
string dirName = Path.GetDirectoryName(fileName);
if (!Directory.Exists(dirName))
throw new FaultException(string.Format("Please create folder : {0} !", dirName));
File.Create(fileName).Close();
}
// リスナー終了時のクリーンアップ処理はここに記述します
protected override void OnClose(TimeSpan timeout)
{
// 作成したファイルを削除
if (File.Exists(fileName))
File.Delete(fileName);
}
protected override IAsyncResult OnBeginWaitForChannel(TimeSpan timeout, AsyncCallback callback, object state)
{
throw new NotImplementedException();
}
// . . . 以下省略します . . .
}
サービス側で構築された Channel では、クライアントからの要求 (Request) が到達するのを待機します。今回はファイルに要求が書きこまれますので、.NET の FileSystemWatcher クラスを使用してその到着を監視しています。
サービスは要求を受け取ると、RequestContext と呼ばれるオブジェクトを生成して、クライアントとの仮想的な接続状態が管理されます。(応答 (Reply) の処理も、ここで管理されます)
class BakaChannel : ChannelBase, IReplyChannel, IChannel, ICommunicationObject
{
EndpointAddress address;
string fileName;
FileSystemWatcher watcher;
bool fileChanged = false;
public BakaChannel(ChannelManagerBase channelManager, EndpointAddress address)
: base(channelManager)
{
this.address = address;
this.fileName = address.Uri.LocalPath.Trim(new char[] {'/'});
}
// Service から呼ばれます
// FileSystemWatcher クラスを使って監視します (ここだけは少し頭が良い)
protected override void OnOpen(TimeSpan timeout)
{
watcher = new FileSystemWatcher(Path.GetDirectoryName(fileName),
Path.GetFileName(fileName));
watcher.NotifyFilter = NotifyFilters.LastWrite;
watcher.Changed += new FileSystemEventHandler(watcher_Changed);
watcher.EnableRaisingEvents = true;
}
// Service が Close する際に呼ばれます
protected override void OnClose(TimeSpan timeout)
{
base.OnClosed();
}
// FileWatcher クラスのイベントハンドラです (良くないコードなのでマネをしない!)
void watcher_Changed(object sender, FileSystemEventArgs e)
{
// Note !!!
// これは Sample です.
// (Synchronize のための Lock をおなうよう修正しましょう !)
fileChanged = true;
Thread.Sleep(500);
}
protected delegate bool AsyncTryReceiveRequest(TimeSpan timeout, out RequestContext context);
private AsyncTryReceiveRequest asyncTryReceiveRequest;
// Service が Request の到着を監視する目的で使います
public IAsyncResult BeginTryReceiveRequest(TimeSpan timeout, AsyncCallback callback, object state)
{
asyncTryReceiveRequest = new AsyncTryReceiveRequest(TryReceiveRequest);
RequestContext context;
return asyncTryReceiveRequest.BeginInvoke(timeout, out context, callback, state);
}
// 同上
public bool TryReceiveRequest(TimeSpan timeout, out RequestContext context)
{
context = null;
Thread.Sleep(500);
// Close のときは監視を終了!
if (base.State != CommunicationState.Opened)
return true;
// ファイルをバカチェックして取得
// (Note !! 実際にはちゃんと Lock をおこなうこと !)
if(!fileChanged)
return false;
fileChanged = false;
FileInfo fileinfo = new FileInfo(fileName);
if (!(fileinfo.Length > 0))
return false;
string action, body;
using (StreamReader reader = new StreamReader(fileName))
{
action = reader.ReadLine();
body = reader.ReadToEnd();
}
XmlReader bodyreader = XmlReader.Create(new StringReader(body));
Message msg = Message.CreateMessage(MessageVersion.Default, action, bodyreader);
msg.Headers.To = address.Uri;
context = new BakaRequestContext(msg, fileName, timeout);
File.Delete(fileName);
File.Create(fileName).Close();
fileChanged = false;
return true;
}
// 同上
public bool EndTryReceiveRequest(IAsyncResult result, out RequestContext context)
{
return asyncTryReceiveRequest.EndInvoke(out context, result);
}
public EndpointAddress RemoteAddress
{
get
{
return this.address;
}
}
public Uri Via
{
get
{
return this.address.Uri;
}
}
// . . . 以下省略します . . .
}
// RequestContext はリクエスト交換をおこなう際に必要になります .
// Service - Client のメッセージ交換ごとに毎回 1 つずつ作成される、
// メッセージ交換のための仮想的な仲介オブジェクトです
class BakaRequestContext : RequestContext
{
Message message;
TimeSpan defaultTimeout;
string filePath;
public BakaRequestContext(Message msg, string path, TimeSpan timeout)
{
this.message = msg;
filePath = path;
this.defaultTimeout = timeout;
}
// Client が Request をおこなう際に使用されます
public override Message RequestMessage
{
get
{
return this.message;
}
}
// Service が Response を返す際に使用されます
public override void Reply(Message message, TimeSpan timeout)
{
// Reply を渡す (file でバカ通信)
// (Note !! 実際にはちゃんと Lock をおこなうこと !)
string action, body;
action = message.Headers.Action;
XmlDictionaryReader xmlDicReader = message.GetReaderAtBodyContents();
body = xmlDicReader.ReadOuterXml();
using(Stream stream = File.Create(filePath + "_Response"))
using(StreamWriter writer = new StreamWriter(stream))
{
writer.WriteLine(action);
writer.WriteLine(body);
}
}
public override void Reply(Message message)
{
this.Reply(message, defaultTimeout);
}
// . . . 以下省略します . . .
}
一方、Request - Reply モデルのクライアント側では、クライアントが ChannelFactory の CreateChannel メソッドを能動的に呼び出して Channel を構築します。よって、CreateChannel の際に呼ばれる OnCreateChannel メソッドで Channel 構築をおこなうことになります。(Channel は、Reply 用と Request 用に分けて構築しても構いません。今回は双方の処理を同じ BakaChannel クラスに実装します)
class BakaChannelFactory : ChannelFactoryBase<IRequestChannel>
{
protected override IRequestChannel OnCreateChannel(System.ServiceModel.EndpointAddress address, Uri via)
{
BakaChannel channel = new BakaChannel(this, address);
return channel;
}
protected override void OnOpen(TimeSpan timeout)
{
// 何もしない . . .
// (Factory 構築時の初期化処理などは、ここに記述します . . .)
}
// . . . 以下省略します . . .
}
クライアント側の Channel の処理はいたってシンプルです。Request の処理としてファイルへの書き込みをおこなうのが主な仕事です。(Channel は、Reply 用と Request 用に分けて構築しても構いませんが、今回は双方の処理を同じ BakaChannel クラスに実装しています)
応答 (Response) は、「だいたい 3 秒くらい待てば帰ってくるだろう」という非常にいいかげんな処理を実装していますので、決してマネしないでください。(あくまでも、サンプルのためだけの、Baka = Foolish な Channel なのです)
class BakaChannel : ChannelBase, IReplyChannel, IRequestChannel, IChannel, ICommunicationObject
{
EndpointAddress address;
string fileName;
FileSystemWatcher watcher;
bool fileChanged = false;
public BakaChannel(ChannelManagerBase channelManager, EndpointAddress address)
: base(channelManager)
{
this.address = address;
this.fileName = address.Uri.LocalPath.Trim(new char[] {'/'});
}
// Client から Request を発行する際に呼ばれます
public Message Request(Message message, TimeSpan timeout)
{
// メッセージを送信 (file でバカ通信)
// (Note !! 実際にはちゃんと Lock をおこなうこと !)
string action, body;
action = message.Headers.Action;
XmlDictionaryReader xmlDicReader = message.GetReaderAtBodyContents();
body = xmlDicReader.ReadOuterXml();
using (StreamWriter writer = new StreamWriter(fileName))
{
writer.WriteLine(action);
writer.WriteLine(body);
}
// メッセージを受信 (file でバカ受信)
Thread.Sleep(3000); // 3 秒待って回答が来なければ失敗 !!??
string actionResponse, bodyResponse;
using (StreamReader streamReader = new StreamReader(fileName + "_Response"))
{
actionResponse = streamReader.ReadLine();
bodyResponse = streamReader.ReadToEnd();
}
XmlReader xmlReader = XmlReader.Create(new StringReader(bodyResponse));
Message messageResponse = Message.CreateMessage(MessageVersion.Default, actionResponse, xmlReader);
File.Delete(fileName + "_Response");
return messageResponse;
}
public Message Request(Message message)
{
return this.Request(message, base.DefaultSendTimeout);
}
public EndpointAddress RemoteAddress
{
get
{
return this.address;
}
}
public Uri Via
{
get
{
return this.address.Uri;
}
}
// . . . 以下省略します . . .
}