在现代编程中,异步编程模式是实现高效、响应式应用程序的关键技术之一。它允许长时间运行的操作在调用线程之外的线程上执行,从而让调用线程保持非阻塞状态,继续执行其他任务。这对于构建响应式的图形用户界面(GUI)和有效地进行远程调用(无论是调用WCF服务、使用.NET远程调用还是访问基于REST的Web服务等)都是非常重要的技术。
异步编程模式通常采用Begin/End模式,其基本形式如下:
C# IAsyncResult BeginOperation(...some number of parameters as input, AsyncCallback callback, object state);
SomeResult EndOperation(IAsyncResult);
尽管BeginInvoke/EndInvoke模式功能强大,但使用起来却相当笨拙且不够直观,特别是当EndOperation部分返回值时。从上面的模式可以看出,Begin部分接收一个回调委托和对象"state",并返回一个IAsyncResult。不清楚应该传递什么作为回调或状态,也不清楚如何处理调用函数后得到的IAsyncResult。最重要的是,不清楚如何获取EndOperation的结果。
为了简化Begin/End模式的使用,微软推荐使用一种称为基于事件的异步模式(Event-based Asynchronous Pattern,EAP)。尽管这种模式有所改进,但可以利用.NET的新响应式扩展库(Reactive Extensions for .NET,简称Rx)做得更好。
异步编程设计模式在.NET中的应用。
基于事件的异步模式。
.NET响应式扩展库的主页。
要使用包装器,只需创建一个泛型类型的AsyncPatternWrapper的新实例。至少需要使用一个泛型类型参数——该参数将代表EndOperation返回的内容。如果BeginOperation除了回调和状态参数之外还接收其他参数,则必须传递额外的泛型参数。
例如,如果BeginOperation接收一个string,而EndOperation返回一个int,则可以声明一个新的AsyncPatternWrapper
AsyncPatternWrapper类的构造函数接收两个参数——BeginOperation及其对应的EndOperation。
AsyncPatternWrapper类实现了IObservable
除了实现IObservable之外,AsyncPatternWrapper只有一个函数——Invoke()。它所做的就是调用BeginOperation。当该操作完成时,IObservable流中将产生另一个结果。只要已经订阅了IObservable流(通过"Subscribe"函数),将自动处理EndOperation的结果。
C# WebRequest request = HttpWebRequest.Create("http://services.digg.com/containers?appkey=http%3A%2F%2Fapidoc.digg.com");
var wrapper = new AsyncPatternWrapper<WebResponse>(request.BeginGetResponse, request.EndGetResponse);
wrapper.Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
wrapper.Invoke();
.NET的新响应式扩展库(以前称为Reactive LINQ)的亮点在于,可以在事件流上使用标准的LINQ查询操作符。因此,在上述示例1中,不仅可以打印每个WebResponse的ContentLength,还可以只打印ContentLength大于3000的WebResponse:
C# wrapper.Where(webResponse => webResponse.ContentLength > 3000).Subscribe(webResponse => Console.WriteLine(webResponse.ContentLength));
C# [ServiceContract] public interface IService { [OperationContract(AsyncPattern = true)] IAsyncResult BeginGetCustomers(AsyncCallback callback, object state); List<Customer> EndGetCustomers(IAsyncResult result); }
代码片段——展示了客户端如何使用WCF服务:
C# var cf = new ChannelFactory<IService>(new BasicHttpBinding(), new EndpointAddress("http://localhost:8085")); var service = cf.CreateChannel(); var wrapper = new AsyncPatternWrapper<List<Customer>>(service.BeginGetCustomers, service.EndGetCustomers); wrapper.ObserveOnDispatcher().Subscribe(customers => UpdateUI(customers)); // refreshButton是UI上的某个按钮——每次点击它时,都会异步刷新客户列表: refreshButton.Click += (s,e) => wrapper.Invoke();
请注意,在上述示例2中,不需要通过Dispatcher切换回UI线程以更新UI!因此,代码最终会变得更加清晰、更加易读。
另一个收获是,不必依赖WCF代理生成器,可以非常容易地使用AsyncPatternWrapper类来创建自己的代理。这在WCF服务不公开.svc文件的情况下非常有价值,或者在不可能有.svc文件的情况下,如WCF REST。
以下是AsyncPatternWrapper的源代码,适用于Begin操作中没有参数的情况,以及Begin操作中有一个参数的情况。请参阅附加的压缩文件,以获取所有参数数量的AsyncPatternWrapper类。
C# using System; using System.Linq; namespace System { #region No arguments public class AsyncPatternWrapper<TResult> : IObservable<TResult> { Func<AsyncCallback, object, IAsyncResult> beginOp; Func<IAsyncResult, TResult> endOp; IObservable<TResult> observable; event EventHandler<EventArgs<TResult>> done; public AsyncPatternWrapper(Func<AsyncCallback, object, IAsyncResult> beginOp, Func<IAsyncResult, TResult> endOp) { this.beginOp = beginOp; this.endOp = endOp; observable = Observable.FromEvent<EventArgs<TResult>>(e => this.done += e, e => this.done -= e) .Select(s => s.EventArgs.Item); } public void Invoke() { IAsyncResult result = null; result = beginOp(new AsyncCallback(notUsed => { try { var res = endOp(result); if (done != null) done(null, new EventArgs<TResult> { Item = res }); } catch (Exception) { // Don't raise an event if there was an exception } }), null); } #region IObservable<TResult> Members public IDisposable Subscribe(IObserver<TResult> observer) { return observable.Subscribe(observer); } #endregion } #endregion #region One argument public class AsyncPatternWrapper<T, TResult> : IObservable<TResult> { Func<T, AsyncCallback, object, IAsyncResult> beginOp; Func<IAsyncResult, TResult> endOp; IObservable<TResult> observable; event EventHandler<EventArgs<TResult>> done; public AsyncPatternWrapper(Func<T, AsyncCallback, object, IAsyncResult> beginOp, Func<IAsyncResult, TResult> endOp) { this.beginOp = beginOp; this.endOp = endOp; observable = Observable.FromEvent<EventArgs<TResult>>(e => this.done += e, e => this.done -= e) .Select(s => s.EventArgs.Item); } public void Invoke(T param) { IAsyncResult result = null; result = beginOp(param, new AsyncCallback(notUsed => { try { var res = endOp(result); if (done != null) done(null, new EventArgs<TResult> { Item = res }); } catch (Exception) { // Don't raise an event if there was an exception } }), null); } public IDisposable Subscribe(IObserver<TResult> observer) { return observable.Subscribe(observer); } } #endregion internal class EventArgs<T> : EventArgs { public T Item { get; set; } } }
请注意,在上述代码中,异常只是被丢弃——如果发现重新抛出异常更有用,或者选择暴露另一个专门用于异常的IObservable流,这是一个很容易的更改。