事件溯源(Event Sourcing)系统在生产中的审计和调试是一大痛点。传统的做法是离线分析事件日志,或者构建专门的查询视图,整个过程被动且滞后。如果事件日志本身能像数据库一样被实时、交互式地查询、甚至注入补偿事件,并且所有操作都具备事务原子性,那么系统的可观测性和可操作性将得到质的提升。这里的挑战在于,如何将一个为高吞吐、低延迟设计的后端系统,同一个为探索性分析设计的交互式环境(如Jupyter)安全、高效地结合起来。
我们的目标是构建一个交互式事件溯源内核。这个内核使用Scala和Akka Persistence构建,确保事件持久化具备ACID特性;同时,它暴露一个接口,允许通过一个定制的Jupyter(Almond)环境对其进行命令注入、状态查询和事件回溯;最终,系统的状态变更会实时投射到一个由Jotai驱动的React前端。
技术选型决策
核心后端: Scala + Akka Persistence
- 原因: Scala的强类型系统和函数式特性非常适合构建领域模型。Akka的Actor模型天然地契合事件溯源中聚合根(Aggregate)的概念,每个Actor实例可以代表一个业务实体。Akka Persistence提供了事件溯源和CQRS所需的核心抽象,将业务逻辑与底层存储解耦。
事件存储与ACID保证: PostgreSQL (via
akka-persistence-jdbc)- 原因: 虽然有专门的事件存储数据库,但在真实项目中,使用成熟的关系型数据库如PostgreSQL作为事件日志(Journal)的后端是一个务实的选择。
akka-persistence-jdbc插件能将事件写入操作映射为数据库事务,从而直接利用PostgreSQL成熟的ACID能力,确保每个事件的写入都是原子性的、一致的、隔离的、持久的。这是整个系统稳定性的基石。
- 原因: 虽然有专门的事件存储数据库,但在真实项目中,使用成熟的关系型数据库如PostgreSQL作为事件日志(Journal)的后端是一个务实的选择。
交互式控制平面: Jupyter + Almond Kernel
- 原因: Almond为Jupyter提供了强大的Scala内核。相比于构建一个完整的GUI或CLI,Jupyter Notebook提供了一个无与伦比的环境,它融合了代码、文档和执行结果。我们可以为内核注入自定义的客户端库,使得运维或开发人员能用几行Scala代码就与后端的Actor进行交互,执行诊断或修复操作。
实时状态投射前端: React + Jotai + WebSockets
- 原因: 为了展示系统的实时状态,需要一个响应式前端。React是标准选择。Jotai作为一个原子化的状态管理库,其API简洁且易于集成。当后端状态变化时,通过WebSocket推送的细粒度更新可以直接作用于对应的atom,避免了复杂的reducer和样板代码。
架构概览
整个系统的交互流程可以用下面的图表来描述。
sequenceDiagram
participant Notebook as Jupyter (Almond)
participant KernelGateway as Scala Kernel Gateway
participant AccountActor as Akka Persistent Actor
participant Postgres as PostgreSQL (Journal)
participant Projection as Akka Projection
participant Frontend as React/Jotai UI
Notebook->>+KernelGateway: sendCommand("acct-1", Deposit(100))
KernelGateway->>+AccountActor: Forward(Deposit(100))
AccountActor->>+Postgres: Persist(Deposited(100))
Note over Postgres: Transaction (ACID)
Postgres-->>-AccountActor: Persist OK
AccountActor->>AccountActor: Update internal state
AccountActor-->>-KernelGateway: CommandProcessed
KernelGateway-->>-Notebook: "Success"
Postgres->>+Projection: Read event stream (Deposited(100))
Projection->>Projection: Update read model (e.g., in-memory map)
Projection->>+Frontend: WebSocket.send({ "acct-1", balance: 100 })
Frontend->>Frontend: Update Jotai atom
步骤化实现
1. 定义核心Actor与事件日志 (ACID核心)
首先,我们需要定义核心的业务实体。以一个银行账户Account为例,它有存款(Deposit)和取款(Withdraw)两种操作。
a. 定义命令、事件和状态
// src/main/scala/com/example/core/Account.scala
package com.example.core
// Commands (Actions to be performed)
sealed trait AccountCommand
case class Deposit(amount: BigDecimal) extends AccountCommand
case class Withdraw(amount: BigDecimal) extends AccountCommand
case object GetState extends AccountCommand
// Events (Facts that have happened)
sealed trait AccountEvent
case class Deposited(amount: BigDecimal) extends AccountEvent
case class Withdrawn(amount: BigDecimal) extends AccountEvent
// State
case class AccountState(balance: BigDecimal = 0) {
def updated(event: AccountEvent): AccountState = event match {
case Deposited(amount) => copy(balance = balance + amount)
case Withdrawn(amount) => copy(balance = balance - amount)
}
}
b. 实现Persistent Actor
这是业务逻辑的核心。Actor接收命令,验证后生成事件,然后持久化事件。persist方法的调用会被akka-persistence-jdbc插件包裹在一个数据库事务中。
// src/main/scala/com/example/core/AccountActor.scala
package com.example.core
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}
import scala.concurrent.duration._
object AccountActor {
def apply(persistenceId: PersistenceId): Behavior[AccountCommand] = {
EventSourcedBehavior[AccountCommand, AccountEvent, AccountState](
persistenceId = persistenceId,
emptyState = AccountState(),
commandHandler = commandHandler,
eventHandler = eventHandler
)
.withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
.onPersistFailure(SupervisorStrategy.restartWithBackoff(minBackoff = 200.millis, maxBackoff = 5.seconds, randomFactor = 0.1))
}
private val commandHandler: (AccountState, AccountCommand) => Effect[AccountEvent, AccountState] = {
(state, command) =>
command match {
case Deposit(amount) =>
if (amount <= 0) {
// 在真实项目中,这里应该回复一个具体的失败类型
Effect.none.thenRun(println(s"Deposit amount must be positive. Refusing ${amount}"))
} else {
Effect.persist(Deposited(amount))
}
case Withdraw(amount) =>
if (amount <= 0 || state.balance < amount) {
// 记录一次失败的尝试,但不持久化事件
println(s"Withdrawal failed: insufficient funds or invalid amount. Balance: ${state.balance}, tried to withdraw: ${amount}")
Effect.none
} else {
Effect.persist(Withdrawn(amount))
}
case GetState =>
// 仅用于调试,实际中会通过Read-Side查询
println(s"Current state for ${this.getClass.getSimpleName}: ${state}")
Effect.none
}
}
private val eventHandler: (AccountState, AccountEvent) => AccountState = {
(state, event) => state.updated(event)
}
}
c. 配置akka-persistence-jdbc
这是确保ACID的关键。在application.conf中,我们配置JDBC插件使用PostgreSQL。
# src/main/resources/application.conf
akka {
actor {
provider = "local"
}
persistence {
journal {
// The journal plugin for akka-persistence-jdbc
plugin = "akka.persistence.journal.jdbc"
}
snapshot-store {
// The snapshot-store plugin for akka-persistence-jdbc
plugin = "akka.persistence.snapshot.store.jdbc"
}
}
}
// JDBC Journal settings
akka-persistence-jdbc {
shared-databases {
slick {
profile = "slick.jdbc.PostgresProfile$"
db {
driver = "org.postgresql.Driver"
url = "jdbc:postgresql://localhost:5432/events_db"
user = "user"
password = "password"
maxConnections = 5
numThreads = 5
}
}
}
}
你需要一个PostgreSQL实例,并使用akka-persistence-jdbc提供的SQL脚本创建journal和snapshot表。
2. 构建Jupyter交互网关
我们不会让Jupyter直接访问ActorSystem,而是通过一个类型安全的网关服务。
// src/main/scala/com/example/gateway/JupyterGateway.scala
package com.example.gateway
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import com.example.core.{AccountActor, AccountCommand, Deposit, Withdraw}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
class JupyterGateway(implicit system: ActorSystem[_]) {
implicit val ec: ExecutionContext = system.executionContext
implicit val timeout: Timeout = Timeout(3.seconds)
private val sharding = ClusterSharding(system)
// 定义实体类型键
val TypeKey = EntityTypeKey[AccountCommand]("Account")
// 在系统启动时初始化分片
sharding.init(Entity(TypeKey) { entityContext =>
AccountActor(PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
})
private def getEntityRef(entityId: String): ActorRef[AccountCommand] = {
sharding.entityRefFor(TypeKey, entityId)
}
// 定义一个统一的响应类型
sealed trait GatewayResponse
case object Ack extends GatewayResponse
case class Failure(reason: String) extends GatewayResponse
// 这里的Ask模式在真实项目中需要更复杂的错误处理
def deposit(accountId: String, amount: BigDecimal): Future[GatewayResponse] = {
val entityRef = getEntityRef(accountId)
// Akka的Ask模式返回Future,我们需要将其映射为我们自己的响应类型
// 在真实项目中,命令处理后Actor应该回复一个消息
// 为简化,这里我们假设持久化成功即操作成功
// 更好的做法是让Actor回复一个`DepositSuccessful`或`DepositFailed`
entityRef ! Deposit(amount)
Future.successful(Ack) // 简化处理
}
def withdraw(accountId: String, amount: BigDecimal): Future[GatewayResponse] = {
val entityRef = getEntityRef(accountId)
entityRef ! Withdraw(amount)
Future.successful(Ack) // 简化处理
}
}
注意,这里我们引入了Akka Cluster Sharding。即使在单机模式下,这也是管理大量持久化Actor的最佳实践。它根据entityId自动分发消息并按需创建Actor实例。
3. 为Jupyter Almond内核注入客户端
现在,我们需要让Jupyter Notebook能轻松调用JupyterGateway。我们可以在启动Almond时,将一个预配置的客户端实例注入到环境中。一个简单的方式是创建一个包含所有工具的predef (预定义)脚本。
假设我们已经将整个项目打包成一个jar。在启动Jupyter时可以这样加载:almond --predef init.scala --classpath my-project.jar
init.scala文件内容如下:
// init.scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import com.example.gateway.JupyterGateway
import scala.concurrent.Await
import scala.concurrent.duration._
// 初始化ActorSystem
implicit val system = ActorSystem(Behaviors.empty, "InteractiveKernel")
val gateway = new JupyterGateway()
println("Jupyter Gateway is ready. Use 'gateway' object to interact.")
// 增加一个辅助函数,简化调用
def exec[T](future: scala.concurrent.Future[T]): T = {
Await.result(future, 5.seconds)
}
现在,在Jupyter Notebook的一个单元格里,我们就可以直接编写Scala代码来操作后端系统:
// Jupyter Notebook Cell 1:
println("Sending deposit command...")
val result = exec(gateway.deposit("account-001", 150.75))
println(s"Result: $result")
// Jupyter Notebook Cell 2:
println("Sending withdrawal command...")
val result = exec(gateway.withdraw("account-001", 25.50))
println(s"Result: $result")
这种交互性是革命性的。你可以编写脚本来修复错误数据,执行批量操作,或者进行复杂的业务场景模拟,所有操作都经过了与生产环境完全一致的业务逻辑和ACID持久化保证。
4. 构建实时投射和前端UI
a. Akka Projections
我们需要一个服务来读取事件日志,并构建一个可供查询的视图(Read Model)。Akka Projections是实现这一点的标准方式。
// src/main/scala/com/example/projection/BalanceProjection.scala
package com.example.projection
import akka.actor.typed.ActorSystem
import akka.kafka.scaladsl.Producer
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.Offset
import akka.projection.{ProjectionId, scaladsl.SourceProvider}
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.{AtLeastOnceProjection, SourceProvider}
import akka.stream.scaladsl.Source
import com.example.core.AccountEvent
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
// 这是一个简单的内存投影,用于演示
// 在生产环境中,这可能会写入另一个数据库或缓存
object BalanceProjection {
// 使用线程安全的Map作为我们的Read Model
val balances = new ConcurrentHashMap[String, BigDecimal]()
def createProjection(system: ActorSystem[_]): AtLeastOnceProjection[Offset, EventEnvelope[AccountEvent]] = {
val sourceProvider: SourceProvider[Offset, EventEnvelope[AccountEvent]] =
EventSourcedProvider
.eventsByTag[AccountEvent](system, readJournalPluginId = JdbcReadJournal.Identifier, tag = "account") // 假设事件被打上了tag
// ... 实现投影逻辑
// ... 将更新推送到WebSocket
}
}
注意: 为了使用eventsByTag,你需要在EventSourcedBehavior中配置一个tagger。这是按类型查询事件的标准方式。
b. WebSocket服务
使用Akka HTTP,我们可以轻松创建一个WebSocket端点,当投影更新时推送消息。
// Part of the server setup
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{Flow, Source}
// 这是一个简化的WebSocket流程
// 生产级的代码需要管理多个连接,并使用更健壮的源,如`Source.actorRef`
def websocketFlow: Flow[Message, Message, Any] = {
val incomingMessages: Flow[Message, Message, Any] =
Flow[Message].map {
case TextMessage.Strict(txt) => TextMessage.Strict(s"Echo: $txt") // Ignore incoming
}
// 这里的Source应该是连接到Projection更新的事件流
val outgoingMessages: Source[Message, Any] =
Source.repeat("dummy update").map(TextMessage.Strict(_))
Flow.fromSinkAndSource(incomingMessages.to(Sink.ignore), outgoingMessages)
}
val route = path("ws-balance") {
get {
handleWebSocketMessages(websocketFlow)
}
}
c. Jotai 前端
前端部分,我们创建一个React组件来显示余额。
// src/components/AccountBalance.jsx
import React, { useEffect } from 'react';
import { atom, useAtom } from 'jotai';
// 1. 定义一个atom来存储账户余额
const accountBalanceAtom = atom(0);
// 2. 创建一个WebSocket连接并更新atom的hook
const useAccountWebSocket = (accountId) => {
const [, setBalance] = useAtom(accountBalanceAtom);
useEffect(() => {
const ws = new WebSocket(`ws://localhost:8080/ws-balance?accountId=${accountId}`);
ws.onopen = () => {
console.log('WebSocket connected');
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.accountId === accountId) {
console.log('Received balance update:', data.balance);
setBalance(data.balance);
}
} catch (error) {
console.error('Failed to parse WebSocket message:', error);
}
};
ws.onclose = () => {
console.log('WebSocket disconnected');
};
// 清理函数
return () => {
ws.close();
};
}, [accountId, setBalance]);
};
// 3. UI组件
export const AccountBalance = ({ accountId }) => {
const [balance] = useAtom(accountBalanceAtom);
useAccountWebSocket(accountId);
return (
<div>
<h1>Account: {accountId}</h1>
<h2>Balance: ${balance.toFixed(2)}</h2>
</div>
);
};
当Jupyter中的一个命令成功执行并持久化事件后,BalanceProjection会捕捉到这个事件,更新Read Model,并通过WebSocket将新余额推送到前端,Jotai atom更新,UI组件自动重新渲染。整个流程是完全响应式的。
局限性与未来迭代
当前这个实现虽然验证了核心理念,但在生产环境中还有几个问题需要解决:
交互式接口的安全性: 直接在Jupyter中暴露一个能操作生产数据的网关是极其危险的。一个完整的实现必须包含严格的认证和授权机制,确保只有特定角色的用户才能执行特定命令。所有操作都必须被审计。
查询能力的局限: 当前的设计只支持向Actor发送命令。一个更强大的内核应该支持直接在Jupyter中对事件日志进行复杂的、只读的查询,例如,使用Scala集合操作或类似Spark的API来分析特定账户在某个时间段内的事件历史。
最终一致性: 前端UI依赖于最终一致的投影。在某些场景下,用户(尤其是在Jupyter中执行命令的操作者)可能需要“读己之写”的一致性。这需要在网关层增加更复杂的逻辑,比如在命令成功后轮询投影直到状态更新。
内核健壮性: 在Jupyter中运行的
init.scala脚本创建的ActorSystem是临时的。更可靠的架构是让Jupyter内核通过一个独立的、高可用的API服务与后端的ActorSystem集群通信,而不是在自身进程中初始化。这样可以解耦Jupyter的生命周期和核心业务系统的生命周期。