在软件开发中,经常需要实现一种轮询机制来监听数据库中的变化或新条目,并在它们出现时执行某些操作。轮询器可以从简单的while循环加Thread.Sleep到复杂的异步观察者模式和SqlDependency代码。最近,一直在与Entity Framework合作,需要轮询数据库中的特定状态的记录。因此,决定编写一个通用的轮询器,它易于使用,允许开发者指定一个lambda表达式来异步查询数据库。
要使用附带的EntityPoller
C#
var poller = new EntityPoller<Notification>(context, q => q.StatusId == (int)Status.New);
poller.ActiveCycle = 200; // 毫秒
poller.IdleCycle = 10000; // 10秒
poller.RetryAttempts = 5;
poller.RetryInitialInterval = 10000; // 10秒
poller.EntityReceived += OnNotification;
poller.Error += OnError;
poller.Start();
这段代码创建了一个轮询器实例,用于查询具有特定StatusId的Notification实体。轮询器的ActiveCycle和IdleCycle属性决定了轮询的频率,RetryAttempts和RetryInitialInterval属性则定义了重试策略。
轮询器的构造函数接受两个参数:第一个是包含实体集(DbSet<>)的DbContext,第二个是返回布尔值的lambda表达式。
public sealed class EntityPoller<T> where T : class {
private readonly DbContext _context;
private readonly Func<T, bool> _query;
private readonly object _sync = new object();
private bool _polling;
public EntityPoller(DbContext context, Func<T, bool> query) {
_context = context;
_query = query;
IdleCycle = 10000;
ActiveCycle = 1000;
RetryAttempts = 5;
RetryInitialInterval = 10000;
}
public int IdleCycle { get; set; }
public int ActiveCycle { get; set; }
public int RetryAttempts { get; set; }
public int RetryInitialInterval { get; set; }
}
轮询方法Poll()包含一个主循环,该循环会持续运行,直到返回一个实体。每个循环周期都有一个暂停(Thread.Sleep),暂停的时长取决于定义的查询结果。如果查询返回一个实体,则使用ActiveCycle,否则使用IdleCycle。这是为了以受控的方式从对象中发出实体通知。
private T Poll() {
var set = _context.Set<T>();
T entity = null;
try {
while (_polling) {
entity = Retry(() => set.FirstOrDefault(_query), RetryAttempts, RetryInitialInterval);
if (entity != null) break;
Thread.Sleep(IdleCycle);
}
Thread.Sleep(ActiveCycle);
} catch (Exception ex) {
Stop();
if (Error != null) Error.Invoke(ex);
}
return entity;
}
Poll方法通过使用AsyncWorker委托和AsyncOperation来调用。本文不会涵盖异步代码,但源代码中包含了所有内容。
如上代码片段所示,查询Func
private static T Retry(Func<T> action, int attempts = 5, int initialInterval = 10000) {
if (action == null) throw new ArgumentNullException("action");
for (int i = 1; i <= attempts; i++) {
try {
T result = action.Invoke();
return result;
} catch (Exception) {
if (i >= attempts) throw;
Thread.Sleep(initialInterval);
initialInterval *= 5;
}
}
return null;
}