在现代编程中,处理大量数据时,异步和并行执行方法变得越来越重要。特别是当涉及到网络驱动器或其他I/O密集型操作时,这些技术可以显著提高性能。本文将介绍如何使用并发与协调运行时(CCR)来计算目录大小,这是一种异步递归方法。
递归计算目录大小是一个直接的过程。以下是一个使用C#语言的示例,展示了如何使用LINQ查询来计算每个文件的大小以及每个子目录的大小(递归地),然后将它们相加得到总和。
static long SerialRecursiveDirectorySize(string path)
{
return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) +
Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize);
}
这段代码首先计算指定路径下所有文件的总大小,然后递归地对每个子目录调用相同的方法,并将结果相加。
并发与协调运行时(CCR)允许在不同的线程中执行方法,通过声明方法之间的关系来实现。下面是一个使用CCR实现的并行递归计算目录大小的示例。
long ParallelDirectorySize(string path)
{
using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads"))
{
DispatcherQueue queue = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher);
var outputPort = new Port();
ParallelDirectorySizeRecursive(path, queue, outputPort);
return queue.WaitOnPort(outputPort);
}
}
在这个示例中,Dispatcher对象持有一个线程池,DispatcherQueue对象持有可以立即执行的待处理委托列表,这些委托正在等待线程变为可用。
ParallelDirectorySizeRecursive方法通过在dispatcher队列中排队任务来计算目录的大小。当计算完成时,它将结果发布到outputPort。
异步递归实现将以下任务排队:
void ParallelDirectorySizeRecursive(string directory, DispatcherQueue queue, Port outputPort)
{
var subDirectories = Directory.GetDirectories(directory);
var inputPort = new Port();
Arbiter.Activate(queue,
Arbiter.FromHandler(
delegate()
{
inputPort.Post(TotalFileSize(directory));
}),
Arbiter.MultipleItemReceive(
false,
inputPort,
subDirectories.Length + 1,
delegate(long[] subDirSize)
{
outputPort.Post(subDirSize.Sum());
}));
foreach (string subDir in subDirectories)
{
ParallelDirectorySizeRecursive(subDir, queue, inputPort);
}
}
long TotalFileSize(string path)
{
return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length);
}
如果不熟悉匿名方法(delegates),请注意这些任务在单独的线程上运行,但它们仍然可以访问局部变量(例如directory和subDirSize)。实际上,它们接收一个副本,可以在它们的线程中使用。这使得代码更短。
希望递归能够从UnauthorizedAccessException中恢复。简单的递归实现接收一个Collection<Exception>,用于收集捕获的异常。所有递归调用都接收相同的集合对象:
long SerialRecursiveDirectorySizeErrorHandling(string path, Collection errors)
{
long fileSize = 0;
var subDirs = new string[] {};
try
{
fileSize = TotalFileSize(path);
subDirs = Directory.GetDirectories(path);
}
catch (UnauthorizedAccessException ex)
{
errors.Add(ex);
}
return fileSize + subDirs.Sum(p => SerialRecursiveDirectorySizeErrorHandling(p, errors));
}
类似地,平行递归接收一个Port<Exception>对象(见outputPort.P1)。相同的异常端口对象被传递给所有递归调用。
void ParallelDirectorySizeWithErrorHandlerRecursive(string directory, DispatcherQueue queue, PortSet outputPort)
{
var subDirectories = new string[] {};
try
{
subDirectories = Directory.GetDirectories(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P0.Post(0);
outputPort.P1.Post(ex);
return;
}
var inputPort = new PortSet(new Port(), outputPort.P1);
Arbiter.Activate(queue,
Arbiter.FromHandler(
delegate()
{
long size = 0;
try
{
size = TotalFileSize(directory);
}
catch (UnauthorizedAccessException ex)
{
outputPort.P1.Post(ex);
}
finally
{
inputPort.P0.Post(size);
}
}),
Arbiter.MultipleItemReceive(
false,
inputPort.P0,
subDirectories.Length + 1,
delegate(long[] subDirSize)
{
outputPort.P0.Post(subDirSize.Sum());
}));
foreach (string subDir in subDirectories)
{
ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort);
}
}