事件溯源:金融应用中的事件驱动架构

事件溯源是一种记录对象在其生命周期中发生的事件,而不是存储对象某一时刻的状态的技术。这种技术在金融应用中特别有用,因为任何账户余额通常是导致该余额的事件的结果,希望既能看到数字的底层推导过程,也能看到在给定时间点的估值。

事件溯源允许通过过滤事件或在替代流中播放来进行“如果”分析。

简单银行账户示例

银行账户可能是最简单的金融流,仅由单一货币的存款和取款组成。在这个模型中,事件就是调整余额的金额——存款是正数,取款是负数。

[ - - +50 - - +50 - - -100 - ->]

在这个例子中,只有在实际发生存款或取款时,估值才会变化——所以通过播放事件流到结尾,可以看到当前余额现在是0。

股份持有账户示例

对于股份持有账户,有两个流。有一个位置流记录股份的购买或销售,还有一个定价流记录底层股份的价格。

[ - - +50 - - +50 - - -100 - ->] [178.1 - - - - 178.0 - - - - 178.9 - - >]

这些可以合并成一个估值流,即当前持有量乘以最新价格的组合。持有事件和/或定价事件都会触发估值。

[ 0 - - 8905.00 - - 17810.00 - - 17800.00 - - 0.00 - - 0.00 ]

非基础货币持有示例

如果股份持有是以非基础货币持有的,那么持有货币和账户基础货币之间的汇率变化也会影响估值。

[ - - +50- - +50- - -100- -> ] [178.1 - - - - 178.0 - - - - 178.9 -> ] [1.1- - - 1.2 - - - 1.3 - - - 1.4 -> ]

对于任何事件,需要知道——它何时发生,它在哪个流中,它的价值是多少。从上述例子中,也可以看到,价值可以是绝对金额(例如,价格或汇率),也可以是变化(或增量)金额。

对于流是增量金额的事件流,写入当前值的定期快照可能会很有帮助。这允许通过转到该点之前的快照,然后播放该快照时间之后的任何增量事件,来播放流到任何给定点。

派生事件

流可以是源数据,也可以是通过一个或多个源流的函数派生的。对于派生事件,使用函数从其组成流中计算值,并且对于这些流中的任何事件都会触发该函数。

复式记账

在复式记账中,持有事件记录在两个(或更多)流中——一个代表事件影响的每个账本账户(最常见的是,一个用于资产,一个用于负债)。传统上,这是通过在输入流上使用发布规则来发布到相关的两个(或更多)账户流来完成的。

快照

为了将读取访问时间保持在最低限度,这种架构应该支持读取和创建快照。这些快照可以被视为数据传输对象,用于将它们集成到基于它们的任何系统(如MVC应用程序)中。

定义所需操作的接口可能如下所示:

C# /// <summary> /// Repository to write to objects that are held in snapshot form /// </summary> /// <typeparam name="TEntity"> /// The type of entity in the repository /// </typeparam> /// <typeparam name="TKey"> /// The type of the key to uniquely identify the entity /// </typeparam> /// <remarks> /// This does not inherit from IRepositoryWrite /// because it is possible that some background task will be /// creating the snapshots independent of front-end write requests /// </remarks> public interface IRepositoryWriteSnapshot<TKey, TEntity> where TEntity : IKeyedEntity<TKey> { /// <summary> /// Request that a snapshot as-of-now be taken for the given entity aggregate /// </summary> /// <param name="key"> /// The unique identifier of the entity that we want snapshotted /// </param> /// <remarks> /// This does not return any value as it is designed to /// operate asynchronously by setting a "needs snapshot" flag to prevent /// the snapshot generator system being flooded /// </remarks> void RequestSnapshot(TKey key); /// <summary> /// Delete and regenerate any snapshots post the as-of synchronization value /// </summary> /// <param name="key"> /// The unique identifier of the entity that we want snapshotted /// </param> /// <param name="synchronization"> /// The synchronization point for which we want the data regenerated /// </param> void RegenerateFromAsOf(TKey key, long synchronisation); } And the read side can be implemented as an extension to the usual repository pattern: C# /// <summary> /// Repository to read objects that are held in snapshot form /// </summary> /// <typeparam name="TEntity"> /// The key-identified type of entity we are reading /// </typeparam> /// <typeparam name="TKey"> /// The type of the key /// </typeparam> /// <remarks> /// Where an object is based on one or more event streams, /// this allows for retrieval of the state of that object as at a given /// snapshot time. This extends repository-read /// </remarks> public interface IRepositoryReadSnaphot<TKey, TEntity> : IRepositoryRead<TKey, TEntity> where TEntity : IKeyedEntity<TKey> { /// <summary> /// Did any record matching the key exist as at the given point in time /// </summary> /// <param name="key"> /// The unique aggregate identifier of the record we are seeking /// </param> /// <returns> /// True if a matching record is found /// </returns> bool ExistedAsOf(TKey key, long synchronisation); /// <summary> /// Get the entity state as it was at the given point in time /// </summary> /// <param name="key"> /// The aggregate identifier of the record for which we want a point-in-time view /// </param> /// <param name="synchronisation"> /// The synchronisation point for which we want the data /// </param> /// <returns> </returns> Nullable<TEntity> GetByKeyAsOf(TKey key, long synchronisation); }

在实践中,审计或法律要求可能需要保留旧的快照(如果它们被外部发送)。需要一个版本控制机制,以确保任何显示(UI或打印)使用最新版本的快照。

要点

这种类型的架构需要考虑的最重要的事情之一是幂等性——使系统对错误免疫,如果一个事件运行了多次。在实践中,这通常意味着用执行绝对操作的消息替换执行增量操作的消息(例如,“将账户减少50美元”)。

然而,与其人为地制造绝对事件,不如深入挖掘,直到得到固有的绝对事件。在上面的例子中,有绝对的存款和取款事件比有一个“将余额设置为x”的派生事件更可取。

沪ICP备2024098111号-1
上海秋旦网络科技中心:上海市奉贤区金大公路8218号1幢 联系电话:17898875485