基于 etcd Lease 机制为 Node.js LangChain Agent 实现分布式任务协调


当我们将一个基于 LangChain 构建的 Node.js Agent 服务从单机部署扩展到 Kubernetes 上的多个 Pod 时,灾难发生了。原本设计用于处理复杂、长周期任务(例如:分析用户评论、生成周报、调用外部 API 并整合数据)的 Agent 开始出现行为错乱。同一个任务被两个甚至三个 Pod 同时抢占,导致重复调用昂贵的 LLM API、向数据库写入重复或冲突的数据。问题的根源很明确:我们的 Agent 是有状态的,但我们的架构却是无状态的、可水平扩展的。我们缺少一个可靠的分布式协调机制。

最初的修补方案是在数据库中增加一个状态字段,比如 processing。一个 Worker 在开始处理任务前,先将任务记录的 statuspending 更新为 processing。这在低并发下似乎可行,但在真实负载下,竞态条件的问题立刻暴露无遗。两个 Pod 同时读取到 pending 状态,都尝试更新,最终导致其中一个的更新被覆盖,或者更糟,两者都认为自己成功获取了任务的所有权。

我们需要一个真正的分布式锁。社区里常见的方案是基于 Redis 的 SETNX 命令。但这又引出了新的问题:如果一个持有锁的 Worker 进程异常崩溃,没有机会释放锁,那么这个任务就会被永久锁定,造成系统停滞。虽然可以通过设置锁的过期时间来缓解,但过期时间的设置本身就是一个难题。设置太短,可能在任务执行期间锁就过期了;设置太长,又会延长故障恢复的时间。Redlock 算法虽然试图解决这些问题,但其复杂性和争议性让我们望而却步。

在真实项目中,我们需要的是一个更根本、更可靠的解决方案。这让我们将目光投向了 etcd。etcd 不仅仅是一个键值存储,它的核心是 Raft 一致性协议,是为分布式系统协调而生的。它的 Lease(租约)机制,为我们提供了解决这个问题的完美模型。一个客户端可以创建一个租约,并在这个租约之上附加一个或多个 Key。客户端必须在租约的 TTL(Time To Live)内定期“续约”(keep-alive),证明自己仍然存活。一旦客户端崩溃或网络中断,无法续约,租约就会自动过期,所有与之关联的 Key 也会被 etcd 自动删除。这意味着锁的释放是自动且可靠的,完美解决了 Worker 崩溃导致死锁的核心痛点。

我们的目标是:利用 etcd 的 Lease 机制,为 LangChain Agent 构建一个健壮的、支持故障转移的分布式任务执行引擎。

架构设计与核心组件

在深入代码之前,我们先明确一下系统的核心流程和组件。

  1. Task Queue: 一个任务的来源,为简化模型,我们这里不引入 Kafka 或 RabbitMQ,而是假设有一个可以拉取待处理任务的来源。
  2. Agent Worker (Node.js): 运行 LangChain Agent 的实际工作进程。我们会启动多个 Worker 实例来模拟分布式环境。
  3. etcd Cluster: 作为我们分布式协调的中枢。
  4. LeaseManager: 一个封装了 etcd Lease 创建、续约和撤销逻辑的模块。这是保证锁可靠性的核心。
  5. DistributedTaskRunner: 负责将 Agent 的执行与 LeaseManager 结合起来,确保在任何时刻,一个任务只被一个 Worker 执行。

整个流程如下:

