在.NET2.0中,无法实现许多现代编程技术。PLINQ的出现极大地扩展了并行编程的可能性。本文将探讨一个在多年编程生涯中经常遇到的问题:如何加速那些受限于阻塞函数或长时间运行的I/O操作的多线程应用程序?
最初是在VistaDB引擎中处理一些长时间运行的文件I/O例程时考虑使用这种方法。大多数情况下,在继续工作之前,都会阻塞在磁盘读取上,但通常已经拥有了所需的部分数据块。因此,可以开始工作,然后在其余数据块加载完成后继续。使用传统的线程代码添加这种逻辑非常复杂,并且容易出错。幸运的是,PLINQ提供了一种简化这些操作的方法。
以这个例子为例,将读取8个网站的首页,然后对这些信息进行后续处理。这是一种非常简单的并行操作,可以很好地分割。但这些长时间运行的读取操作与许多应用程序中发生的情况非常相似。
实际上,这个例子是从Joseph Albahari的《C# 4.0 简明教程》一书中采纳的。这本书虽然有1000页,但对已经了解C#的开发者来说,这是一本非常棒的书,他们只需要了解C#和CLR 4的新变化。书中的概念涵盖了.NET的旧版本,但对来说,最有趣的部分是所有的新变化。
这个表达式将访问这个列表中的8个网站,并获取每个网站的首页。页面的内容长度和内容类型然后存储在变量中,以便在并行计算之外使用。
static void Main(string[] args)
{
Stopwatch sw = new Stopwatch();
sw.Start();
var results = from site in new[]
{
"http://infinitecodex.com",
"http://www.vistadb.net",
"http://stackoverflow.com",
"http://cornerstonedb.com",
"http://www.bing.com/",
"http://www.linqpad.net",
"http://www.cnn.com",
"http://www.microsoft.com"
}
let p = WebRequest.Create(new Uri(site)).GetResponse()
select new
{
site,
Length = p.ContentLength,
ContentType = p.ContentType
};
foreach (var result in results)
{
Console.WriteLine("{0}:{1}:{2}", result.site, result.Length, result.ContentType);
}
sw.Stop();
Console.WriteLine("Total Time: {0}ms", sw.ElapsedMilliseconds);
}
最初的运行没有使用并行扩展。只是遍历每个网站,获取首页,将ContentLength和ContentType存储在临时变量p中。然后,使用foreach遍历结果,将它们输出到命令行。如果跳过这一步,由于LINQ的延迟执行,实际上什么也不会发生(必须对集合做一些事情,它才会真正运行)。将所有这些包装在一个Stopwatch中,以便知道它需要多长时间。本文顶部的图表显示了在运行每种方法10次后获得的3个最快时间。
普通执行的三个最快时间(毫秒):1916、2103、1992。
现在,让使用PLINQ,看看它是否运行得更快。唯一需要做的改变就是在let语句上方添加一行代码,如下所示:
.AsParallel()
let p = WebRequest.Create(new Uri(site)).GetResponse()
就是这样,整个LINQ查询现在将并行运行。它更快,但可以让它更快。
使用AsParallel()的三个最快时间(毫秒):745、790、814。
PLINQ在幕后所做的是创建一个线程池,并在4核机器上启动4个线程。但它不知道的是,每个操作都在阻塞等待网站的I/O。PLINQ假设每个线程都将执行一定量的CPU工作,因此它防止启动大量线程,这些线程只会压垮CPU。
WithDegreeOfParallelism
来自MSDN的帮助:
WithDegreeOfParallelism - Degree of parallelism is the maximum number of concurrently executing tasks that will be used to process the query.
这并不确切地用简单的英语解释可以使用它来告诉框架任务不是CPU密集型的。从技术上讲,正在覆盖PLINQ的默认行为,并告诉它知道应该允许多少并发运行。
在这种情况下,将设置为8,因为知道每个CPU核心的两个对象不会给系统带来负担。可以设置的最大值是64。现在,每个线程池将尝试一次运行多个线程。为什么可以在不增加很多任务切换开销的情况下做到这一点?因为对象都在I/O上阻塞。操作系统将让它们休眠并释放CPU供其他任务运行,只是给这些任务更多的工作,让它们稍微忙一点。
再次,只需要对第一个查询进行一行更改:
.AsParallel().WithDegreeOfParallelism(8)
// HERE
let p = WebRequest.Create(new Uri(site)).GetResponse()