异步流:提升数据处理效率的利器

最近,遇到了一个有趣的性能挑战,最终通过一个令人惊叹的C#语言增强功能——异步流,将一个耗时3小时的任务缩短到了1.5小时。可能会说,这是2019年的技术,可能已经读过一些头条新闻和博客文章,但真的理解这项技术以及它的意义吗?之前并没有。在这篇文章中,将快速解释异步流是什么,描述它们帮助解决的实际问题,并展示一些常见的陷阱,以防遇到类似的情况。

什么是异步流?

简而言之,异步流是C#8中引入的一个语言特性,它允许异步地处理数据流。对,很明显。一个例子可能会有帮助。

IAsyncEnumerable<int> numbers = Producer.GetNumbersAsync(); await foreach (var number in numbers) { if (number > 10) break; }

在上面的代码中,正在检索一个类型为IAsyncEnumerable的数字集合(C# 8中引入的一个接口),并使用await foreach(C# 8中引入的另一个新语言特性)遍历前10个数字。这里的巧妙之处在于,循环的每次迭代都有一个隐藏的await,它创建了一个继续执行的操作,并将控制权返回给调用者,直到数据提供者提供了一个新的数字。这种将控制权返回给调用者的操作通常是await在C# 5中引入的。它释放了宿主,使其可以刷新移动应用的UI,或者响应HTTP请求。IAsyncEnumerable的新特点是,await现在在可枚举类型中成为了一等公民。

可以通过打开ILSpy中的代码来看到它是如何工作的。如果在C#8之前的版本中反编译并查看它(ILSpy在这方面非常出色),会得到这样的结果:

IAsyncEnumerable<int> numbers = Producer.GetNumbersAsync(); IAsyncEnumerator<int> asyncEnumerator = numbers.GetAsyncEnumerator(); try { while (await asyncEnumerator.MoveNextAsync()) { int number = asyncEnumerator.Current; if (number > 10) { break; } } } finally { if (asyncEnumerator != null) { await asyncEnumerator.DisposeAsync(); } }

现在可以清楚地看到IAsyncEnumerable的工作方式几乎与IEnumerable的工作方式完全相同,它有.MoveNext和.Current方法。除了三件事:

  • 方法名后缀为Async
  • 一切都是基于任务的
  • 有一些额外的清理工作

有趣,但它的实用性还有待观察。

晦涩的语言特性:遇见现实世界

在项目中,需要每天下载和处理大型文件。想象一下60GB的CSV文件。技术上,它们是60GB的BSON文件。如果是CSV,它们会更大。无论如何,需要读取和处理大量数据,而且这很慢。需要几个小时。问题在于,数据需要在一天中的某个特定时间准备好,如果出了什么问题,就得重新开始。所以只有几次机会,更糟糕的是:对于这个客户来说,数据在未来会变得更庞大。需要找到性能优化的方法。

从历史上看,通常会将这个过程分为几个步骤:

  • 下载文件
  • 读取和处理文件(使用任务并行库中的DataFlow,如果不熟悉,应该立即去学习)
  • 将结果(大约90MB)插入数据库

这简化了,但总体上这三个步骤需要超过2个小时。下载:大约40分钟。读取和处理:大约1.5小时。插入:大约10分钟。

团队花了很多时间思考性能解决方案。但有一件事让对这个过程感到困扰。也许重新阅读这些要点,看看是否有任何突出的地方。

答案:为什么要在磁盘上保存和读取?!理论上,这就是流存在的原因。应该能够在下载数据并将其处理成90MB,而根本不触及磁盘。对吧?!

此外,那个IO听起来很慢,但那是另一个故事。

捕获内容作为流?

但不确定异步流是否可以应用于通过HTTP下载大型文件。首先,团队一直在下载BSON格式的zip文件。需要数据以流的形式消费,所以压缩是正确的。以流的形式消费BSON最终是可行的,但那是后来的事情,超出了这篇文章的范围。因此,第一次尝试使用未压缩的CSV。

幸运的是,有一种方法可以在数据提供者的API中指定想要未压缩的CSV内容。这将增加下载时间,但打赌会在处理过程中弥补这一点,因为磁盘似乎是如此瓶颈。

接下来,很好奇TCP数据包是否在请求后立即开始,并在新行边界处中断。重要吗?不确定,尽管它确实为博客文章提供了一个很好的图片。

Wireshark数据包看起来像这样:

U0(ñòßGAäwÆMP'Û10d 2021-06-26T23:24:45,10.79 2021-06-26T23:24:53,97.83 2021-06-26T23:25:01,86.53 2021-06-26T23:25:09,3.83 2021-06-26T23:25:17,39.38 2021-06-26T23:25:25,37.94 2021-06-26T23:25:33,31.59 2021-06-26T23:25:41,12.55 2021-06-26T23:25:49,74.67 2021-06-26T23:25:57,95.25

一些随机的元数据在顶部,但以新行结尾。太好了。

顺便说一下,这实际上是为本文的目的构建的一个应用程序的结果,它模拟了实际数据提供者。它叫做DisklessAsynchronousStreams(也许不要试图快速说10次)。它是开源的,如果想更详细地探索本文的代码。

异步消费

回到重点,很快了解到,异步拉取数据而不写入磁盘的重要魔法是,在调用HttpClient上的GetAsync()或SendAsync()时设置HttpCompletionOption.ResponseHeadersRead标志。这告诉编译器仅在收到头部后才阻塞,然后继续执行。然后,对ReadLineAsync()的调用可以在数据仍在下载时进行。更具体地说:

using var response = await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead); response.EnsureSuccessStatusCode(); await using var stream = await response.Content.ReadAsStreamAsync(); using var streamReader = new StreamReader(stream, Encoding.UTF8); while (!streamReader.EndOfStream) { var line = await streamReader.ReadLineAsync(); var trade = GetTradeFromLine(rowNum, line); yield return trade; }

上面的代码可以工作,但只有在C# 8中。在C# 8之前,返回类型需要是async Task<IEnumerable<Trade>>。看起来合理。然而,编译器会给:

The return type of an async method must be void, Task, Task<T>, a task-like type, IAsyncEnumerable<T>, or IAsyncEnumerator<T>

然而,在C# 8和IAsyncEnumerable中,简单的解决方案是返回IAsyncEnumerable,然后可以随后用async foreach消费。

private async IAsyncEnumerable<trade> StreamReadLines() { ... }

如果对yield的工作原理感到不确定,请查看《System.Linq.Where()的工作原理》。

异步限制

这里有一个有趣的错误,猜猜它是什么意思:

Type: System.IO.IOException Message: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. Inner exception Type: System.Net.Sockets.SocketError SocketErrorCode: ConnectionReset Message: An existing connection was forcibly closed by the remote host. StackTrace: ... at System.IO.StreamReader.d__67.MoveNext() System.IO.StreamReader.d__59.MoveNext

如果说是远程主机关闭了连接,恭喜,可以阅读,但遗憾的是,这根本不是发生的事情。实际问题是消费者超过了缓冲区(这种事情发生在最好的人身上),然后.NET对撒谎,这让感到难过。

当消费者从生产者读取数据的速度太慢时,就会出现问题。基本上,如果数据以比处理的速度快的速度进来,那么就需要有人将数据保存在一定大小的内存插槽中,最终数据将超过插槽的大小。

有趣的是,可以通过设置HttpClient上的MaxResponseContentBufferSize的较小值来更快地实现它。不幸的是,不能将MaxResponseContentBufferSize设置得超过其默认大小2GB。因此,确保不要在主消息处理循环中做任何慢的事情。

另一个陷阱

不要指望消费者能够成功地异步流式读取数据,同时打开Fiddler。Fiddler对于观察常规HTTP流量非常有用,但它会在转发之前批量处理整个请求,然后知道,浪费了30分钟试图弄清楚为什么不能在复制项目时重现生产环境,同时撰写博客文章。听从警告:不要成为那个人。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485