并发与协调运行时(CCR)在异步递归方法中计算目录大小

在现代编程中,处理大量数据时,异步和并行执行方法变得越来越重要。特别是当涉及到网络驱动器或其他I/O密集型操作时,这些技术可以显著提高性能。本文将介绍如何使用并发与协调运行时(CCR)来计算目录大小,这是一种异步递归方法。

递归计算目录大小

递归计算目录大小是一个直接的过程。以下是一个使用C#语言的示例,展示了如何使用LINQ查询来计算每个文件的大小以及每个子目录的大小(递归地),然后将它们相加得到总和。

static long SerialRecursiveDirectorySize(string path) { return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length) + Directory.GetDirectories(path).Sum(SerialRecursiveDirectorySize); }

这段代码首先计算指定路径下所有文件的总大小,然后递归地对每个子目录调用相同的方法,并将结果相加。

并行递归

并发与协调运行时(CCR)允许在不同的线程中执行方法,通过声明方法之间的关系来实现。下面是一个使用CCR实现的并行递归计算目录大小的示例。

long ParallelDirectorySize(string path) { using (Dispatcher dispatcher = new Dispatcher(0, "CCR Threads")) { DispatcherQueue queue = new DispatcherQueue("Pipeline DispatcherQueue", dispatcher); var outputPort = new Port(); ParallelDirectorySizeRecursive(path, queue, outputPort); return queue.WaitOnPort(outputPort); } }

在这个示例中,Dispatcher对象持有一个线程池,DispatcherQueue对象持有可以立即执行的待处理委托列表,这些委托正在等待线程变为可用。

ParallelDirectorySizeRecursive方法通过在dispatcher队列中排队任务来计算目录的大小。当计算完成时,它将结果发布到outputPort。

异步递归实现

异步递归实现将以下任务排队:

  • 立即执行的任务,用于计算目录中所有文件的总大小。完成后,将结果发布到inputPort。
  • 每个递归调用都会排队一个任务,将子目录大小发布到inputPort。递归调用被调用subDirectories.Length次。
  • 一旦inputPort中有subDirectories.Length + 1个值(这意味着文件大小和递归子目录大小计算已完成),就执行的任务,将所有结果相加并发布到outputPort。
void ParallelDirectorySizeRecursive(string directory, DispatcherQueue queue, Port outputPort) { var subDirectories = Directory.GetDirectories(directory); var inputPort = new Port(); Arbiter.Activate(queue, Arbiter.FromHandler( delegate() { inputPort.Post(TotalFileSize(directory)); }), Arbiter.MultipleItemReceive( false, inputPort, subDirectories.Length + 1, delegate(long[] subDirSize) { outputPort.Post(subDirSize.Sum()); })); foreach (string subDir in subDirectories) { ParallelDirectorySizeRecursive(subDir, queue, inputPort); } } long TotalFileSize(string path) { return Directory.GetFiles(path).Sum(f => new FileInfo(f).Length); }

如果不熟悉匿名方法(delegates),请注意这些任务在单独的线程上运行,但它们仍然可以访问局部变量(例如directory和subDirSize)。实际上,它们接收一个副本,可以在它们的线程中使用。这使得代码更短。

错误处理

希望递归能够从UnauthorizedAccessException中恢复。简单的递归实现接收一个Collection<Exception>,用于收集捕获的异常。所有递归调用都接收相同的集合对象:

long SerialRecursiveDirectorySizeErrorHandling(string path, Collection errors) { long fileSize = 0; var subDirs = new string[] {}; try { fileSize = TotalFileSize(path); subDirs = Directory.GetDirectories(path); } catch (UnauthorizedAccessException ex) { errors.Add(ex); } return fileSize + subDirs.Sum(p => SerialRecursiveDirectorySizeErrorHandling(p, errors)); }

类似地,平行递归接收一个Port<Exception>对象(见outputPort.P1)。相同的异常端口对象被传递给所有递归调用。

void ParallelDirectorySizeWithErrorHandlerRecursive(string directory, DispatcherQueue queue, PortSet outputPort) { var subDirectories = new string[] {}; try { subDirectories = Directory.GetDirectories(directory); } catch (UnauthorizedAccessException ex) { outputPort.P0.Post(0); outputPort.P1.Post(ex); return; } var inputPort = new PortSet(new Port(), outputPort.P1); Arbiter.Activate(queue, Arbiter.FromHandler( delegate() { long size = 0; try { size = TotalFileSize(directory); } catch (UnauthorizedAccessException ex) { outputPort.P1.Post(ex); } finally { inputPort.P0.Post(size); } }), Arbiter.MultipleItemReceive( false, inputPort.P0, subDirectories.Length + 1, delegate(long[] subDirSize) { outputPort.P0.Post(subDirSize.Sum()); })); foreach (string subDir in subDirectories) { ParallelDirectorySizeWithErrorHandlerRecursive(subDir, queue, inputPort); } }
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485