sequenceDiagram
    participant Worker A
    participant Worker B
    participant etcd
    participant TaskSource

    loop Poll for tasks
        Worker A->>TaskSource: Get pending task (e.g., task-123)
        Worker B->>TaskSource: Get pending task (e.g., task-123)
    end

    par Try to acquire lock
        Worker A->>etcd: Create Lease (leaseA) & KeepAlive
        Worker A->>etcd: Try to PUT /tasks/locks/task-123 with leaseA
    and
        Worker B->>etcd: Create Lease (leaseB) & KeepAlive
        Worker B->>etcd: Try to PUT /tasks/locks/task-123 with leaseB
    end

    etcd-->>Worker A: PUT Success (Acquired Lock)
    etcd-->>Worker B: PUT Failed (Already locked)

    Worker A->>Worker A: Start executing LangChain Agent for task-123
    Worker B->>Worker B: Log "Task locked" and skip

    Note over Worker A: While executing, leaseA is kept alive in background.

    Worker A->>etcd: Update task status: /tasks/status/task-123 = {step: 1}
    Worker A->>etcd: Update task status: /tasks/status/task-123 = {step: 2}

    critical Simulating Worker A crash
        Worker A-xetcd: Stop sending KeepAlive for leaseA
    end

    Note over etcd: etcd detects leaseA expired
    etcd->>etcd: Delete /tasks/locks/task-123 automatically

    loop Poll for tasks
        Worker B->>TaskSource: Get pending task (task-123)
    end

    Worker B->>etcd: Try to PUT /tasks/locks/task-123 with leaseB
    etcd-->>Worker B: PUT Success (Acquired Lock)

    Worker B->>etcd: Read /tasks/status/task-123
    etcd-->>Worker B: Return {step: 2}

    Worker B->>Worker B: Resume LangChain Agent from step 3

这个时序图清晰地展示了锁的竞争、持有锁的 Worker 崩溃后锁的自动释放,以及新的 Worker 如何接管并从上次的状态恢复执行。

代码实现:构建 LeaseManager

我们首先来构建与 etcd 交互的核心 LeaseManager。这个模块必须是稳定和可复用的。我们将使用 etcd3 这个 Node.js 客户端库。

config.js

// config.js
// 生产环境中,这些配置应来自环境变量或配置中心
export const etcdConfig = {
  hosts: 'http://127.0.0.1:2379',
};

export const taskConfig = {
  // 租约的TTL,单位秒。
  // 需要在任务最长单步执行时间 和 故障恢复速度之间做权衡
  leaseTTL: 15, 
  // 任务锁在etcd中的前缀
  lockPrefix: '/tasks/locks/',
  // 任务状态在etcd中的前缀
  statusPrefix: '/tasks/status/',
};

leaseManager.js

// leaseManager.js
import { Etcd3, Lease } from 'etcd3';
import { etcdConfig, taskConfig } from './config.js';
import pino from 'pino';

const logger = pino({ level: 'info' });

/**
 * 负责管理etcd租约和分布式锁的获取与释放。
 * 每个需要获取锁的worker都应该实例化一个LeaseManager。
 */
export class LeaseManager {
  constructor(taskId, workerId) {
    if (!taskId || !workerId) {
      throw new Error('taskId and workerId are required');
    }
    this.client = new Etcd3(etcdConfig);
    this.taskId = taskId;
    this.workerId = workerId;
    this.lockKey = `${taskConfig.lockPrefix}${this.taskId}`;
    /** @type {Lease | null} */
    this.lease = null;
  }

