事件流量控制:实现事件处理的平稳化

在开发软件时,经常会遇到事件处理的问题。想象一下,如果有一个在线商店,每当一个订单(从现在起称为订单)到来时,就会触发一个事件。然后执行一些事件处理程序来更新最新订单列表(或所有订单)。如果另一个订单在更新过程开始后到达怎么办?更新可能需要很长时间,比如说2秒钟,在这段时间内可能会有多个订单到达。不能为每一个订单开始一个新的更新,最终需要一个同步系统,只允许一次进行一个更新。不断更新控件可能会导致应用程序冻结。会使用一个持久化系统,比如数据库,所以最好是进行更大的查询,而不是不断的非常小的查询。如果可以使用一个对象来平息这些事件流量,那会怎么样呢?

事件流量控制的好处

在详细讨论问题或解决方案之前,让先看看一些示例应用程序。在图1中,左侧是使用事件流量平息器的“平静客户端”,右侧是实时处理订单的“实时客户端”。在平静的应用中,可以看到并浏览订单,至少在它每5秒刷新一次之前。另一方面,实时应用是一个闪烁的应用,除非捕捉屏幕,否则甚至无法阅读。如果事件来得稍微快一点,它甚至可能会冻结。

商店

让快速定义一下将在本文的其余部分使用的领域。需要一个实体和一个存储库。

public class Order { public string Title { get; set; } public DateTime CreatedAt { get; set; } } public interface IOrderRepository { IEnumerable<Order> ListFrom(DateTime startDate); event EventHandler OrderArrived; }

从示例代码中,将引用一个名为_orders的订单存储库实例。

_orders.OrderArrived += HandleOrderArrived;

每当一个订单到达时,处理程序就会被执行。然后,处理程序查询自上次执行以来到达的订单,并使用这个列表来更新UI。示例代码显示了这组订单。

构建一个对象来解决问题

需要一个对象(平息器),它有两个成员:一个处理程序来连接想要平息的事件,以及它将触发的事件。

public class EventCalmer { public void HandleEvent(object sender, EventArgs e) { ... } public event EventHandler EventOccurred; }

需要将这个对象放在实际的事件生成器和代码之间。

_calmer = new EventCalmer(TimeSpan.FromSeconds(5)); ... _orders.OrderArrived += _calmer.HandleEvent; _calmer.EventOccurred += HandleOrderArrived;

有了这个设置,每当一个订单到达时,它只会在一段时间后触发事件,例如例子中的5秒。如果在5秒内到达了3个以上的订单,第二个将不会触发事件,但第三个会。平息器将确保在最后一个订单到达后触发事件一次,因此永远不会有未处理的订单。平息器将触发其事件的次数接近(+/-1)所有5秒执行的次数,这些执行将适合总执行时间。也就是说,总时间除以5秒。如果某个处理程序的执行时间超过5秒,平息器将在前一个完成时立即开始下一个执行。如果执行提前完成,它将睡眠剩余的时间。

平息器的工作方式

这真的改变了一切。标记为“事件接收”的转换是不言而喻的。“完成”将用于从执行中退出的转换。“完成”将用于从等待中退出的转换。由于它看起来非常像一个自动机,一个类似自动机的实现可能是一个解决方案。

状态

将创建一个枚举,为图2中的每个状态创建一个值。

private enum CalmerState { Idle, Executing, ExecutingOneInQueue, Waiting, WaitingOneInQueue, }

转换函数。定义这个转换函数的一个简单方法是使用两个静态字典,一个用于“事件接收”,另一个用于“完成”和“完成”。

平息器的处理程序和触发器

标准的事件处理和触发代码将确保事件到达平息器,并且从平息器到达商店系统。就在触发平息器的事件之前,需要在一个实例变量中存储它被触发的时间。

public void HandleEvent(object sender, EventArgs e) { EventReceived(); } public event EventHandler EventOccurred; private void OnEventOccurred(EventArgs e) { _lastExecutionBeganAt = DateTime.Now; if (EventOccurred != null) EventOccurred(this, e); }

启动平息器。所有的魔法都始于第一个事件被接收并且EventReceived()被执行。

private void EventReceived() { lock (_criticalSectionDoor) { var newState = GetStateToMoveToWhenEventArrives(); SetCurrentStateTo(newState); if (_currentState == CalmerState.Executing) BeginExecutionThread(); } } private CalmerState GetStateToMoveToWhenEventArrives() { return EventArrivedTransitions[_currentState]; } private void SetCurrentStateTo(CalmerState newState) { _currentState = newState; }

在锁定块内,通过第一个字典进行新状态的转换。稍后,如果新状态是“执行”,这意味着平息器应该从空闲状态移动出来,执行开始。BeginExecutionThread()方法并不有趣,它只是启动了一个异步调用另一个方法ExecuteWhileNotIddle()。那个是。

执行线程

执行线程执行步骤,只要状态不是空闲的。

private void ExecuteWhileNotIddle() { while (_currentState != CalmerState.Idle) { if (currentState == CalmerState.Executing || currentState == CalmerState.ExecutingOneInQueue) DoExecuteStep(); else DoWaitStep(); UpdateStateAfterDoneOrFinished(); } }

执行线程执行步骤,只要状态不是空闲的。有两种类型的步骤:执行或等待步骤。执行,触发事件。等待,睡眠。

private void DoExecuteStep() { OnEventOccurred(EventArgs.Empty); } private void DoWaitStep() { var stillNeedToWait = _sleepTime - TimeItTookLastExecution(); if (stillNeedToWait > TimeSpan.Zero) Thread.Sleep(stillNeedToWait); } private TimeSpan TimeItTookLastExecution() { return DateTime.Now - _lastExecutionBeganAt; }

执行触发事件。在生产中使用时,一些错误处理会很有用。目前,如果处理程序中出现错误,平息器将会中断并且变得不可用。请注意,事件触发是同步的,所以它会像实际的处理程序一样长。等待睡眠剩余的时间。假设希望平息器每5秒释放一次事件。如果实际的处理程序需要2秒,平息器将会睡眠3秒。如果实际的处理程序需要6秒,平息器将不会睡眠。

执行步骤后更新状态

在前面的步骤完成后,需要更新自动机的状态。这次将使用第二个字典。

private void UpdateStateAfterDoneOrFinished() { lock (_criticalSectionDoor) { var newState = GetStateToMoveToWhenDoneOrFinished(); SetCurrentStateTo(newState); } } private CalmerState GetStateToMoveToWhenDoneOrFinished() { return DoneOrFinishedTransitions[_currentState] }

平息器已经准备好了。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485