当我们将一个基于 LangChain 构建的 Node.js Agent 服务从单机部署扩展到 Kubernetes 上的多个 Pod 时,灾难发生了。原本设计用于处理复杂、长周期任务(例如:分析用户评论、生成周报、调用外部 API 并整合数据)的 Agent 开始出现行为错乱。同一个任务被两个甚至三个 Pod 同时抢占,导致重复调用昂贵的 LLM API、向数据库写入重复或冲突的数据。问题的根源很明确:我们的 Agent 是有状态的,但我们的架构却是无状态的、可水平扩展的。我们缺少一个可靠的分布式协调机制。
最初的修补方案是在数据库中增加一个状态字段,比如 processing。一个 Worker 在开始处理任务前,先将任务记录的 status 从 pending 更新为 processing。这在低并发下似乎可行,但在真实负载下,竞态条件的问题立刻暴露无遗。两个 Pod 同时读取到 pending 状态,都尝试更新,最终导致其中一个的更新被覆盖,或者更糟,两者都认为自己成功获取了任务的所有权。
我们需要一个真正的分布式锁。社区里常见的方案是基于 Redis 的 SETNX 命令。但这又引出了新的问题:如果一个持有锁的 Worker 进程异常崩溃,没有机会释放锁,那么这个任务就会被永久锁定,造成系统停滞。虽然可以通过设置锁的过期时间来缓解,但过期时间的设置本身就是一个难题。设置太短,可能在任务执行期间锁就过期了;设置太长,又会延长故障恢复的时间。Redlock 算法虽然试图解决这些问题,但其复杂性和争议性让我们望而却步。
在真实项目中,我们需要的是一个更根本、更可靠的解决方案。这让我们将目光投向了 etcd。etcd 不仅仅是一个键值存储,它的核心是 Raft 一致性协议,是为分布式系统协调而生的。它的 Lease(租约)机制,为我们提供了解决这个问题的完美模型。一个客户端可以创建一个租约,并在这个租约之上附加一个或多个 Key。客户端必须在租约的 TTL(Time To Live)内定期“续约”(keep-alive),证明自己仍然存活。一旦客户端崩溃或网络中断,无法续约,租约就会自动过期,所有与之关联的 Key 也会被 etcd 自动删除。这意味着锁的释放是自动且可靠的,完美解决了 Worker 崩溃导致死锁的核心痛点。
我们的目标是:利用 etcd 的 Lease 机制,为 LangChain Agent 构建一个健壮的、支持故障转移的分布式任务执行引擎。
架构设计与核心组件
在深入代码之前,我们先明确一下系统的核心流程和组件。
- Task Queue: 一个任务的来源,为简化模型,我们这里不引入 Kafka 或 RabbitMQ,而是假设有一个可以拉取待处理任务的来源。
- Agent Worker (Node.js): 运行 LangChain Agent 的实际工作进程。我们会启动多个 Worker 实例来模拟分布式环境。
- etcd Cluster: 作为我们分布式协调的中枢。
- LeaseManager: 一个封装了 etcd Lease 创建、续约和撤销逻辑的模块。这是保证锁可靠性的核心。
- DistributedTaskRunner: 负责将 Agent 的执行与
LeaseManager结合起来,确保在任何时刻,一个任务只被一个 Worker 执行。
整个流程如下:
sequenceDiagram
participant Worker A
participant Worker B
participant etcd
participant TaskSource
loop Poll for tasks
Worker A->>TaskSource: Get pending task (e.g., task-123)
Worker B->>TaskSource: Get pending task (e.g., task-123)
end
par Try to acquire lock
Worker A->>etcd: Create Lease (leaseA) & KeepAlive
Worker A->>etcd: Try to PUT /tasks/locks/task-123 with leaseA
and
Worker B->>etcd: Create Lease (leaseB) & KeepAlive
Worker B->>etcd: Try to PUT /tasks/locks/task-123 with leaseB
end
etcd-->>Worker A: PUT Success (Acquired Lock)
etcd-->>Worker B: PUT Failed (Already locked)
Worker A->>Worker A: Start executing LangChain Agent for task-123
Worker B->>Worker B: Log "Task locked" and skip
Note over Worker A: While executing, leaseA is kept alive in background.
Worker A->>etcd: Update task status: /tasks/status/task-123 = {step: 1}
Worker A->>etcd: Update task status: /tasks/status/task-123 = {step: 2}
critical Simulating Worker A crash
Worker A-xetcd: Stop sending KeepAlive for leaseA
end
Note over etcd: etcd detects leaseA expired
etcd->>etcd: Delete /tasks/locks/task-123 automatically
loop Poll for tasks
Worker B->>TaskSource: Get pending task (task-123)
end
Worker B->>etcd: Try to PUT /tasks/locks/task-123 with leaseB
etcd-->>Worker B: PUT Success (Acquired Lock)
Worker B->>etcd: Read /tasks/status/task-123
etcd-->>Worker B: Return {step: 2}
Worker B->>Worker B: Resume LangChain Agent from step 3
这个时序图清晰地展示了锁的竞争、持有锁的 Worker 崩溃后锁的自动释放,以及新的 Worker 如何接管并从上次的状态恢复执行。
代码实现:构建 LeaseManager
我们首先来构建与 etcd 交互的核心 LeaseManager。这个模块必须是稳定和可复用的。我们将使用 etcd3 这个 Node.js 客户端库。
config.js
// config.js
// 生产环境中,这些配置应来自环境变量或配置中心
export const etcdConfig = {
hosts: 'http://127.0.0.1:2379',
};
export const taskConfig = {
// 租约的TTL,单位秒。
// 需要在任务最长单步执行时间 和 故障恢复速度之间做权衡
leaseTTL: 15,
// 任务锁在etcd中的前缀
lockPrefix: '/tasks/locks/',
// 任务状态在etcd中的前缀
statusPrefix: '/tasks/status/',
};
leaseManager.js
// leaseManager.js
import { Etcd3, Lease } from 'etcd3';
import { etcdConfig, taskConfig } from './config.js';
import pino from 'pino';
const logger = pino({ level: 'info' });
/**
* 负责管理etcd租约和分布式锁的获取与释放。
* 每个需要获取锁的worker都应该实例化一个LeaseManager。
*/
export class LeaseManager {
constructor(taskId, workerId) {
if (!taskId || !workerId) {
throw new Error('taskId and workerId are required');
}
this.client = new Etcd3(etcdConfig);
this.taskId = taskId;
this.workerId = workerId;
this.lockKey = `${taskConfig.lockPrefix}${this.taskId}`;
/** @type {Lease | null} */
this.lease = null;
}
/**
* 尝试获取指定任务的分布式锁。
* @returns {Promise<boolean>} - 如果成功获取锁,返回 true,否则返回 false。
*/
async acquireLock() {
try {
// 1. 创建一个租约,并立即开始续约(keep-alive)
this.lease = this.client.lease(taskConfig.leaseTTL);
// keep-alive的心跳会自动在后台运行。
// 如果程序崩溃,心跳停止,租约会在TTL后过期。
this.lease.on('lost', (err) => {
logger.error({
err,
taskId: this.taskId,
workerId: this.workerId,
},
'Lease lost unexpectedly. This worker must stop processing.'
);
// 在真实项目中,这里应该有一个紧急停止机制
// 比如 process.exit(1),让容器编排系统重启它
});
const leaseId = await this.lease.grant();
logger.info({ taskId: this.taskId, workerId: this.workerId, leaseId }, 'Lease granted');
// 2. 尝试使用事务来原子性地创建锁
// 事务的条件是:当且仅当 lockKey 不存在时 (version is 0)
const txn = await this.client.if(this.lockKey, 'version', '==', 0)
// Then: 创建这个 key,并附加我们的租约
.put(this.lockKey)
.value(JSON.stringify({ workerId: this.workerId, acquiredAt: new Date().toISOString() }))
.lease(leaseId)
// Else: 什么都不做
.commit();
if (txn.succeeded) {
logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Lock acquired successfully');
return true;
} else {
logger.warn({ taskId: this.taskId, workerId: this.workerId }, 'Failed to acquire lock, another worker holds it');
// 获取锁失败,我们应该立即撤销刚刚创建的租约,避免资源浪费
await this.revokeLease();
return false;
}
} catch (error) {
logger.error({ err: error, taskId: this.taskId, workerId: this.workerId }, 'Error acquiring lock');
// 如果在获取过程中发生任何错误,也需要尝试清理租约
if (this.lease) {
await this.revokeLease().catch(err => logger.error({ err }, 'Failed to revoke lease during error handling'));
}
return false;
}
}
/**
* 主动释放锁(任务完成时调用)。
* 这会撤销租约,etcd会自动删除所有关联的key。
*/
async releaseLock() {
if (this.lease) {
logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Releasing lock...');
await this.revokeLease();
logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Lock released');
}
}
/**
* 封装租约撤销逻辑,确保lease对象被清理。
* @private
*/
async revokeLease() {
if (this.lease) {
try {
await this.lease.revoke();
this.lease = null;
} catch (error) {
logger.error({ err: error, taskId: this.taskId, workerId: this.workerId }, 'Error revoking lease');
// 即使撤销失败,也要将本地引用置空
this.lease = null;
}
}
}
/**
* 获取任务当前状态
*/
async getTaskStatus() {
const statusKey = `${taskConfig.statusPrefix}${this.taskId}`;
const status = await this.client.get(statusKey).json();
return status || { currentStep: 0, history: [] };
}
/**
* 更新任务状态
* @param {object} status - 新的状态对象
*/
async updateTaskStatus(status) {
const statusKey = `${taskConfig.statusPrefix}${this.taskId}`;
await this.client.put(statusKey).value(JSON.stringify(status));
}
}
这段代码是系统的基石。注意 acquireLock 方法中的事务用法,if(key, 'version', '==', 0) 是实现原子性“当Key不存在时创建”的关键。同时,错误处理和资源清理(revokeLease)也至关重要,确保了即使在获取锁的逻辑中途失败,也不会留下悬空的租约。
集成 LangChain Agent
现在我们将 LeaseManager 与一个模拟的 LangChain Agent 结合起来。这个 Agent 会执行一个多步骤的任务,并在每一步后更新其状态。
agent.js
// agent.js
import { taskConfig } from './config.js';
import { LeaseManager } from './leaseManager.js';
import pino from 'pino';
const logger = pino({ level: 'info' });
// 模拟一个 LangChain Agent 执行的函数
// 在真实世界中,这里会是 LangChain 的 RunnableSequence 或 AgentExecutor
async function runLangChainTask(taskId, currentStep) {
logger.info({ taskId, currentStep }, `Starting LangChain execution from step ${currentStep + 1}`);
// 模拟多步骤任务
const steps = [
{ name: 'Data Ingestion', duration: 3000 },
{ name: 'LLM Analysis', duration: 5000 },
{ name: 'Report Generation', duration: 4000 },
{ name: 'Finalization', duration: 2000 },
];
for (let i = currentStep; i < steps.length; i++) {
const step = steps[i];
logger.info({ taskId, step: step.name }, `Executing step ${i + 1}`);
// 模拟异步操作
await new Promise(resolve => setTimeout(resolve, step.duration));
logger.info({ taskId, step: step.name }, `Step ${i + 1} completed`);
// 返回下一步的索引
yield i + 1;
}
}
/**
* 分布式任务执行器
*/
export class DistributedTaskRunner {
constructor(taskId, workerId) {
this.taskId = taskId;
this.workerId = workerId;
this.leaseManager = new LeaseManager(taskId, workerId);
this.logger = logger.child({ taskId, workerId });
}
async run() {
this.logger.info('Attempting to run task...');
// 1. 尝试获取锁
const lockAcquired = await this.leaseManager.acquireLock();
if (!lockAcquired) {
// 获取锁失败,直接退出
this.logger.warn('Could not acquire lock, another worker is active. Exiting.');
return;
}
try {
this.logger.info('Lock acquired. Starting task processing.');
// 2. 获取任务的最后已知状态
let status = await this.leaseManager.getTaskStatus();
this.logger.info({ status }, 'Retrieved initial task status.');
// 3. 执行 Agent 任务
const agentExecutor = runLangChainTask(this.taskId, status.currentStep);
for await (const completedStepIndex of agentExecutor) {
// 4. 每完成一步,就更新一次状态
// 这是一个关键的容错点。如果在这之后、下一次循环前崩溃,
// 新的 worker 会从这一步恢复。
const newStatus = {
...status,
currentStep: completedStepIndex,
history: [...status.history, { step: completedStepIndex, completedAt: new Date().toISOString() }],
lastUpdatedBy: this.workerId,
};
await this.leaseManager.updateTaskStatus(newStatus);
this.logger.info({ currentStep: completedStepIndex }, 'Updated task status to etcd.');
}
this.logger.info('Task completed successfully.');
// 可以在这里清理状态信息
// await this.leaseManager.client.delete().key(`${taskConfig.statusPrefix}${this.taskId}`);
} catch (error) {
this.logger.error({ err: error }, 'An error occurred during task execution.');
// 发生未知错误,但因为我们持有租约,所以锁依然是我们的。
// 这里的策略可以是:
// 1. 记录错误状态到 etcd
// 2. 直接退出,让租约过期,由其他 worker 重试
// 我们选择后者,因为租约机制保证了最终会释放锁。
} finally {
// 5. 任务完成或异常后,主动释放锁
await this.leaseManager.releaseLock();
}
}
}
DistributedTaskRunner 是整个逻辑的核心。它首先尝试获取锁。成功后,它不是从头开始执行任务,而是先从 etcd 读取任务的 status,然后从 currentStep 继续。runLangChainTask 被实现为一个异步生成器 (async function*),这使得我们可以在每一步执行完成后,暂停执行,更新状态,然后再继续。这种模式在处理长周期、多步骤任务时非常有用。
模拟分布式环境
最后,我们创建一个主文件来模拟多个 Worker 同时尝试处理同一个任务。
main.js
// main.js
import { DistributedTaskRunner } from './agent.js';
import { randomUUID } from 'crypto';
import pino from 'pino';
const logger = pino({ level: 'info' });
async function main() {
const taskId = 'complex-report-gen-123';
const workerCount = 3;
logger.info(`Simulating ${workerCount} workers competing for task: ${taskId}`);
const promises = [];
for (let i = 0; i < workerCount; i++) {
const workerId = `worker-${randomUUID().slice(0, 8)}`;
const runner = new DistributedTaskRunner(taskId, workerId);
// 让每个worker的启动时间稍微错开,以模拟真实世界
const delay = Math.random() * 1000;
const p = new Promise(resolve => setTimeout(resolve, delay)).then(() => runner.run());
promises.push(p);
}
await Promise.all(promises);
logger.info('All worker simulations finished.');
// 在真实应用中,进程会持续运行,监听新任务。
// 这里我们为了演示目的,需要手动退出。
// 注意:etcd3 客户端会保持连接,导致进程不退出。
process.exit(0);
}
main().catch(err => {
logger.error(err, 'Simulation failed');
process.exit(1);
});
要运行这个模拟,首先确保你有一个本地运行的 etcd 实例 (例如通过 Docker: docker run -d -p 2379:2379 --name etcd-gcr-v3.5.0 gcr.io/etcd-development/etcd:v3.5.0 /usr/local/bin/etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379)。
然后在一个终端运行 node main.js。你会看到类似以下的日志输出:
{"level":30,"time":...,"pid":... ,"msg":"Simulating 3 workers competing for task: complex-report-gen-123"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-a6b7c8d9","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","leaseId":"...","msg":"Lease granted"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Lock acquired successfully"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","leaseId":"...","msg":"Lease granted"}
{"level":40,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Failed to acquire lock, another worker holds it"}
{"level":40,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Could not acquire lock, another worker is active. Exiting."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Lock acquired. Starting task processing."}
...
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","step":{"name":"Data Ingestion"},"msg":"Executing step 1"}
...
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","step":{"name":"Data Ingestion"},"msg":"Step 1 completed"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","currentStep":1,"msg":"Updated task status to etcd."}
...
日志清楚地显示,尽管三个 Worker 同时启动,但只有一个成功获取了锁并开始处理任务,其他两个则 gracefully 地退出了。你可以通过在任务执行中途(例如,在 Report Generation 步骤)手动 Ctrl+C 停止程序来模拟 Worker 崩溃。等待大约 15 秒(我们设置的 leaseTTL)后,再次运行 main.js,你会看到新的 Worker 会获取锁,并从崩溃前记录的最后一步继续执行,而不是从头开始。
局限性与未来展望
我们构建的这套机制解决了分布式环境下 LangChain Agent 的任务协调和幂等执行问题,但这并非银弹。
首先,此方案的核心是任务执行的互斥性,它并不适用于需要并行处理子任务的场景。它的定位是确保有状态、长周期的单个任务在分布式环境下的正确性和一致性。
其次,etcd 本身不适合存储大量数据。我们的设计明智地将任务的详细状态(可能包含大量文本或数据)与协调元数据(锁和当前步骤索引)分离开。在生产系统中,任务的完整上下文和历史记录应该存储在更合适的存储中,如 S3、MongoDB 或 PostgreSQL。etcd 只负责存储小体积、高频率读写的元数据。
租约的 TTL 是一个需要精细调整的参数。它必须大于任何单步任务可能的最长执行时间,否则可能在任务正常执行时因续约延迟而丢失锁。同时,它又直接决定了系统在节点故障后的最长恢复时间。设置一个合理的 TTL 并配合完善的监控告警是生产环境的必要条件。
未来的一个优化方向是,可以利用 etcd 的 Watch 机制来代替 Worker 轮询任务源。可以设计一个调度器,当新任务到来时,在 etcd 中创建一个表示任务的 Key。所有空闲的 Worker 都在 Watch 这个 Key 的前缀,一旦有新 Key 创建,它们就会被唤醒并尝试获取锁。这能构建一个更为高效、事件驱动的调度系统。