管理复杂的业务事务
并不是每个命令都能够在单个ACID事务中完全执行。银行转帐是一个很常见的例子,常常作为他们的论据。人们经常认为,将资金从一个账户转移到另一个账户的交易绝对需要原子性和一致性。其实呢,不是的这样的。相反,这是不可能的。如果钱从A银行的账户转移到B银行的另一个账户? A银行是否须要锁定B银行数据库?如果转账正在进行中,银行A已经扣除了这笔款项,但银行B还没有存入该钱,这有点奇怪?事实上并不是,他只是表示这是“正在进行”状态。另一方面,如果在将钱存入银行账户的过程中出现问题,则银行A的客户会要求退款。所以我们最终希望得到某种形式的一致性。
虽然ACID事务在某些情况下不是必要的,甚至是不可能的,但是有些情况下他是任然须要的。通常,这些事务被称为BASE事务:基本可用性,软性状态,最终一致性。与ACID相反,BASE事务不能轻易回滚。为了回滚,需要采取补偿行动来恢复作为交易一部分发生的任何事情。在汇款的例子中,B银行存款失败,将退还A银行的款项。
在CQRS中,Sagas可以用来管理这些BASE事务。他们对事件作出响应,并可能调度命令,调用外部应用程序等。在域驱动设计的背景下,Sagas被用作几个有界上下文之间的协调机制中经常使用。
Saga
Saga 是一个特殊类型的事件监听器:用来管理业务事物。一些交易可能会持续几天甚至几周,而另外一些则可能在几毫秒内完成。在Axon中,Saga 的每个实例都负责管理单个业务事务。这意味着Saga 必须保持必要的状态来管理这个交易,继续或者采取补偿行动来回滚已经采取的任何行动。通常情况下,与常规事件监听器相反,Saga 的起点和终点均由事件触发。虽然Saga 的开始点通常很清楚,但Saga 可以有很多方法结束。
在Axon中,Sagas是定义一个或多个@SagaEventHandler方法的类。与常规事件处理程序不同,saga的多个实例可能随时存在。saga是由一个处理器(追踪或订阅),这是专门处理该特定的saga类型的事件管理。
生命周期
单个Saga实例负责管理单个事务。这意味着您需要能够明确的声明出saga的生命周期的开始和结束。
在saga中,事件处理程序是用@SagaEventHandler注解的。如果一个特定的事件表示一个事务的开始,添加另一个注解到相同的方法:@StartSaga。这个注解将创建一个新的saga,他会对发布的事件进行匹配,如果找到了对应的事件该event handler方法就会被调用。
默认情况下,只有在当前没有合适的saga(相同类型)可以找到才会创建一个新的saga。您还可以通过将@StartSaga注解中的forceNew属性设置为true来强制创建新的Saga实例。
结束Saga 可以通过两种方式完成。如果某个事件总是表示Saga 的生命周期结束,那么用@EndSaga注解该saga中的事件处理器。Saga 的生命周期将在处理程序的调用结束后结束。或者,您可以从saga内部调用SagaLifecycle.end()来结束生命周期。这可以让你有条件地结束saga。
事件处理
事件处理在一个saga中和常规的事件监听器很类似。方法和参数解析的规则在这里是有效的。但是他们有一个主要的区别。Event Listener只有一个实例来处理所有的事件,而saga会有多个实例,每个实例对不同的事件感兴趣。例如,关于Order的id为1的业务Saga对Order Id为“2”的事件不感兴趣,反之亦然。
Axon不会将所有事件发布到所有Saga实例(这将完全浪费资源),而只会发布包含与Saga关联的属性的Events。这是使用AssociationValues完成的。 AssociationValue由一个键和一个值组成。键表示所用标识符的类型,例如“orderId”或“order”。该值表示相应的值,在前面的示例中为“1”或“2”。
评估带@SagaEventHandler注解方法的顺序与带@EventHandler方法的顺序相同(请参阅注解事件处理程序)。如果处理程序方法的参数与传入的事件匹配,并且该事件与处理程序方法中定义的属性关联,则方法匹配。
@SagaEventHandler注解具有两个属性,其中associationProperty是最重要的属性。这是传入事件上property的名称,应该用来寻找相关的saga。association值的key是property的名称。这个值是由property的getter方法返回的值。
例如,考虑一个带”String getOrderId()”方法传入事件,返回“123”。如果一个带@SagaEventHandler(associationProperty = orderId)注解的方法接受这个事件,这个事件被路由到所有已经与带一个键为orderId和值为“123”的AssociationValues关联的saga。这可能是一个,多个,甚至没有。
有时,您想要关联的属性的名称不是要使用的关联的名称。例如,你有一个销售订单相匹配购买订单的saga。你可以有一个包含“buyOrderId”和“sellOrderId”的事务对象。如果你想要的saga将“orderId”作为关联的值,则可以在@SagaEventHandler批注中定义不同的keyName。它会变成@SagaEventHandler(associationProperty =“sellOrderId”,keyName =“orderId”)
管理关联
当一个Saga管理跨多个领域概念时,如订单,发货,发票等交易,saga需要与这些領域的实例相关联。关联需要两个参数:标识关联类型(Order,Shipment等)的键和表示该領域标识符的值。
将saga与一个領域概念联系起来有几种方法。首先,在调用@StartSaga带注解的事件处理程序时新创建了一个Saga,它会自动关联到@SagaEventHandler方法中标识的属性。任何其他关联都可以使用SagaLifecycle.associateWith(String key,String / Number value)方法创建。使用SagaLifecycle.removeAssociationWith(String key,String / Number value)方法删除他们之间的关联。
想象一下,为一个订单的交易创建了一个saga。saga会自动关联对应订单,因为该方法是用了@StartSaga注解。saga负责为该订单创建发票,并告诉发货人为其创建货件(Shipment )。一旦货件到达,发票已经支付,交易完成,saga关闭。
想象一下,为一个订单的交易创建了一个saga。saga会自动关联对应订单,因为该方法是用了@StartSaga注解。saga负责为该订单创建Invoice(发票) ,并告诉发货人为其创建运输(Shipment )。一旦货物送达完,Invoice 已经支付,交易完成,saga关闭。
这里有一个saga的例子代码:
public class OrderManagementSaga {
private boolean paid = false;
private boolean delivered = false;
@Inject
private transient CommandGateway commandGateway;
@StartSaga
@SagaEventHandler(associationProperty = "orderId")
public void handle(OrderCreatedEvent event) {
// client generated identifiers
ShippingId shipmentId = createShipmentId();
InvoiceId invoiceId = createInvoiceId();
// associate the Saga with these values, before sending the commands
SagaLifecycle.associateWith("shipmentId", shipmentId);
SagaLifecycle.associateWith("invoiceId", invoiceId);
// send the commands
commandGateway.send(new PrepareShippingCommand(...));
commandGateway.send(new CreateInvoiceCommand(...));
}
@SagaEventHandler(associationProperty = "shipmentId")
public void handle(ShippingArrivedEvent event) {
delivered = true;
if (paid) { SagaLifecycle.end(); }
}
@SagaEventHandler(associationProperty = "invoiceId")
public void handle(InvoicePaidEvent event) {
paid = true;
if (delivered) { SagaLifecycle.end(); }
}
// ...
}
通过客户端生成一个标识符,一个saga可以很容易地与一个領域概念相关联,而不需要一个请求 - 响应类型的命令。在发布命令之前,我们将事件与这些領域概念联系起来。这样,我们保证也捕获作为这个命令的一部分生成的事件。一旦支付了订单并且货物已经到达,这将结束这个saga。
跟踪saga的截止日期
当事情发生的时候,很容易让saga作出响应。毕竟,有一个事件通知saga。但是如果你想让你的saga在没有任何事情发生时做点什么呢?这时最后期限就派上用场了。在发票这个領域,他的持继续时间通常是几个星期,而信用卡支付的确认应在几秒钟内发生。
在Axon中,您可以使用EventScheduler来預約发布事件。在发票的例子中,你会希望在30天内支付发票。发送CreateInvoiceCommand后,saga会安排一个InvoicePaymentDeadlineExpiredEvent事件在第30天截至时间发布。 EventScheduler在預約发布事件后会返回一个ScheduleToken。该令牌可用于取消计划,例如收到发票付款时。
Axon提供了两个EventScheduler实现:一个纯Java,一个使用Quartz 2作为后台调度机制。
EventScheduler是纯Java实现的,他使用ScheduledExecutorService来调度事件发布。虽然这个调度程序的时间是非常可靠的,但它是一个纯粹的在内存中实现。一旦JVM关闭,所有的日程安排都将丢失。这使得这种实现不适合长期计划。
QuartzEventScheduler是一个更可靠的企业级的实现。使用Quartz作为基本的调度机制,它提供了更强大的功能,如持久性,集群和失败管理。这意味着事件发布是有保证的。可能会晚一点,但最终会发布。
它需要配置一个Quartz Scheduler和一个EventBus。或者,您可以设置Quartz作业调度的组名,默认为“AxonFramework-Events”。
一个或多个组件将监听预定的事件。这些组件可能依赖于绑定到调用它们的线程的事务。计划事件由EventScheduler管理的线程发布。要管理这些线程上的事务,可以配置一个TransactionManager或一个UnitOfWorkFactory来创建一个事务绑定的工作单元。
注意:
Spring用户可以使用QuartzEventSchedulerFactoryBean或SimpleEventSchedulerFactoryBean来方便你配置他们。它允许你直接设置PlatformTransactionManager。
注入资源
saga一般做的不仅仅是维护基于事件的状态。它们与外部的组件进行交互。要做到这一点,他们需要访问处理组件所需的资源。通常,这些资源实际上并不是saga状态的一部分,也不应该持久化。但是,一旦重新构建一个saga,在事件被路由到该实例之前,必须注入这些资源。
我们可以用ResourceInjector来达到这个目的。