在Vercel无服务器函数中管理Cassandra持久连接以构建实时遥测管道


我们面临一个棘手的架构挑战:构建一个遥测数据管道,要求前端具备实时展示能力,后端写入吞吐量峰值需达到每秒数千次事件,且数据存储必须保证高可用和水平扩展性。技术栈选型已经确定:前端使用Vite构建,因为它提供了极致的开发体验和性能;API层采用Vercel Functions,以利用其弹性伸缩和全球边缘网络的优势;数据持久化则选择了Apache Cassandra,因为它在处理大规模时序数据写入方面无出其右。

问题立刻浮现:Vercel Functions是典型的无服务器(Serverless)计算模型,其核心特征是无状态和短生命周期。而Cassandra作为一种分布式数据库,其客户端驱动(Driver)为了性能最大化,被设计为维护一个到集群的持久化、有状态的TCP连接池。在一个函数调用可能只有几百毫秒生命周期的环境中,为每次调用都执行“连接 -> 认证 -> 操作 -> 关闭”的完整流程,其开销将是毁灭性的,不仅会耗尽数据库的连接资源,更会让函数性能雪上加霜。

一个天真的实现可能长这样:

// api/ingest-naive.ts
// 警告:这是绝对错误的做法,切勿在生产中使用!
import { Client } from 'cassandra-driver';
import type { VercelRequest, VercelResponse } from '@vercel/node';

export default async function handler(req: VercelRequest, res: VercelResponse) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  const client = new Client({
    contactPoints: [process.env.CASSANDRA_HOST!],
    localDataCenter: process.env.CASSANDRA_DATACENTER!,
    credentials: {
      username: process.env.CASSANDRA_USER!,
      password: process.env.CASSANDRA_PASSWORD!,
    },
    keyspace: 'telemetry_space',
  });

  try {
    await client.connect(); // 每次调用都建立新连接
    const query = 'INSERT INTO logs (id, timestamp, level, message) VALUES (uuid(), toTimestamp(now()), ?, ?)';
    
    // 假设body是一个日志事件数组
    const events = req.body.events || [];
    for (const event of events) {
        await client.execute(query, [event.level, event.message], { prepare: true });
    }
    
    res.status(202).json({ status: 'accepted' });
  } catch (error) {
    console.error('Cassandra operation failed:', error);
    res.status(500).json({ error: 'Internal Server Error' });
  } finally {
    await client.shutdown(); // 每次调用都关闭连接
  }
}

在低负载下,上述代码或许能工作。但在真实项目中,Vercel平台为了处理并发请求,会启动多个函数实例。每个实例的每次调用都重复创建和销毁Client实例,这不仅是性能瓶颈,在高并发下几乎可以肯定会导致数据库连接风暴或函数因超时而失败。这里的坑在于,我们必须找到一种方法,在Vercel的无状态执行环境中“注入”状态,复用昂贵的数据库连接资源。

利用Node.js模块缓存实现连接复用

Vercel(以及其他Node.js Serverless平台)的执行环境有一个关键特性:虽然函数实例的生命周期是短暂的,但单个容器实例(Execution Context)在短时间内可能会被复用来处理多个连续的请求。在两次调用之间,Node.js的模块缓存 (require.cache) 会被保留。这意味着,在模块顶层作用域声明的变量可以跨越同个容器实例的多次函数调用而存在。

这就是破局点。我们可以创建一个专门的模块来管理Cassandra客户端实例,并确保它只被初始化一次。

// lib/cassandra-client.ts
import { Client, auth, types } from 'cassandra-driver';
import type { ClientOptions } from 'cassandra-driver';

// 在模块顶层作用域声明一个变量,用于缓存客户端实例
let client: Client | null = null;