  /**
   * 尝试获取指定任务的分布式锁。
   * @returns {Promise<boolean>} - 如果成功获取锁,返回 true,否则返回 false。
   */
  async acquireLock() {
    try {
      // 1. 创建一个租约,并立即开始续约(keep-alive)
      this.lease = this.client.lease(taskConfig.leaseTTL);
      // keep-alive的心跳会自动在后台运行。
      // 如果程序崩溃,心跳停止,租约会在TTL后过期。
      this.lease.on('lost', (err) => {
        logger.error({
            err,
            taskId: this.taskId,
            workerId: this.workerId,
          },
          'Lease lost unexpectedly. This worker must stop processing.'
        );
        // 在真实项目中,这里应该有一个紧急停止机制
        // 比如 process.exit(1),让容器编排系统重启它
      });
      
      const leaseId = await this.lease.grant();
      logger.info({ taskId: this.taskId, workerId: this.workerId, leaseId }, 'Lease granted');

      // 2. 尝试使用事务来原子性地创建锁
      // 事务的条件是:当且仅当 lockKey 不存在时 (version is 0)
      const txn = await this.client.if(this.lockKey, 'version', '==', 0)
        // Then: 创建这个 key,并附加我们的租约
        .put(this.lockKey)
        .value(JSON.stringify({ workerId: this.workerId, acquiredAt: new Date().toISOString() }))
        .lease(leaseId)
        // Else: 什么都不做
        .commit();

      if (txn.succeeded) {
        logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Lock acquired successfully');
        return true;
      } else {
        logger.warn({ taskId: this.taskId, workerId: this.workerId }, 'Failed to acquire lock, another worker holds it');
        // 获取锁失败,我们应该立即撤销刚刚创建的租约,避免资源浪费
        await this.revokeLease();
        return false;
      }
    } catch (error) {
      logger.error({ err: error, taskId: this.taskId, workerId: this.workerId }, 'Error acquiring lock');
      // 如果在获取过程中发生任何错误,也需要尝试清理租约
      if (this.lease) {
        await this.revokeLease().catch(err => logger.error({ err }, 'Failed to revoke lease during error handling'));
      }
      return false;
    }
  }

  /**
   * 主动释放锁(任务完成时调用)。
   * 这会撤销租约,etcd会自动删除所有关联的key。
   */
  async releaseLock() {
    if (this.lease) {
      logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Releasing lock...');
      await this.revokeLease();
      logger.info({ taskId: this.taskId, workerId: this.workerId }, 'Lock released');
    }
  }

  /**
   * 封装租约撤销逻辑,确保lease对象被清理。
   * @private
   */
  async revokeLease() {
    if (this.lease) {
      try {
        await this.lease.revoke();
        this.lease = null;
      } catch (error) {
        logger.error({ err: error, taskId: this.taskId, workerId: this.workerId }, 'Error revoking lease');
        // 即使撤销失败,也要将本地引用置空
        this.lease = null;
      }
    }
  }

  /**
   * 获取任务当前状态
   */
  async getTaskStatus() {
    const statusKey = `${taskConfig.statusPrefix}${this.taskId}`;
    const status = await this.client.get(statusKey).json();
    return status || { currentStep: 0, history: [] };
  }

  /**
   * 更新任务状态
   * @param {object} status - 新的状态对象
   */
  async updateTaskStatus(status) {
    const statusKey = `${taskConfig.statusPrefix}${this.taskId}`;
    await this.client.put(statusKey).value(JSON.stringify(status));
  }
}

这段代码是系统的基石。注意 acquireLock 方法中的事务用法,if(key, 'version', '==', 0) 是实现原子性“当Key不存在时创建”的关键。同时,错误处理和资源清理(revokeLease)也至关重要,确保了即使在获取锁的逻辑中途失败,也不会留下悬空的租约。

集成 LangChain Agent

现在我们将 LeaseManager 与一个模拟的 LangChain Agent 结合起来。这个 Agent 会执行一个多步骤的任务,并在每一步后更新其状态。

agent.js

// agent.js
import { taskConfig } from './config.js';
import { LeaseManager } from './leaseManager.js';
import pino from 'pino';

const logger = pino({ level: 'info' });

// 模拟一个 LangChain Agent 执行的函数
// 在真实世界中,这里会是 LangChain 的 RunnableSequence 或 AgentExecutor
async function runLangChainTask(taskId, currentStep) {
  logger.info({ taskId, currentStep }, `Starting LangChain execution from step ${currentStep + 1}`);

  // 模拟多步骤任务
  const steps = [
    { name: 'Data Ingestion', duration: 3000 },
    { name: 'LLM Analysis', duration: 5000 },
    { name: 'Report Generation', duration: 4000 },
    { name: 'Finalization', duration: 2000 },
  ];

  for (let i = currentStep; i < steps.length; i++) {
    const step = steps[i];
    logger.info({ taskId, step: step.name }, `Executing step ${i + 1}`);
    // 模拟异步操作
    await new Promise(resolve => setTimeout(resolve, step.duration));
    logger.info({ taskId, step: step.name }, `Step ${i + 1} completed`);
    
    // 返回下一步的索引
    yield i + 1;
  }
}

