Our team recently ran into a thorny state-consistency problem. The core of our stream-processing platform is Apache Flink, which consumes real-time data and writes the results to two heterogeneous systems: a PostgreSQL database for online queries and an Elasticsearch cluster for full-text search. The problem: if the Flink job fails and restarts after a successful write to Elasticsearch but before the corresponding write to PostgreSQL, the two systems diverge. Flink's checkpoint mechanism guarantees exactly-once semantics for its own state, but it cannot atomically coordinate writes to multiple external systems.
Manual reconciliation and repair kept getting more expensive, so we urgently needed an automated solution with end-to-end atomicity. Our initial idea: could the entire pipeline (building and pushing the Flink job image, deploying it on Kubernetes, and its interactions with external systems) be wrapped in one large distributed transaction? Either everything succeeds or everything rolls back, with no intermediate states.
The idea sounds heavyweight, but for certain core data flows in our financial scenarios, strong consistency trumps everything else. We decided to explore an implementation based on a Kubernetes Operator, using a Custom Resource Definition (CRD) to declaratively manage the lifecycle of these "transactional Flink jobs".
Technology Choices and Architecture Decisions
To pull this off we need a transaction coordinator, a set of participants, and a strongly consistent transaction protocol.
Transaction Protocol: Two-Phase Commit (2PC)
We chose the classic two-phase commit protocol. Yes, 2PC is widely criticized for synchronous blocking and its coordinator single point of failure. But in our scenario the transaction is short-lived (it is mostly confined to the job-startup phase) and the set of participants is fixed and small. The strong atomicity guarantee it provides (the 'A' in ACID) is exactly what we need most. Compared with eventually consistent approaches such as Saga, 2PC avoids complex compensation logic and is more direct for this "deployment as a transaction" scenario.
Coordinator: Kubernetes Operator
The heart of this design is concentrating the transaction-coordination logic in a custom Kubernetes Operator. We define a CRD named TransactionalFlinkJob. The Operator's reconcile loop is naturally a state machine, which makes it well suited to play the 2PC coordinator. When a user runs kubectl apply on a CR, the reconcile loop fires and starts driving the transaction.
Communication Protocol: gRPC & NATS
Communication between coordinator and participants needs an explicit contract. We use gRPC to define the 2PC interface (Prepare, Commit, Rollback). gRPC builds on Protobuf, giving us strongly typed, high-performance RPC.
However, having the Operator call every participant's gRPC service directly would increase coupling and network-configuration complexity. We therefore introduced NATS as a message bus. The coordinator publishes Prepare, Commit, and other commands to dedicated NATS subjects; every participant subscribes to those subjects and sends its vote back on a reply subject (Reply-To). This publish-subscribe pattern decouples the coordinator from the participants.
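Before diving into the Operator, here is a minimal coordinator-side sketch of this request/reply pattern, assuming a reachable nats-server; the subject names follow the convention used throughout this post:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // assumes a local/reachable nats-server
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	txID := "tx-123"
	replySubject := fmt.Sprintf("transaction.%s.replies", txID)

	// Subscribe to the reply subject BEFORE publishing, so no vote is lost.
	sub, err := nc.Subscribe(replySubject, func(m *nats.Msg) {
		log.Printf("vote received: %s", string(m.Data))
	})
	if err != nil {
		log.Fatal(err)
	}
	defer sub.Unsubscribe()

	// PublishRequest sets the Reply-To field so participants know where to answer.
	subject := fmt.Sprintf("transaction.%s.prepare", txID)
	if err := nc.PublishRequest(subject, replySubject, []byte("prepare")); err != nil {
		log.Fatal(err)
	}
	time.Sleep(2 * time.Second) // demo only: give participants time to reply
}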
Participant Definitions
This transaction has at least three participants (a sketch of the common contract follows the list):
- Build service (Builder): The image build and push for the Flink job must also be transactional. If compilation fails or the image push does not succeed, the whole transaction must roll back. We chose Tekton for this role; a Tekton TaskRun can be viewed as a short-lived transaction participant.
- Flink job: The Flink job itself is the core business participant. During the Prepare phase it must verify that all external connections (PostgreSQL, Elasticsearch) are healthy and that the relevant resources can be locked.
- Other external services: Any downstream service that must be written in sync with Flink can implement the 2PC participant interface and join the transaction.
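Conceptually every participant, whatever its transport, implements the same contract. A hypothetical Go interface capturing it (the names here are ours; the wire-level contract is the gRPC service defined in step 2):

package participant

import "context"

// Vote mirrors the Vote enum defined in the Protobuf contract below.
type Vote int

const (
	VoteUnspecified Vote = iota
	VoteCommit
	VoteAbort
)

// Participant is a hypothetical local abstraction of the 2PC contract.
type Participant interface {
	// Prepare must only return VoteCommit if a subsequent Commit cannot fail.
	Prepare(ctx context.Context, txID string) (Vote, error)
	Commit(ctx context.Context, txID string) error
	Rollback(ctx context.Context, txID string) error
}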
The overall architecture looks like this:
graph TD
subgraph Kubernetes Cluster
A[User: kubectl apply -f job.yaml] --> B(API Server);
B -- Watches --> C{TransactionalFlinkJob Operator};
C -- Acts as Coordinator --> D[Transaction Logic];
subgraph NATS
E(nats-server)
end
D -- 1. Publishes Prepare --> E;
E -- 2. Delivers Prepare --> F[Tekton TaskRun Pod];
E -- 2. Delivers Prepare --> G[Flink Job Pod];
F -- 3. Votes Commit/Abort --> E;
G -- 3. Votes Commit/Abort --> E;
E -- 4. Delivers Votes --> D;
D -- 5. Decides & Publishes Commit/Rollback --> E;
E -- 6. Delivers Decision --> F & G;
C -- Creates/Manages --> F;
C -- Creates/Manages --> G;
end
F -- Interacts with --> H(Image Registry);
G -- Interacts with --> I(PostgreSQL);
G -- Interacts with --> J(Elasticsearch);
Step-by-Step Implementation
1. Define the TransactionalFlinkJob CRD
First, we define the CRD that describes a transactional job.
transactionalflinkjob_crd.yaml:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: transactionalflinkjobs.stream.example.com
spec:
  group: stream.example.com
  versions:
    - name: v1alpha1
      served: true
      storage: true
      # The controller updates status via the status subresource
      # (r.Status().Update), so it must be enabled here.
      subresources:
        status: {}
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              properties:
                transactionId:
                  type: string
                  description: "Unique ID for the transaction, generated by the user."
                flinkJob:
                  type: object
                  properties:
                    image:
                      type: string
                      description: "Base image for the Flink job."
                    jarUrl:
                      type: string
                      description: "URL of the Flink job JAR file to be built into the image."
                    entryClass:
                      type: string
                    parallelism:
                      type: integer
                participants:
                  type: array
                  items:
                    type: object
                    properties:
                      name:
                        type: string
                      type:
                        type: string
                        enum: ["TektonBuild", "FlinkApp", "ExternalService"]
                      endpoint:
                        type: string
                        description: "gRPC endpoint for external services, or identifier for internal ones."
            status:
              type: object
              properties:
                phase:
                  type: string
                  enum: ["Pending", "Preparing", "Prepared", "Committing", "Committed", "RollingBack", "RolledBack", "Failed"]
                conditions:
                  type: array
                  items:
                    type: object
                    # ... standard Kubernetes condition fields
  scope: Namespaced
  names:
    plural: transactionalflinkjobs
    singular: transactionalflinkjob
    kind: TransactionalFlinkJob
    shortNames:
      - tfj
The spec carries the transaction ID, the Flink job's build information, and the list of participants. The status.phase field is crucial: it reflects how far the transaction has progressed.
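For reference, a CR instance might look like this (all values are illustrative); applying it with kubectl apply is what kicks off the reconcile loop described below:

apiVersion: stream.example.com/v1alpha1
kind: TransactionalFlinkJob
metadata:
  name: payments-enrichment
spec:
  transactionId: "tx-20240101-001"
  flinkJob:
    image: flink:1.17
    jarUrl: "https://artifacts.example.com/jobs/payments-enrichment.jar"
    entryClass: "com.example.PaymentsEnrichmentJob"
    parallelism: 4
  participants:
    - name: builder
      type: TektonBuild
    - name: flink-job
      type: FlinkApp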
2. Define the 2PC gRPC Interface
Next, we define the core 2PC interface with Protobuf.
proto/transaction.proto:
syntax = "proto3";
package transaction.v1;
option go_package = "example.com/stream/gen/transaction/v1;transactionv1";
// TransactionParticipantService defines the interface for a 2PC participant.
service TransactionParticipantService {
// Prepare asks the participant to prepare for a transaction.
// The participant must ensure it can commit if it votes to commit.
rpc Prepare(PrepareRequest) returns (PrepareResponse) {}
// Commit tells the participant to commit the transaction.
rpc Commit(CommitRequest) returns (CommitResponse) {}
// Rollback tells the participant to abort the transaction.
rpc Rollback(RollbackRequest) returns (RollbackResponse) {}
}
enum Vote {
VOTE_UNSPECIFIED = 0;
VOTE_COMMIT = 1;
VOTE_ABORT = 2;
}
message PrepareRequest {
string transaction_id = 1;
bytes payload = 2; // Any context needed for preparation
}
message PrepareResponse {
string transaction_id = 1;
string participant_name = 2;
Vote vote = 3;
string message = 4; // Optional message, e.g., reason for abort
}
message CommitRequest {
string transaction_id = 1;
}
message CommitResponse {
string transaction_id = 1;
string participant_name = 2;
bool success = 3;
}
message RollbackRequest {
string transaction_id = 1;
}
message RollbackResponse {
string transaction_id = 1;
string participant_name = 2;
bool success = 3;
}
This service definition is the standard interface every participant must implement, which keeps communication uniform across participants.
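The Go bindings referenced below (transactionv1) are generated from this file. Assuming protoc and the standard Go plugins are installed, something like the following does it (the Java side would use the equivalent Protobuf/gRPC Maven or Gradle plugins):

go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       proto/transaction.proto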
3. Implement the Operator's Core Coordination Logic
We scaffold the Operator with Kubebuilder; the core logic lives in the Reconcile function. Only the key fragments are shown here, part pseudocode and part real logic; production code would be considerably more involved.
internal/controller/transactionalflinkjob_controller.go:
package controller
import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	streamv1alpha1 "example.com/stream/api/v1alpha1"
	// Generated from proto/transaction.proto (see step 2).
	transactionv1 "example.com/stream/gen/transaction/v1"
)
type TransactionalFlinkJobReconciler struct {
client.Client
Scheme *runtime.Scheme
NatsConn *nats.Conn
}
func (r *TransactionalFlinkJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
var job streamv1alpha1.TransactionalFlinkJob
if err := r.Get(ctx, req.NamespacedName, &job); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// State machine based on job.Status.Phase
switch job.Status.Phase {
case "": // Initial state
logger.Info("New job detected, moving to Pending")
job.Status.Phase = "Pending"
// Set initial conditions
return ctrl.Result{}, r.Status().Update(ctx, &job)
case "Pending":
logger.Info("Starting transaction", "TransactionID", job.Spec.TransactionId)
// Here, we would create necessary resources like the Tekton TaskRun
// For simplicity, let's assume we just move to Preparing
job.Status.Phase = "Preparing"
return ctrl.Result{}, r.Status().Update(ctx, &job)
case "Preparing":
// This is the core of the first phase
logger.Info("Broadcasting Prepare message via NATS")
numParticipants := len(job.Spec.Participants)
votes := make(chan *transactionv1.PrepareResponse, numParticipants)
// A unique subject for this transaction's replies
replySubject := fmt.Sprintf("transaction.%s.replies", job.Spec.TransactionId)
// Subscribe to replies before publishing
sub, err := r.NatsConn.Subscribe(replySubject, func(msg *nats.Msg) {
var resp transactionv1.PrepareResponse
if err := proto.Unmarshal(msg.Data, &resp); err == nil {
votes <- &resp
}
})
if err != nil {
// Handle NATS subscription error, maybe retry
return ctrl.Result{Requeue: true}, err
}
defer sub.Unsubscribe()
// Publish Prepare request
prepareReq := &transactionv1.PrepareRequest{TransactionId: job.Spec.TransactionId}
reqBytes, _ := proto.Marshal(prepareReq)
publishSubject := fmt.Sprintf("transaction.%s.prepare", job.Spec.TransactionId)
err = r.NatsConn.PublishRequest(publishSubject, replySubject, reqBytes)
if err != nil {
return ctrl.Result{Requeue: true}, err
}
// Collect votes with a timeout
collectedVotes := make(map[string]transactionv1.Vote)
timeout := time.After(30 * time.Second)
for i := 0; i < numParticipants; i++ {
select {
case vote := <-votes:
logger.Info("Received vote", "Participant", vote.ParticipantName, "Vote", vote.Vote)
collectedVotes[vote.ParticipantName] = vote.Vote
case <-timeout:
logger.Error(nil, "Timeout waiting for votes")
job.Status.Phase = "RollingBack" // Timeout leads to rollback
return ctrl.Result{}, r.Status().Update(ctx, &job)
}
}
// Decision logic
allCommit := true
for _, vote := range collectedVotes {
if vote != transactionv1.Vote_VOTE_COMMIT {
allCommit = false
break
}
}
if allCommit {
logger.Info("All participants voted to commit. Moving to Committing.")
job.Status.Phase = "Committing"
} else {
logger.Info("At least one participant voted to abort. Moving to RollingBack.")
job.Status.Phase = "RollingBack"
}
return ctrl.Result{}, r.Status().Update(ctx, &job)
case "Committing":
logger.Info("Broadcasting Commit message")
commitReq := &transactionv1.CommitRequest{TransactionId: job.Spec.TransactionId}
reqBytes, _ := proto.Marshal(commitReq)
publishSubject := fmt.Sprintf("transaction.%s.commit", job.Spec.TransactionId)
if err := r.NatsConn.Publish(publishSubject, reqBytes); err != nil {
// A real implementation needs a persistent transaction log to retry commits
logger.Error(err, "Failed to publish commit message. Critical state!")
job.Status.Phase = "Failed" // This state requires manual intervention
return ctrl.Result{}, r.Status().Update(ctx, &job)
}
// In a real system, we'd wait for ACKs from participants.
// For now, we assume commit will succeed.
job.Status.Phase = "Committed"
// Now we can safely create the FlinkDeployment resource
return ctrl.Result{}, r.Status().Update(ctx, &job)
case "RollingBack":
// Broadcast Rollback message, similar to Commit
logger.Info("Broadcasting Rollback message")
rollbackReq := &transactionv1.RollbackRequest{TransactionId: job.Spec.TransactionId}
reqBytes, _ := proto.Marshal(rollbackReq)
publishSubject := fmt.Sprintf("transaction.%s.rollback", job.Spec.TransactionId)
r.NatsConn.Publish(publishSubject, reqBytes) // Fire-and-forget for rollback
job.Status.Phase = "RolledBack"
// Clean up any resources created, e.g., Tekton TaskRun
return ctrl.Result{}, r.Status().Update(ctx, &job)
// ... other phases ...
}
return ctrl.Result{}, nil
}
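For completeness, the reconciler is wired into the manager with the usual Kubebuilder scaffolding:

// SetupWithManager registers the reconciler to watch TransactionalFlinkJob CRs.
func (r *TransactionalFlinkJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&streamv1alpha1.TransactionalFlinkJob{}).
		Complete(r)
}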
A common mistake is handling everything inline in the coordinator. In a real project, once the coordinator decides to Commit or Rollback, it must persist that decision to a transaction log (for example an etcd entry or a database record) before sending any command to participants. Only after the log write succeeds may the instruction go out. That way, even if the coordinator crashes and restarts, it can recover the transaction state from the log, so decisions are neither lost nor duplicated and split-brain is avoided.
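A minimal sketch of that ordering, extending the reconciler above. The helper name is ours, the CR status stands in for a dedicated transaction log, and we assume participants treat re-delivered decisions idempotently (de-duplicating by transaction ID):

// decideAndPersist durably records the 2PC decision BEFORE any participant
// hears about it. The CR status doubles as the transaction log: status
// updates go through the API server into etcd. If the Operator crashes right
// after this write, the next reconcile of the "Committing"/"RollingBack"
// phase simply re-publishes the same decision.
func (r *TransactionalFlinkJobReconciler) decideAndPersist(
	ctx context.Context, job *streamv1alpha1.TransactionalFlinkJob, allCommit bool,
) error {
	if allCommit {
		job.Status.Phase = "Committing"
	} else {
		job.Status.Phase = "RollingBack"
	}
	// Nothing may be published to NATS until this write has succeeded.
	return r.Status().Update(ctx, job)
}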
4. Implement the Tekton Participant
We need a Tekton Task to build the image. The Task's Pod must also run a service implementing the TransactionParticipantService contract.
tekton-build-task.yaml:
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
  name: transactional-image-builder
spec:
  params:
    - name: transactionId
      description: The ID of the transaction to participate in.
    - name: natsUrl
      description: The URL of the NATS server.
    # ... other params like jarUrl, dockerfile, etc.
  steps:
    - name: build-and-push
      # The :debug tag ships a busybox shell, which the script below needs.
      image: gcr.io/kaniko-project/executor:debug # Example using Kaniko
      volumeMounts:
        - name: tx-signal
          mountPath: /workspace/tx
      # This step only runs the build AFTER the sidecar receives Commit.
      script: |
        #!/busybox/sh
        # Wait for the commit signal from the sidecar
        while [ ! -f /workspace/tx/COMMIT ]; do
          if [ -f /workspace/tx/ROLLBACK ]; then
            echo "Rollback signal received, aborting."
            exit 1
          fi
          echo "Waiting for commit signal..."
          sleep 1
        done
        echo "Commit signal received, starting build..."
        # /kaniko/executor --context ...
        echo "Build complete."
  # The participant must run CONCURRENTLY with the build step, so it is a
  # sidecar, not a step (steps execute sequentially and would deadlock here).
  sidecars:
    - name: participant-sidecar
      image: my-company/tx-participant:latest # Our custom participant image
      volumeMounts:
        - name: tx-signal
          mountPath: /workspace/tx
      args:
        - "--type=TektonBuild"
        - "--participant-name=builder-$(context.taskRun.name)"
        - "--transaction-id=$(params.transactionId)"
        - "--nats-url=$(params.natsUrl)"
  volumes:
    - name: tx-signal
      emptyDir: {}
The trick is that the build-and-push step blocks on a signal file (/workspace/tx/COMMIT), which the participant-sidecar container creates once it receives the Commit message. On Rollback, the sidecar drops a ROLLBACK marker and exits; the build step sees the marker, fails fast, and the TaskRun fails, which gives the build its transactional behavior. Note that the two containers share an emptyDir volume for the signal files.
The sidecar's Go code looks roughly like this:
// Simplified sidecar logic
package main

import (
	"flag"
	"fmt"
	"os"

	"github.com/nats-io/nats.go"
	"google.golang.org/protobuf/proto"

	transactionv1 "example.com/stream/gen/transaction/v1"
)

// Stub; a real participant would verify build tooling and registry credentials.
func checkBuildPrerequisites() bool { return true }

func main() {
	// Flags mirror the args passed by the Tekton Task above.
	_ = flag.String("type", "TektonBuild", "participant type")
	name := flag.String("participant-name", "", "unique participant name")
	txId := flag.String("transaction-id", "", "transaction ID")
	natsUrl := flag.String("nats-url", nats.DefaultURL, "NATS server URL")
	flag.Parse()

	nc, err := nats.Connect(*natsUrl)
	if err != nil {
		os.Exit(1)
	}
	// Prepare: for the build participant, this means checking that build
	// tools are available and registry credentials are configured.
	nc.Subscribe(fmt.Sprintf("transaction.%s.prepare", *txId), func(m *nats.Msg) {
		vote := transactionv1.Vote_VOTE_COMMIT
		if !checkBuildPrerequisites() {
			vote = transactionv1.Vote_VOTE_ABORT
		}
		resp := &transactionv1.PrepareResponse{
			TransactionId:   *txId,
			ParticipantName: *name,
			Vote:            vote,
		}
		respBytes, _ := proto.Marshal(resp)
		nc.Publish(m.Reply, respBytes)
	})
	// Commit: create the signal file to unblock the build step.
	nc.Subscribe(fmt.Sprintf("transaction.%s.commit", *txId), func(m *nats.Msg) {
		if f, err := os.Create("/workspace/tx/COMMIT"); err == nil {
			f.Close()
		}
	})
	// Rollback: drop a marker so the build step fails fast, then exit.
	nc.Subscribe(fmt.Sprintf("transaction.%s.rollback", *txId), func(m *nats.Msg) {
		if f, err := os.Create("/workspace/tx/ROLLBACK"); err == nil {
			f.Close()
		}
		os.Exit(1)
	})
	// Keep the sidecar running until Tekton tears it down.
	select {}
}
5. Make the Flink Job a 2PC Participant
Finally, the Flink job itself must become a participant. This means its main method additionally starts a NATS-based participant service (speaking the Protobuf contract defined above) on startup.
FlinkTransactionalJob.java:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// ... other imports for gRPC, NATS, etc.
public class FlinkTransactionalJob {
private static final String PARTICIPANT_NAME = "flink-job";
private static volatile TransactionState state = TransactionState.PENDING;
// A simple in-memory state for the transaction
enum TransactionState { PENDING, PREPARED, COMMITTED, ROLLED_BACK }
public static void main(String[] args) throws Exception {
String transactionId = System.getenv("TRANSACTION_ID");
String natsUrl = System.getenv("NATS_URL");
// Start the transaction participant service in a background thread
Thread participantThread = new Thread(() -> startParticipantService(transactionId, natsUrl));
participantThread.setDaemon(true);
participantThread.start();
// The Flink job will not start processing until the transaction is committed
while (state != TransactionState.COMMITTED) {
if (state == TransactionState.ROLLED_BACK) {
System.out.println("Transaction rolled back. Exiting.");
System.exit(1);
}
Thread.sleep(1000);
}
System.out.println("Transaction committed. Starting Flink job execution.");
// --- Standard Flink Job Logic ---
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// ... define sources, transformations, and sinks ...
env.execute("My Transactional Flink Job");
}
private static void startParticipantService(String transactionId, String natsUrl) {
try {
Connection nc = Nats.connect(natsUrl);
Dispatcher d = nc.createDispatcher(msg -> {});
// Subscribe to Prepare
String prepareSubject = String.format("transaction.%s.prepare", transactionId);
d.subscribe(prepareSubject, msg -> {
System.out.println("Received Prepare request");
// Here is the critical part for a Flink job.
// "Prepare" means checking connections to all external systems.
boolean canConnectPostgres = checkPostgresConnection();
boolean canConnectElastic = checkElasticsearchConnection();
PrepareResponse.Builder respBuilder = PrepareResponse.newBuilder()
.setTransactionId(transactionId)
.setParticipantName(PARTICIPANT_NAME);
if (canConnectPostgres && canConnectElastic) {
System.out.println("Voting COMMIT");
respBuilder.setVote(Vote.VOTE_COMMIT);
state = TransactionState.PREPARED;
} else {
System.out.println("Voting ABORT");
respBuilder.setVote(Vote.VOTE_ABORT);
state = TransactionState.ROLLED_BACK;
}
nc.publish(msg.getReplyTo(), respBuilder.build().toByteArray());
});
// Subscribe to Commit
String commitSubject = String.format("transaction.%s.commit", transactionId);
d.subscribe(commitSubject, msg -> {
if (state == TransactionState.PREPARED) {
System.out.println("Received Commit. Unlocking job execution.");
state = TransactionState.COMMITTED;
}
});
// Subscribe to Rollback
String rollbackSubject = String.format("transaction.%s.rollback", transactionId);
d.subscribe(rollbackSubject, msg -> {
System.out.println("Received Rollback.");
state = TransactionState.ROLLED_BACK;
});
} catch (Exception e) {
e.printStackTrace();
state = TransactionState.ROLLED_BACK; // Fail safe
}
}
// Dummy check methods
private static boolean checkPostgresConnection() { return true; }
private static boolean checkElasticsearchConnection() { return true; }
}
The pitfall here is synchronizing the Flink main thread with the participant-service thread. A volatile state variable lets the main thread block until the background service receives Commit. On Rollback, or on any exception, the process exits immediately, so the Flink job never runs in an indeterminate state.
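The job reads TRANSACTION_ID and NATS_URL from its environment; the Operator would inject them into the Flink container spec it creates, roughly like this (all values are illustrative):

containers:
  - name: flink-main
    image: registry.example.com/payments-enrichment:tx-20240101-001
    env:
      - name: TRANSACTION_ID
        value: "tx-20240101-001"
      - name: NATS_URL
        value: "nats://nats.nats-system.svc:4222"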
Limitations of the Current Approach
The scheme does deliver the atomic deploy-and-run semantics we set out to get, but its complexity is significant and the inherent drawbacks of 2PC remain.
First, synchronous blocking. During the Preparing phase, if any participant responds slowly (say, the Tekton build pod is slow to schedule), the whole transaction blocks, increasing the Operator's resource consumption and latency.
Second, the coordinator is a single point. The Operator itself can run highly available (multiple replicas), but transaction state must live in strongly consistent storage (etcd qualifies). If the Operator crashes after writing the transaction log but before sending Commit, the recovery logic must be airtight so decisions are neither lost nor sent twice. Our current in-memory bookkeeping is not robust enough for that.
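Controller-runtime's built-in leader election covers the multi-replica part: only the elected leader runs reconcile loops, so two replicas never coordinate the same transaction concurrently. A sketch of enabling it when constructing the manager (the election ID is ours):

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
	// Only the leader replica drives transactions.
	LeaderElection:   true,
	LeaderElectionID: "transactionalflinkjob-operator.stream.example.com",
})
if err != nil {
	panic(err)
}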
Finally, the applicable scope is narrow. This heavyweight style of transaction management only suits scenarios with extreme consistency requirements and short transaction windows. For long-running business processes, or pipelines that need high throughput, an eventually consistent model based on Saga or event-driven choreography is usually the better choice: it decomposes one large technical transaction into multiple business steps, each of which can complete and be compensated independently.