服務創(chuàng)造價值、存在造就未來
在本節(jié)中,我們將深入探討 Spring Modulith 如何通過事件機制實現(xiàn)模塊間的解耦通信,并重點關注如何確保事件發(fā)布的可靠性以及如何管理事件的整個生命周期,包括將事件外部化到消息中間件。
為了使應用程序模塊之間盡可能相互解耦,Spring Modulith 鼓勵使用事件發(fā)布和消費的方式進行通信。
OrderService發(fā)布事件
將 OrderService 修改為不再直接調用 NotificationFacadeService,而是通過 Spring 的 ApplicationEventPublisher 發(fā)布一個事件:
// OrderService.java @RequiredArgsConstructor @Service @Slf4j public class OrderService { private final ApplicationEventPublisher eventPublisher; @Transactional public void createOrder(Order order) { log.info("Order Module: Attempting to create order."); // 1. 持久化訂單 order.setNo(System.currentTimeMillis() + ""); log.info("Order Module: Order created successfully, order number: {}", order.getNo()); // 2. 發(fā)布 OrderCreated 事件 eventPublisher.publishEvent(new OrderCreated(order.getNo(), order.getItems().toString())); log.info("Order Module: OrderCreated event published for order number: {}", order.getNo()); // 補充日志,明確事件已發(fā)布 } }OrderCreated事件定義
// OrderCreated.java @AllArgsConstructor @NoArgsConstructor @Data public class OrderCreated { private String orderNo; private String orderItems; }NotificationFacadeService監(jiān)聽事件
NotificationFacadeService 現(xiàn)在將監(jiān)聽 OrderCreated 事件,而不是被直接調用。為了確保事件監(jiān)聽器與發(fā)布事務的隔離性(或稱一致性,但隔離更強調其獨立事務),我們使用 @TransactionalEventListener,并指定其傳播行為為 REQUIRES_NEW,這意味著監(jiān)聽器會在一個新的事務中執(zhí)行。
// NotificationFacadeService.java @Slf4j @Service public class NotificationFacadeService { @Async // 添加異步注解,提升性能 @Transactional(propagation = Propagation.REQUIRES_NEW) @TransactionalEventListener // 確保事件在發(fā)布事務提交后才被處理 public void on(OrderCreated orderCreated) { log.info("Notification Module (Facade): Received order created event: {}", orderCreated); // 模擬發(fā)送通知 log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo()); // 統(tǒng)一日志格式,避免 "send Order created message" 這種口語化表達 } }測試事件
@Test public void test_order_created(){ Order.Item item = new Order.Item("apple", 1, 500); Order order = new Order(); order.setItems(List.of(item)); orderService.createOrder(order); }測試結果
為了簡化通過事件集成模塊的聲明方式,Spring Modulith 提供了 @ApplicationModuleListener 作為快捷方式。它是一個復合注解,封裝了常用的事件監(jiān)聽行為(如異步執(zhí)行、新事務、事務性監(jiān)聽)。
// @ApplicationModuleListener 定義示例 (Spring Modulith 內部,無需在項目中創(chuàng)建) @Async @Transactional(propagation = Propagation.REQUIRES_NEW) @TransactionalEventListener @Documented @Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) @Retention(RetentionPolicy.RUNTIME) public @interface ApplicationModuleListener { @AliasFor(annotation = Transactional.class, attribute = "readOnly") boolean readOnlyTransaction() default false; @AliasFor(annotation = EventListener.class, attribute = "id") String id() default ""; @AliasFor(annotation = EventListener.class, attribute = "condition") String condition() default ""; @AliasFor(annotation = Transactional.class, attribute = "propagation") Propagation propagation() default Propagation.REQUIRES_NEW; }通過使用 @ApplicationModuleListener,NotificationFacadeService 中的監(jiān)聽方法可以進一步簡化:
// NotificationFacadeService.java (使用 @ApplicationModuleListener 簡化后) @Slf4j @Service public class NotificationFacadeService { @ApplicationModuleListener // 簡化后的注解,默認異步、新事務、事務性監(jiān)聽 public void on(OrderCreated orderCreated) { log.info("Notification Module (Facade): Received order created event: {}", orderCreated); log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo()); } }在基于事件驅動的架構中,當一個模塊發(fā)布一個事件(例如,訂單模塊發(fā)布“訂單已創(chuàng)建”事件)后,這個事件需要被其他感興趣的監(jiān)聽器模塊可靠地接收并處理。然而,在實際應用中,事件的發(fā)布和處理過程可能面臨系統(tǒng)故障等問題,導致可靠性挑戰(zhàn)。
Spring Modulith 提供了強大的機制來管理這些事件的生命周期,確保事件的可靠發(fā)布和處理,并支持對事件記錄進行維護。
事件發(fā)布注冊表
Spring Modulith 內置了一個事件發(fā)布注冊表 (Event Publication Registry),它與 Spring Framework 的核心事件發(fā)布機制深度集成。當事件發(fā)布時,注冊表會識別將要接收事件的事務性事件監(jiān)聽器,并在原始業(yè)務事務中為每個監(jiān)聽器寫入一個條目到事件發(fā)布日志中。
The transactional event listener arrangement before execution
業(yè)務操作和事件記錄在同一個事務中提交,保證了要么都成功,要么都失敗。即使系統(tǒng)在事件成功寫入日志后、發(fā)送到監(jiān)聽器前崩潰,重啟后也能從日志中恢復并重試發(fā)送。
每個事務性事件監(jiān)聽器都會被一個切面(Aspect)包裹:如果監(jiān)聽器執(zhí)行成功,該切面會將對應的日志條目標記為已完成;如果監(jiān)聽器執(zhí)行失敗,日志條目則保持不變,這樣應用程序可以根據(jù)需要實現(xiàn)重試機制,確保事件最終被處理。
The transactional event listener arrangement after execution
事件發(fā)布持久化
為了實際寫入事件發(fā)布日志并利用事件發(fā)布注冊表,Spring Modulith 提供了 EventPublicationRepository SPI,并為常見的支持事務的持久化技術(如 JPA、JDBC 和 MongoDB)提供了開箱即用的實現(xiàn)。
為簡化集成過程,Spring Modulith 提供了針對不同持久化技術的 Starter POMs。您只需根據(jù)項目所使用的數(shù)據(jù)庫技術,添加對應的 Modulith 事件 Starter POM 即可。這些 Starter 通常會默認集成基于 Jackson 的 EventSerializer 實現(xiàn),用于將事件實例序列化為適合數(shù)據(jù)存儲的格式。
常用的持久化 Starter POMs 如下:
管理事件發(fā)布記錄
Spring Modulith 在應用運行期間,允許以多種方式管理事件發(fā)布記錄。未完成的事件發(fā)布(即還未被監(jiān)聽器成功處理的事件)可能需要在一段時間后重新提交給對應的監(jiān)聽器;而已完成的事件發(fā)布則通常需要從數(shù)據(jù)庫中清理或歸檔。由于這些“清理”需求因應用而異,Spring Modulith 提供了 API 來分別處理這兩類事件發(fā)布。
你可以通過添加 spring-modulith-events-api 依賴來獲得這些 API:
<dependency> <groupId>org.springframework.modulith</groupId> <artifactId>spring-modulith-events-api</artifactId> <version>1.3.5</version> </dependency>該依賴提供了兩個主要接口,作為 Spring Bean 可用:
CompletedEventPublications: 用于訪問所有已完成的事件發(fā)布,并提供 API 立即清除所有已完成的事件,或清除早于指定時間的已完成事件。IncompleteEventPublications: 用于訪問所有未完成的事件發(fā)布,可以按條件或按發(fā)布時間重新提交這些事件。這樣,你可以靈活地管理事件發(fā)布的生命周期,確保事件可靠發(fā)布和處理。
事件完成模式 (Event Publication Completion)
當事務性執(zhí)行或 @ApplicationModuleListener 執(zhí)行成功完成時,事件發(fā)布將被標記為已完成。默認情況下,通過在 EventPublication 上設置完成日期來記錄完成狀態(tài)(UPDATE 模式)。這意味著已完成的發(fā)布將保留在事件發(fā)布注冊表中,以便可以通過上述的 CompletedEventPublications 接口進行檢查。這樣做的一個后果是,你需要編寫一些代碼,定期清除舊的、已完成的 EventPublication。否則,它們的持久化存儲(例如關系數(shù)據(jù)庫表)將無限制地增長,并可能影響性能。
Spring Modulith 1.3 引入了一個配置屬性 spring.modulith.events.completion-mode,以支持兩種額外的完成模式。它默認為 UPDATE,采用上述策略。
DELETE 模式: 將 completion-mode 設置為 DELETE 時,注冊表的持久化機制會改為在完成時直接刪除 EventPublication 記錄。這意味著 CompletedEventPublications 將不再返回任何發(fā)布,但同時,你也不必再擔心手動從持久化存儲中清除已完成的事件。ARCHIVE 模式: 這是一個在 Spring Modulith 1.3 中引入的第三種選項,它會將事件發(fā)布記錄復制到存檔表、集合或節(jié)點中。對于該存檔條目,將設置完成日期并刪除原始條目。與 DELETE 模式不同,已完成的事件發(fā)布仍可通過 CompletedEventPublications 抽象進行訪問。事件序列化 (Event Serializer)
每條日志記錄都包含原始事件的序列化內容。EventSerializer 抽象(位于 spring-modulith-events-core 模塊中)允許你自定義事件對象如何被轉換為適合存儲的格式。Spring Modulith 默認提供了基于 Jackson 的 JSON 實現(xiàn)(spring-modulith-events-jackson 組件),它會自動注冊一個 JacksonEventSerializer,并通過 Spring Boot 的自動配置機制使用 ObjectMapper。
自定義事件發(fā)布日期 (Customizing the Event Publication Date)
默認情況下,事件發(fā)布注冊表會使用 Clock.systemUTC() 返回的日期作為事件的發(fā)布時間。如果你想自定義這個時間,只需在應用上下文中注冊一個類型為 Clock 的 Bean:
@Configuration class MyConfiguration { @Bean Clock myCustomClock() { // Your custom Clock instance created here. return Clock.fixed(Instant.now(), ZoneId.systemDefault()); // 示例:固定時間 } }添加依賴
<dependency> <groupId>org.springframework.modulith</groupId> <artifactId>spring-modulith-starter-jpa</artifactId> </dependency>配置application.properties
# Hibernate 配置 spring.jpa.hibernate.ddl-auto=update spring.jpa.show-sql=true # 事件完成刪除日志 spring.modulith.events.completion-mode=DELETE遠程單元測試
測試監(jiān)聽器運行失敗
修改 NotificationFacadeService 的 on 方法,拋出一個 RuntimeException 以模擬失敗場景:
@Slf4j @Service public class NotificationFacadeService { @Async @Transactional(propagation = Propagation.REQUIRES_NEW) @TransactionalEventListener public void on(OrderCreated orderCreated) { log.info("Notification Module (Facade): Received order created event: {}", orderCreated); // 模擬發(fā)送通知 log.info("Notification Module (Facade): Sending notification for order number: {}", orderCreated.getOrderNo()); throw new RuntimeException("模擬發(fā)送通知失敗"); // 模擬異常 } }運行測試
可知,事件監(jiān)聽器消費失敗,日志記錄未被刪除,仍然保留在數(shù)據(jù)庫中。
有些在應用模塊間交換的事件,可能對外部系統(tǒng)也很有價值。Spring Modulith 支持將選定的事件發(fā)布到多種消息中間件。要啟用該功能,請按照以下步驟操作:
添加依賴: 添加與所用消息中間件對應的 Spring Modulith 依賴到你的項目中。標記事件: 通過為需要外部化的事件類型添加 Spring Modulith 或 jMolecules 的 @Externalized 注解,標記這些事件。指定目標: 在注解的 value 屬性中指定消息中間件的路由目標(例如,Kafka 的 Topic 名稱、AMQP 的 Routing Key)。這樣配置后,Spring Modulith 就會自動將這些事件發(fā)布到指定的消息中間件,實現(xiàn)與外部系統(tǒng)的集成。
這塊內容暫時不做過多介紹,感興趣的讀者可以參考官方文檔中的相關章節(jié)。
感謝您的閱讀!希望這部分內容對您有所啟發(fā)。如果您覺得有價值,請不吝點贊支持,并在評論區(qū)留下您的想法,一起交流學習!別忘了關注我,獲取后續(xù)更精彩的教程內容。您的每一個肯定和互動,都是我持續(xù)分享知識的動力!