异步轮询器实现指南

在软件开发中,经常需要实现一种轮询机制来监听数据库中的变化或新条目,并在它们出现时执行某些操作。轮询器可以从简单的while循环加Thread.Sleep到复杂的异步观察者模式和SqlDependency代码。最近,一直在与Entity Framework合作,需要轮询数据库中的特定状态的记录。因此,决定编写一个通用的轮询器,它易于使用,允许开发者指定一个lambda表达式来异步查询数据库。

使用指南

要使用附带的EntityPoller,只需编写以下代码:

C# var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int)Status.New); poller.ActiveCycle = 200; // 毫秒 poller.IdleCycle = 10000; // 10秒 poller.RetryAttempts = 5; poller.RetryInitialInterval = 10000; // 10秒 poller.EntityReceived += OnNotification; poller.Error += OnError; poller.Start();

这段代码创建了一个轮询器实例,用于查询具有特定StatusId的Notification实体。轮询器的ActiveCycle和IdleCycle属性决定了轮询的频率,RetryAttempts和RetryInitialInterval属性则定义了重试策略。

代码解析

轮询器的构造函数接受两个参数:第一个是包含实体集(DbSet<>)的DbContext,第二个是返回布尔值的lambda表达式。

public sealed class EntityPoller<T> where T : class { private readonly DbContext _context; private readonly Func<T, bool> _query; private readonly object _sync = new object(); private bool _polling; public EntityPoller(DbContext context, Func<T, bool> query) { _context = context; _query = query; IdleCycle = 10000; ActiveCycle = 1000; RetryAttempts = 5; RetryInitialInterval = 10000; } public int IdleCycle { get; set; } public int ActiveCycle { get; set; } public int RetryAttempts { get; set; } public int RetryInitialInterval { get; set; } }

轮询方法Poll()包含一个主循环,该循环会持续运行,直到返回一个实体。每个循环周期都有一个暂停(Thread.Sleep),暂停的时长取决于定义的查询结果。如果查询返回一个实体,则使用ActiveCycle,否则使用IdleCycle。这是为了以受控的方式从对象中发出实体通知。

private T Poll() { var set = _context.Set<T>(); T entity = null; try { while (_polling) { entity = Retry(() => set.FirstOrDefault(_query), RetryAttempts, RetryInitialInterval); if (entity != null) break; Thread.Sleep(IdleCycle); } Thread.Sleep(ActiveCycle); } catch (Exception ex) { Stop(); if (Error != null) Error.Invoke(ex); } return entity; }

Poll方法通过使用AsyncWorker委托和AsyncOperation来调用。本文不会涵盖异步代码,但源代码中包含了所有内容。

如上代码片段所示,查询Func被封装在Retry方法中。Retry模式将运行查询RetryAttempts次,从RetryInitialInterval毫秒开始,并以5的幂次方增加这个间隔。这是默认的逻辑,但可能希望根据需要调整这个逻辑。

private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000) { if (action == null) throw new ArgumentNullException("action"); for (int i = 1; i <= attempts; i++) { try { T result = action.Invoke(); return result; } catch (Exception) { if (i >= attempts) throw; Thread.Sleep(initialInterval); initialInterval *= 5; } } return null; }
沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485