const clientOptions: ClientOptions = {
  contactPoints: [process.env.CASSANDRA_HOST!],
  localDataCenter: process.env.CASSANDRA_DATACENTER!,
  credentials: {
    username: process.env.CASSANDRA_USER!,
    password: process.env.CASSANDRA_PASSWORD!,
  },
  keyspace: 'telemetry_space',
  protocolOptions: {
    port: 9042, // 显式指定端口
  },
  pooling: {
    // 针对Serverless环境优化连接池
    // 核心思想是保持少量长连接,因为并发由函数实例数量控制
    coreConnectionsPerHost: {
      [types.distance.local]: 1, // 本地数据中心每个主机保持1个核心连接
      [types.distance.remote]: 1,
    },
    maxRequestsPerConnection: 32768, // 默认值,对于现代Cassandra驱动已经足够
  },
  queryOptions: {
    consistency: types.consistencies.localQuorum, // 写入使用LOCAL_QUORUM保证强一致性
    prepare: true, // 默认开启预编译语句,性能关键
  },
  socketOptions: {
      connectTimeout: 5000, // 5秒连接超时
  },
};

/**
 * 获取Cassandra客户端单例。
 * 如果实例不存在,则创建并连接。
 * 如果实例已存在,则直接返回。
 * 包含简单的健康检查逻辑。
 */
async function getClient(): Promise<Client> {
  if (client && !client.isShuttingDown) {
    // 执行一个轻量级查询来检查连接是否仍然有效
    try {
      await client.execute('SELECT release_version FROM system.local');
      return client;
    } catch (e) {
      console.warn('Existing Cassandra client connection is unhealthy. Reconnecting.', e);
      // 如果连接不健康,尝试关闭并重新创建
      await client.shutdown().catch(err => console.error('Error during shutdown of unhealthy client:', err));
      client = null; // 清除旧实例
    }
  }

  console.log('No active Cassandra client found. Creating a new one...');
  const newClient = new Client(clientOptions);
  
  try {
    await newClient.connect();
    console.log('Successfully connected to Cassandra cluster.');
    client = newClient; // 只有在成功连接后才赋值给全局变量

    // 优雅地处理进程退出信号,确保连接关闭
    // 这在Serverless环境中可能不会被触发,但作为最佳实践保留
    process.on('SIGTERM', () => {
      console.log('SIGTERM received. Shutting down Cassandra client.');
      client?.shutdown();
    });
    process.on('SIGINT', () => {
      console.log('SIGINT received. Shutting down Cassandra client.');
      client?.shutdown();
    });

    return client;
  } catch (error) {
    console.error('Failed to connect to Cassandra:', error);
    // 如果连接失败,确保client实例不会被设置,以便下次调用可以重试
    throw new Error('Could not establish connection with Cassandra cluster.');
  }
}

export const cassandraClient = {
  getInstance: getClient,
};

这个 cassandra-client.ts 模块是整个方案的核心。它在模块作用域中维护了一个 client 变量。getClient 函数实现了单例模式:

  1. 首次调用: clientnull,它会创建一个新的 Client 实例,调用 connect(),成功后将实例赋给 client 并返回。
  2. 后续调用 (在同一执行环境中): client 不为 null,函数会执行一个快速的健康检查查询。如果健康,直接返回现有实例,避免了昂贵的连接过程。如果不健康,它会尝试安全地关闭旧连接并重新建立。

重构数据写入API

现在,我们可以用这个连接管理器来重构我们的写入API。

// api/ingest.ts
import { cassandraClient } from '../lib/cassandra-client';
import type { VercelRequest, VercelResponse } from '@vercel/node';

// Cassandra表结构 (CQL)
/*
CREATE KEYSPACE IF NOT EXISTS telemetry_space WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};

CREATE TABLE IF NOT EXISTS telemetry_space.logs (
    day text, // 分区键,按天分区避免热点
    id timeuuid,
    timestamp timestamp,
    level text,
    message text,
    source text,
    metadata map<text, text>,
    PRIMARY KEY (day, id)
) WITH CLUSTERING ORDER BY (id DESC);
*/

interface LogEvent {
  level: 'info' | 'warn' | 'error';
  message: string;
  source: string;
  metadata?: Record<string, string>;
}

