最近,遇到了一个有趣的性能挑战,最终通过一个令人惊叹的C#语言增强功能——异步流,将一个耗时3小时的任务缩短到了1.5小时。可能会说,这是2019年的技术,可能已经读过一些头条新闻和博客文章,但真的理解这项技术以及它的意义吗?之前并没有。在这篇文章中,将快速解释异步流是什么,描述它们帮助解决的实际问题,并展示一些常见的陷阱,以防遇到类似的情况。
简而言之,异步流是C#8中引入的一个语言特性,它允许异步地处理数据流。对,很明显。一个例子可能会有帮助。
IAsyncEnumerable<int> numbers = Producer.GetNumbersAsync();
await foreach (var number in numbers)
{
if (number > 10)
break;
}
在上面的代码中,正在检索一个类型为IAsyncEnumerable的数字集合(C# 8中引入的一个接口),并使用await foreach(C# 8中引入的另一个新语言特性)遍历前10个数字。这里的巧妙之处在于,循环的每次迭代都有一个隐藏的await,它创建了一个继续执行的操作,并将控制权返回给调用者,直到数据提供者提供了一个新的数字。这种将控制权返回给调用者的操作通常是await在C# 5中引入的。它释放了宿主,使其可以刷新移动应用的UI,或者响应HTTP请求。IAsyncEnumerable的新特点是,await现在在可枚举类型中成为了一等公民。
可以通过打开ILSpy中的代码来看到它是如何工作的。如果在C#8之前的版本中反编译并查看它(ILSpy在这方面非常出色),会得到这样的结果:
IAsyncEnumerable<int> numbers = Producer.GetNumbersAsync();
IAsyncEnumerator<int> asyncEnumerator = numbers.GetAsyncEnumerator();
try
{
while (await asyncEnumerator.MoveNextAsync())
{
int number = asyncEnumerator.Current;
if (number > 10)
{
break;
}
}
}
finally
{
if (asyncEnumerator != null)
{
await asyncEnumerator.DisposeAsync();
}
}
现在可以清楚地看到IAsyncEnumerable的工作方式几乎与IEnumerable的工作方式完全相同,它有.MoveNext和.Current方法。除了三件事:
有趣,但它的实用性还有待观察。
在项目中,需要每天下载和处理大型文件。想象一下60GB的CSV文件。技术上,它们是60GB的BSON文件。如果是CSV,它们会更大。无论如何,需要读取和处理大量数据,而且这很慢。需要几个小时。问题在于,数据需要在一天中的某个特定时间准备好,如果出了什么问题,就得重新开始。所以只有几次机会,更糟糕的是:对于这个客户来说,数据在未来会变得更庞大。需要找到性能优化的方法。
从历史上看,通常会将这个过程分为几个步骤:
这简化了,但总体上这三个步骤需要超过2个小时。下载:大约40分钟。读取和处理:大约1.5小时。插入:大约10分钟。
团队花了很多时间思考性能解决方案。但有一件事让对这个过程感到困扰。也许重新阅读这些要点,看看是否有任何突出的地方。
答案:为什么要在磁盘上保存和读取?!理论上,这就是流存在的原因。应该能够在下载数据并将其处理成90MB,而根本不触及磁盘。对吧?!
此外,那个IO听起来很慢,但那是另一个故事。
但不确定异步流是否可以应用于通过HTTP下载大型文件。首先,团队一直在下载BSON格式的zip文件。需要数据以流的形式消费,所以压缩是正确的。以流的形式消费BSON最终是可行的,但那是后来的事情,超出了这篇文章的范围。因此,第一次尝试使用未压缩的CSV。
幸运的是,有一种方法可以在数据提供者的API中指定想要未压缩的CSV内容。这将增加下载时间,但打赌会在处理过程中弥补这一点,因为磁盘似乎是如此瓶颈。
接下来,很好奇TCP数据包是否在请求后立即开始,并在新行边界处中断。重要吗?不确定,尽管它确实为博客文章提供了一个很好的图片。
Wireshark数据包看起来像这样:
U0(ñòßGAäwÆMP'Û10d
2021-06-26T23:24:45,10.79
2021-06-26T23:24:53,97.83
2021-06-26T23:25:01,86.53
2021-06-26T23:25:09,3.83
2021-06-26T23:25:17,39.38
2021-06-26T23:25:25,37.94
2021-06-26T23:25:33,31.59
2021-06-26T23:25:41,12.55
2021-06-26T23:25:49,74.67
2021-06-26T23:25:57,95.25
一些随机的元数据在顶部,但以新行结尾。太好了。
顺便说一下,这实际上是为本文的目的构建的一个应用程序的结果,它模拟了实际数据提供者。它叫做DisklessAsynchronousStreams(也许不要试图快速说10次)。它是开源的,如果想更详细地探索本文的代码。
回到重点,很快了解到,异步拉取数据而不写入磁盘的重要魔法是,在调用HttpClient上的GetAsync()或SendAsync()时设置HttpCompletionOption.ResponseHeadersRead标志。这告诉编译器仅在收到头部后才阻塞,然后继续执行。然后,对ReadLineAsync()的调用可以在数据仍在下载时进行。更具体地说:
using var response = await httpClient.GetAsync(uri, HttpCompletionOption.ResponseHeadersRead);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync();
using var streamReader = new StreamReader(stream, Encoding.UTF8);
while (!streamReader.EndOfStream)
{
var line = await streamReader.ReadLineAsync();
var trade = GetTradeFromLine(rowNum, line);
yield return trade;
}
上面的代码可以工作,但只有在C# 8中。在C# 8之前,返回类型需要是async Task<IEnumerable<Trade>>。看起来合理。然而,编译器会给:
The return type of an async method must be void, Task, Task<T>,
a task-like type, IAsyncEnumerable<T>, or IAsyncEnumerator<T>
然而,在C# 8和IAsyncEnumerable中,简单的解决方案是返回IAsyncEnumerable,然后可以随后用async foreach消费。
private async IAsyncEnumerable<trade> StreamReadLines()
{
...
}
如果对yield的工作原理感到不确定,请查看《System.Linq.Where()的工作原理》。
这里有一个有趣的错误,猜猜它是什么意思:
Type: System.IO.IOException
Message: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host.
Inner exception
Type: System.Net.Sockets.SocketError
SocketErrorCode: ConnectionReset
Message: An existing connection was forcibly closed by the remote host.
StackTrace:
...
at System.IO.StreamReader.d__67.MoveNext()
System.IO.StreamReader.d__59.MoveNext
如果说是远程主机关闭了连接,恭喜,可以阅读,但遗憾的是,这根本不是发生的事情。实际问题是消费者超过了缓冲区(这种事情发生在最好的人身上),然后.NET对撒谎,这让感到难过。
当消费者从生产者读取数据的速度太慢时,就会出现问题。基本上,如果数据以比处理的速度快的速度进来,那么就需要有人将数据保存在一定大小的内存插槽中,最终数据将超过插槽的大小。
有趣的是,可以通过设置HttpClient上的MaxResponseContentBufferSize的较小值来更快地实现它。不幸的是,不能将MaxResponseContentBufferSize设置得超过其默认大小2GB。因此,确保不要在主消息处理循环中做任何慢的事情。
不要指望消费者能够成功地异步流式读取数据,同时打开Fiddler。Fiddler对于观察常规HTTP流量非常有用,但它会在转发之前批量处理整个请求,然后知道,浪费了30分钟试图弄清楚为什么不能在复制项目时重现生产环境,同时撰写博客文章。听从警告:不要成为那个人。