/**
 * 分布式任务执行器
 */
export class DistributedTaskRunner {
  constructor(taskId, workerId) {
    this.taskId = taskId;
    this.workerId = workerId;
    this.leaseManager = new LeaseManager(taskId, workerId);
    this.logger = logger.child({ taskId, workerId });
  }

  async run() {
    this.logger.info('Attempting to run task...');
    
    // 1. 尝试获取锁
    const lockAcquired = await this.leaseManager.acquireLock();
    if (!lockAcquired) {
      // 获取锁失败,直接退出
      this.logger.warn('Could not acquire lock, another worker is active. Exiting.');
      return;
    }

    try {
      this.logger.info('Lock acquired. Starting task processing.');

      // 2. 获取任务的最后已知状态
      let status = await this.leaseManager.getTaskStatus();
      this.logger.info({ status }, 'Retrieved initial task status.');

      // 3. 执行 Agent 任务
      const agentExecutor = runLangChainTask(this.taskId, status.currentStep);
      
      for await (const completedStepIndex of agentExecutor) {
        // 4. 每完成一步,就更新一次状态
        // 这是一个关键的容错点。如果在这之后、下一次循环前崩溃,
        // 新的 worker 会从这一步恢复。
        const newStatus = {
            ...status,
            currentStep: completedStepIndex,
            history: [...status.history, { step: completedStepIndex, completedAt: new Date().toISOString() }],
            lastUpdatedBy: this.workerId,
        };
        await this.leaseManager.updateTaskStatus(newStatus);
        this.logger.info({ currentStep: completedStepIndex }, 'Updated task status to etcd.');
      }

      this.logger.info('Task completed successfully.');
      // 可以在这里清理状态信息
      // await this.leaseManager.client.delete().key(`${taskConfig.statusPrefix}${this.taskId}`);
    } catch (error) {
      this.logger.error({ err: error }, 'An error occurred during task execution.');
      // 发生未知错误,但因为我们持有租约,所以锁依然是我们的。
      // 这里的策略可以是:
      // 1. 记录错误状态到 etcd
      // 2. 直接退出,让租约过期,由其他 worker 重试
      // 我们选择后者,因为租约机制保证了最终会释放锁。
    } finally {
      // 5. 任务完成或异常后,主动释放锁
      await this.leaseManager.releaseLock();
    }
  }
}

DistributedTaskRunner 是整个逻辑的核心。它首先尝试获取锁。成功后,它不是从头开始执行任务,而是先从 etcd 读取任务的 status,然后从 currentStep 继续。runLangChainTask 被实现为一个异步生成器 (async function*),这使得我们可以在每一步执行完成后,暂停执行,更新状态,然后再继续。这种模式在处理长周期、多步骤任务时非常有用。

模拟分布式环境

最后,我们创建一个主文件来模拟多个 Worker 同时尝试处理同一个任务。

main.js

// main.js
import { DistributedTaskRunner } from './agent.js';
import { randomUUID } from 'crypto';
import pino from 'pino';

const logger = pino({ level: 'info' });