export default async function handler(req: VercelRequest, res: VercelResponse) {
  if (req.method !== 'POST') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  const events: LogEvent[] = req.body.events;
  if (!Array.isArray(events) || events.length === 0) {
    return res.status(400).json({ error: 'Invalid payload. "events" array is required.' });
  }

  try {
    const client = await cassandraClient.getInstance();
    const query = `
      INSERT INTO logs (day, id, timestamp, level, message, source, metadata) 
      VALUES (?, timeuuidNow(), ?, ?, ?, ?, ?)
    `;
    
    // 使用Batch语句进行批量插入,这是Cassandra写入性能的关键
    const queries = events.map(event => {
      const today = new Date().toISOString().split('T')[0]; // e.g., "2023-10-27"
      return {
        query,
        params: [
          today,
          new Date(),
          event.level,
          event.message,
          event.source || 'unknown',
          event.metadata || {},
        ],
      };
    });

    // UNLOGGED BATCH 保证原子性在分区级别,对于这种时间序列数据足够了
    // 它比 LOGGED BATCH 的性能开销小得多
    await client.batch(queries, { prepare: true });

    return res.status(202).json({ status: 'accepted', count: events.length });

  } catch (error: any) {
    // 增强错误日志
    console.error({
      message: 'Failed to process telemetry batch',
      errorMessage: error.message,
      stack: error.stack,
      requestBody: req.body,
    });
    return res.status(500).json({ error: 'Internal Server Error' });
  }
}

这个重构后的版本有几个关键改进:

  1. 连接复用: 通过 cassandraClient.getInstance() 获取客户端,解决了性能瓶颈。
  2. 批量写入: 客户端发送一个包含多个事件的数组,后端使用 Cassandra 的 batch 功能一次性写入。这极大地减少了网络往返次数和数据库的协调开销,是实现高吞吐量的核心。
  3. 分区策略: CQL Schema 中使用了 day 作为分区键。这意味着每天的数据会写到不同的分区中,避免了单个分区无限增长导致的“热点”问题。查询时也必须带上分区键。
  4. 健壮性: 增加了更详细的错误处理和日志记录。

构建查询API和Vite前端

有了数据写入,我们还需要一个API来查询数据,以及一个前端来展示它。

// api/query.ts
import { cassandraClient } from '../lib/cassandra-client';
import type { VercelRequest, VercelResponse } from '@vercel/node';

export default async function handler(req: VercelRequest, res: VercelResponse) {
  if (req.method !== 'GET') {
    return res.status(405).json({ error: 'Method Not Allowed' });
  }

  // 从查询参数获取日期,这是我们的分区键
  const day = req.query.day as string;
  if (!day || !/^\d{4}-\d{2}-\d{2}$/.test(day)) {
    return res.status(400).json({ error: 'A valid "day" parameter is required (YYYY-MM-DD).' });
  }

  const limit = parseInt(req.query.limit as string) || 100;

  try {
    const client = await cassandraClient.getInstance();
    // 查询必须包含分区键 `day`
    const query = 'SELECT id, timestamp, level, message, source, metadata FROM logs WHERE day = ? LIMIT ?';
    const result = await client.execute(query, [day, limit], { prepare: true });

    // 格式化输出
    const logs = result.rows.map(row => ({
      id: row.id.toString(),
      timestamp: row.timestamp,
      level: row.level,
      message: row.message,
      source: row.source,
      metadata: row.metadata,
    }));
    
    return res.status(200).json({ logs });
  } catch (error: any) {
    console.error('Failed to query logs:', error);
    return res.status(500).json({ error: 'Internal Server Error' });
  }
}

前端部分,我们用Vite创建一个简单的React应用,定期轮询查询API来获取最新日志。

// src/App.tsx
import React, { useState, useEffect } from 'react';

interface LogEntry {
  id: string;
  timestamp: string;
  level: 'info' | 'warn' | 'error';
  message: string;
  source: string;
}

function App() {
  const [logs, setLogs] = useState<LogEntry[]>([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState<string | null>(null);

  const fetchLogs = async () => {
    setIsLoading(true);
    setError(null);
    try {
      const today = new Date().toISOString().split('T')[0];
      const response = await fetch(`/api/query?day=${today}&limit=50`);
      if (!response.ok) {
        throw new Error(`API request failed with status ${response.status}`);
      }
      const data = await response.json();
      setLogs(data.logs);
    } catch (err: any) {
      setError(err.message);
    } finally {
      setIsLoading(false);
    }
  };

  useEffect(() => {
    fetchLogs();
    const intervalId = setInterval(fetchLogs, 5000); // 每5秒轮询一次

    return () => clearInterval(intervalId);
  }, []);

  return (
    <div style={{ fontFamily: 'monospace', padding: '20px' }}>
      <h1>Real-time Telemetry</h1>
      {isLoading && <p>Loading...</p>}
      {error && <p style={{ color: 'red' }}>Error: {error}</p>}
      <div style={{ background: '#222', color: '#eee', padding: '10px', height: '80vh', overflowY: 'scroll' }}>
        {logs.map((log) => (
          <div key={log.id}>
            <span>{new Date(log.timestamp).toISOString()}</span>
            <span style={{ color: log.level === 'error' ? 'red' : 'cyan', margin: '0 10px' }}>[{log.level.toUpperCase()}]</span>
            <span>({log.source})</span>
            <span style={{ marginLeft: '10px' }}>{log.message}</span>
          </div>
        ))}
      </div>
    </div>
  );
}

export default App;

单元测试这个架构时,关键在于模拟 cassandra-client。可以使用 jest.mock 来替换 cassandraClient.getInstance 的实现,让它返回一个伪造的客户端对象,从而可以在不实际连接数据库的情况下测试API的业务逻辑、输入验证和错误处理。

架构图与数据流

整个系统的流程可以用下图清晰地表示:

sequenceDiagram
    participant User as Vite Frontend
    participant Vercel as Vercel Edge/CDN
    participant IngestFn as Vercel Function (api/ingest)
    participant QueryFn as Vercel Function (api/query)
    participant Cassandra as Cassandra Cluster
    
    User->>Vercel: POST /api/ingest (Batch of logs)
    Vercel->>IngestFn: Invoke function
    IngestFn->>Cassandra: cassandraClient.getInstance()
    Note right of IngestFn: Reuses existing connection if available
    IngestFn->>Cassandra: client.batch(INSERT ... )
    Cassandra-->>IngestFn: Batch success
    IngestFn-->>Vercel: 202 Accepted
    Vercel-->>User: 202 Accepted
    
    loop Every 5 seconds
        User->>Vercel: GET /api/query?day=...
        Vercel->>QueryFn: Invoke function
        QueryFn->>Cassandra: cassandraClient.getInstance()
        QueryFn->>Cassandra: client.execute(SELECT ... )
        Cassandra-->>QueryFn: Return log rows
        QueryFn-->>Vercel: 200 OK (JSON logs)
        Vercel-->>User: 200 OK (JSON logs)
        User->>User: Update UI
    end

局限性与未来展望

这个方案优雅地解决了在Serverless环境中与有状态数据库交互的核心痛点,但在生产环境中,它并非没有局限性。

首先,Vercel Functions(特别是在Hobby/Pro套餐)有最大执行时长的限制(通常是10-60秒)。如果一次写入的批次过大,或者Cassandra集群响应缓慢,函数可能会超时。因此,客户端的批次大小需要 carefully tuned。

其次,虽然我们复用了连接,但冷启动问题依然存在。一个全新的Vercel函数实例第一次被调用时,cassandraClient 还是需要执行完整的连接和认证流程,这会增加该请求的延迟。对于延迟极度敏感的应用,可能需要配合Vercel的预留并发功能来保持一部分函数实例“温热”。

最后,这个架构的查询能力受限于Cassandra本身。它非常适合基于分区键和聚类键的快速查找,但对于复杂的分析查询(Ad-hoc queries)则力不从心。如果未来需要更灵活的查询能力,可能需要引入一个二级索引系统,例如将数据双写到Elasticsearch,或者使用Presto/Trino等查询引擎对Cassandra中的数据进行分析。这种架构的演进路径是清晰的,但当前的设计已经为高性能、高可用的遥测数据摄取奠定了坚实的基础。


  目录