async function main() {
  const taskId = 'complex-report-gen-123';
  const workerCount = 3;

  logger.info(`Simulating ${workerCount} workers competing for task: ${taskId}`);

  const promises = [];
  for (let i = 0; i < workerCount; i++) {
    const workerId = `worker-${randomUUID().slice(0, 8)}`;
    const runner = new DistributedTaskRunner(taskId, workerId);
    // 让每个worker的启动时间稍微错开,以模拟真实世界
    const delay = Math.random() * 1000;
    const p = new Promise(resolve => setTimeout(resolve, delay)).then(() => runner.run());
    promises.push(p);
  }

  await Promise.all(promises);
  logger.info('All worker simulations finished.');
  
  // 在真实应用中,进程会持续运行,监听新任务。
  // 这里我们为了演示目的,需要手动退出。
  // 注意:etcd3 客户端会保持连接,导致进程不退出。
  process.exit(0);
}


main().catch(err => {
  logger.error(err, 'Simulation failed');
  process.exit(1);
});

要运行这个模拟,首先确保你有一个本地运行的 etcd 实例 (例如通过 Docker: docker run -d -p 2379:2379 --name etcd-gcr-v3.5.0 gcr.io/etcd-development/etcd:v3.5.0 /usr/local/bin/etcd --advertise-client-urls http://0.0.0.0:2379 --listen-client-urls http://0.0.0.0:2379)。

然后在一个终端运行 node main.js。你会看到类似以下的日志输出:

{"level":30,"time":...,"pid":... ,"msg":"Simulating 3 workers competing for task: complex-report-gen-123"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-a6b7c8d9","msg":"Attempting to run task..."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","leaseId":"...","msg":"Lease granted"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Lock acquired successfully"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","leaseId":"...","msg":"Lease granted"}
{"level":40,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Failed to acquire lock, another worker holds it"}
{"level":40,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-e2d3c4a5","msg":"Could not acquire lock, another worker is active. Exiting."}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","workerId":"worker-f9a8b1c2","msg":"Lock acquired. Starting task processing."}
...
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","step":{"name":"Data Ingestion"},"msg":"Executing step 1"}
...
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","step":{"name":"Data Ingestion"},"msg":"Step 1 completed"}
{"level":30,"time":...,"pid":... ,"taskId":"complex-report-gen-123","currentStep":1,"msg":"Updated task status to etcd."}
...

日志清楚地显示,尽管三个 Worker 同时启动,但只有一个成功获取了锁并开始处理任务,其他两个则 gracefully 地退出了。你可以通过在任务执行中途(例如,在 Report Generation 步骤)手动 Ctrl+C 停止程序来模拟 Worker 崩溃。等待大约 15 秒(我们设置的 leaseTTL)后,再次运行 main.js,你会看到新的 Worker 会获取锁,并从崩溃前记录的最后一步继续执行,而不是从头开始。

局限性与未来展望

我们构建的这套机制解决了分布式环境下 LangChain Agent 的任务协调和幂等执行问题,但这并非银弹。

首先,此方案的核心是任务执行的互斥性,它并不适用于需要并行处理子任务的场景。它的定位是确保有状态、长周期的单个任务在分布式环境下的正确性和一致性。

其次,etcd 本身不适合存储大量数据。我们的设计明智地将任务的详细状态(可能包含大量文本或数据)与协调元数据(锁和当前步骤索引)分离开。在生产系统中,任务的完整上下文和历史记录应该存储在更合适的存储中,如 S3、MongoDB 或 PostgreSQL。etcd 只负责存储小体积、高频率读写的元数据。

租约的 TTL 是一个需要精细调整的参数。它必须大于任何单步任务可能的最长执行时间,否则可能在任务正常执行时因续约延迟而丢失锁。同时,它又直接决定了系统在节点故障后的最长恢复时间。设置一个合理的 TTL 并配合完善的监控告警是生产环境的必要条件。

未来的一个优化方向是,可以利用 etcd 的 Watch 机制来代替 Worker 轮询任务源。可以设计一个调度器,当新任务到来时,在 etcd 中创建一个表示任务的 Key。所有空闲的 Worker 都在 Watch 这个 Key 的前缀,一旦有新 Key 创建,它们就会被唤醒并尝试获取锁。这能构建一个更为高效、事件驱动的调度系统。


  目录