构建基于 ISR 保证消息必达的云原生 Server-Sent Events 推送网关


在设计需要服务端主动推送数据的系统时,我们面临一个核心的健壮性问题:当处理SSE(Server-Sent Events)连接的后端服务实例发生故障或重启时,如何确保在这短暂中断期间产生的消息不会永久丢失?一个简单的、在服务内存中直接处理SSE连接的方案,其可用性完全依赖于单个进程的生命周期,这在生产环境中是不可接受的。我们的目标是构建一个推送网关,它不仅要支持海量并发长连接,更核心的是要提供“至少一次”的消息投递保证。

这意味着,任何已经进入系统的消息,都必须有能力在客户端重连后被重新投递。这就排除了简单的、基于Redis Pub/Sub的“发布后不管”(fire-and-forget)模型,因为后者无法为离线的消费者保留消息。我们需要一个更可靠的架构。

架构决策:为何选择持久化消息队列而非轻量级广播

方案A:基于Redis Pub/Sub的广播模型

这个方案很常见。API网关将SSE请求路由到后端一组SSE服务实例上。当需要推送消息时,一个上游服务将消息发布到Redis的某个Channel,所有订阅了该Channel的SSE服务实例都会收到消息,然后遍历各自持有的客户端连接并推送。

  • 优势: 实现简单,延迟极低。对于状态更新不那么关键的场景,例如在线用户数广播,这是一个高效的选择。
  • 劣势: 致命的缺陷在于缺乏持久性。如果一个SSE服务实例在消息发布时恰好宕机,那么连接到该实例的所有客户端将永久丢失这条消息。Redis Pub/Sub不为离线订阅者缓存消息。这直接违背了我们“消息必达”的核心原则。

方案B:基于Kafka与ISR的持久化消费模型

这个方案将系统核心从一个脆弱的广播模型转变为一个健壮的流处理模型。所有待推送的消息首先被发送到一个高可用的Kafka Topic中。SSE服务实例不再是简单的广播接收者,而是作为消费者组(Consumer Group)的成员,从Topic中拉取消息。

  • 优势:

    1. 消息持久化与回溯: Kafka将消息持久化到磁盘,并可在多个Broker间复制。即使所有SSE服务实例都宕机,消息依然安全地存储在Kafka中。服务恢复后可以从上次消费的偏移量(Offset)继续处理。
    2. 投递保证: 通过设置Kafka生产者的acks=all以及Topic的min.insync.replicas(ISR)参数,我们可以确保一条消息只有在成功写入到指定数量的同步副本后,才被认为是“已提交”。这是实现“至少一次”投递保证的基石。
    3. 水平扩展与解耦: SSE服务实例可以作为无状态节点进行任意水平扩展。上游的消息生产者与下游的SSE消费者完全解耦,两者都可以独立演进和扩缩容。
  • 劣势:

    1. 架构复杂度增加: 引入了Kafka集群,运维成本和技术栈复杂度都更高。
    2. 延迟略高: 消息需要经过生产者、Kafka Broker、消费者这一完整链路,相比直接内存或Redis广播,端到端延迟会略有增加。

最终选择: 考虑到业务场景对消息可靠性的严苛要求,我们选择方案B。短暂的延迟增加是为了换取系统级的可靠性和数据一致性,这个权衡是值得的。

核心实现概览

我们的整体架构如下所示,它由API网关、SSE服务集群、Kafka集群以及用于可观测性的SkyWalking组成,全部通过Terraform进行声明式部署。

graph TD
    subgraph "客户端"
        Client[用户浏览器/App]
    end

    subgraph "基础设施 (Terraform管理)"
        subgraph "AWS VPC"
            subgraph "Public Subnet"
                ALB[Application Load Balancer]
            end
            subgraph "Private Subnet"
                APIGW[API Gateway - Spring Cloud Gateway] --> SSEService
                SSEService[SSE Service Cluster - EC2 ASG] -- Consume --> Kafka
                ProducerService[上游业务服务] -- Produce --> Kafka
                Kafka[Managed Kafka Cluster - MSK]
            end
        end
    end

    subgraph "可观测性"
        SkyWalking[SkyWalking OAP & UI]
        APIGW -- Tracing --> SkyWalking
        SSEService -- Tracing --> SkyWalking
        ProducerService -- Tracing --> SkyWalking
    end

    Client -- HTTPS --> ALB
    ALB -- HTTP --> APIGW

SSE服务:连接管理与Kafka消费

SSE服务的核心职责是维护客户端连接,并作为Kafka消费者将消息准确推送给对应的客户端。我们使用Java和Spring Boot来实现。

1. 连接管理

我们需要一个并发安全的数据结构来存储用户ID与SseEmitter实例的映射关系。SseEmitter是Spring Web对SSE连接的抽象。

// src/main/java/com/example/sse/gateway/SseConnectionManager.java
package com.example.sse.gateway;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class SseConnectionManager {

    private static final Logger logger = LoggerFactory.getLogger(SseConnectionManager.class);
    // 使用ConcurrentHashMap来保证线程安全
    private final Map<String, SseEmitter> connections = new ConcurrentHashMap<>();

    /**
     * 添加一个新的SSE连接
     * @param userId 用户唯一标识
     * @param emitter SseEmitter实例
     */
    public void add(String userId, SseEmitter emitter) {
        // 在真实项目中, 这里可能需要处理同一用户多端登录的场景,
        // 例如, 允许多个emitter存在, 或踢掉旧的连接。此处简化为只保留最新的。
        if (connections.containsKey(userId)) {
            // 先完成并移除旧的连接, 避免资源泄露
            SseEmitter oldEmitter = connections.remove(userId);
            if (oldEmitter != null) {
                oldEmitter.complete();
                logger.info("Removed existing SSE connection for user: {}", userId);
            }
        }
        this.connections.put(userId, emitter);
        logger.info("Added new SSE connection for user: {}. Total connections: {}", userId, connections.size());

        // 设置超时回调, 超时通常意味着客户端断开连接
        emitter.onTimeout(() -> {
            logger.warn("SSE connection timed out for user: {}", userId);
            remove(userId, emitter);
        });

        // 设置完成回调, 无论是服务端主动完成还是客户端关闭导致
        emitter.onCompletion(() -> {
            logger.info("SSE connection completed for user: {}", userId);
            remove(userId, emitter);
        });

        // 设置错误回调
        emitter.onError(throwable -> {
            logger.error("SSE connection error for user: {}: {}", userId, throwable.getMessage());
            remove(userId, emitter);
        });
    }

    /**
     * 移除一个SSE连接
     * @param userId 用户ID
     * @param emitter 待移除的emitter实例, 确保不会误删新连接
     */
    public void remove(String userId, SseEmitter emitter) {
        // 使用computeIfPresent确保原子性操作, 避免在并发场景下移除错误的emitter
        connections.computeIfPresent(userId, (key, existingEmitter) -> {
            if (existingEmitter == emitter) {
                logger.info("Removing SSE connection for user: {}. Total connections: {}", userId, connections.size() - 1);
                return null; // 返回null会从map中移除该键值对
            }
            return existingEmitter; // 不是同一个实例, 不移除
        });
    }

    /**
     * 向指定用户发送消息
     * @param userId 用户ID
     * @param message 消息内容
     */
    public void sendMessage(String userId, String message) {
        SseEmitter emitter = connections.get(userId);
        if (emitter == null) {
            // 这是一个常见情况: 消息到达时, 用户可能已经离线。
            // 由于我们使用了Kafka, 这条消息的offset会被正常提交, 不会丢失。
            // 用户下次上线时, 如果有需要, 可以通过其他API拉取历史消息。
            logger.warn("No active SSE connection found for user: {}", userId);
            return;
        }

        try {
            // 构建SSE事件格式
            SseEmitter.SseEventBuilder event = SseEmitter.event()
                    .data(message)
                    .id(String.valueOf(System.currentTimeMillis())) // 事件ID
                    .name("message"); // 事件类型
            emitter.send(event);
            logger.debug("Sent message to user: {}", userId);
        } catch (IOException e) {
            // 发送失败, 很可能是客户端已经断开连接。
            // 移除这个无效的连接。
            logger.error("Failed to send message to user: {}. Removing connection.", userId, e);
            remove(userId, emitter);
        }
    }
}

这个管理器的设计考虑了并发安全和连接生命周期管理。一个常见的坑是,在移除连接时没有进行实例比较,可能导致新建立的连接被旧连接的清理回调错误地移除。

2. Kafka消费者

我们使用Spring Kafka来消费消息。每个消息都应该包含目标userId和消息内容。

// src/main/java/com/example/sse/gateway/KafkaMessageConsumer.java
package com.example.sse.gateway;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaMessageConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageConsumer.class);

    private final SseConnectionManager connectionManager;
    private final ObjectMapper objectMapper;

    // 依赖注入连接管理器
    public KafkaMessageConsumer(SseConnectionManager connectionManager, ObjectMapper objectMapper) {
        this.connectionManager = connectionManager;
        this.objectMapper = objectMapper;
    }

    // 监听名为 "realtime-events" 的topic, 属于 "sse-gateway-group" 消费者组
    // containerFactory 指向我们在配置中定义的并发消费者工厂
    @KafkaListener(topics = "realtime-events", groupId = "sse-gateway-group", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String message, Acknowledgment ack) {
        try {
            logger.info("Received message from Kafka: {}", message);
            // 在真实项目中, 消息格式应为结构化数据, 如JSON
            PushMessage pushMessage = objectMapper.readValue(message, PushMessage.class);
            
            // 核心逻辑: 将消息转发给连接管理器
            connectionManager.sendMessage(pushMessage.getUserId(), pushMessage.getContent());
            
            // 手动提交offset, 确保消息被成功处理后再确认
            // 这是实现"至少一次"语义的关键
            ack.acknowledge();
            logger.debug("Acknowledged kafka message for user: {}", pushMessage.getUserId());
        } catch (Exception e) {
            // 如果JSON解析失败或处理过程中出现任何异常, 我们不提交offset。
            // 这会导致Kafka在稍后重新投递这条消息。
            // 这里需要配置重试策略和死信队列(DLQ), 避免无限次重试坏消息。
            logger.error("Error processing kafka message. Will not acknowledge.", e);
        }
    }

    // 定义消息的DTO
    public static class PushMessage {
        private String userId;
        private String content;
        // getters and setters
        public String getUserId() { return userId; }
        public void setUserId(String userId) { this.userId = userId; }
        public String getContent() { return content; }
        public void setContent(String content) { this.content = content; }
    }
}

Kafka消费者配置是保证可靠性的核心。

# application.yml
spring:
  kafka:
    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
    consumer:
      group-id: sse-gateway-group
      auto-offset-reset: earliest # 从最早的offset开始消费, 以免服务重启期间漏掉消息
      enable-auto-commit: false # 关闭自动提交! 这是必须的, 我们要手动控制提交流程
    listener:
      # 手动提交offset需要设置ack-mode
      ack-mode: MANUAL_IMMEDIATE

我们关闭了自动提交,转而使用MANUAL_IMMEDIATE模式。只有在connectionManager.sendMessage成功执行后,我们才调用ack.acknowledge()。如果期间发生任何异常,消息不会被确认,Kafka会在下一次轮询时重新投递它。

API网关:路由与可观测性

我们使用Spring Cloud Gateway作为API网关。它的主要职责是:

  1. 对外部请求进行鉴权和身份识别。
  2. /events/{userId}这样的SSE请求路由到后端的SSE服务集群。
  3. 集成SkyWalking Agent,使所有经过网关的请求都能被追踪。

一个简单的路由配置如下:

# Spring Cloud Gateway configuration
spring:
  cloud:
    gateway:
      routes:
        - id: sse_events_route
          # 路由到SSE服务集群, lb:// 表示使用服务发现和负载均衡
          uri: lb://sse-service
          predicates:
            # 匹配所有路径为 /api/v1/events/ 开头的请求
            - Path=/api/v1/events/**
          filters:
            # 可以在这里添加自定义的认证Filter
            # - CustomAuthFilter
            # 重写路径, 去掉前缀, 以便SSE服务直接处理
            - RewritePath=/api/v1/(?<segment>.*), /$\{segment}

可观测性:使用SkyWalking进行全链路追踪

SkyWalking的集成是无侵入的。我们只需要在API网关和SSE服务的启动脚本中,通过-javaagent参数挂载SkyWalking的探针即可。

# Dockerfile or startup script
java -javaagent:/path/to/skywalking-agent/skywalking-agent.jar \
     -Dskywalking.agent.service_name=sse-gateway-service \
     -Dskywalking.collector.backend_service=skywalking-oap.skywalking:11800 \
     -jar /app/sse-gateway.jar

集成后,SkyWalking会自动完成:

  1. 服务拓扑发现: 自动绘制出从API网关到SSE服务,再到Kafka的依赖关系图。
  2. 链路追踪: 当一个发布消息的请求进入系统,我们可以看到完整的调用链:ProducerService -> Kafka -> SSEServiceSSEService消费消息并推送给客户端的内部过程也会被记录。
  3. 性能指标监控: 自动收集各服务的JVM指标、吞吐量、延迟等,并监控Kafka Topic的消费延迟(lag)。当消费延迟持续升高时,可以触发告警,表明SSE服务处理能力不足。

这条清晰的链路对于排查问题至关重要。例如,如果用户反馈消息延迟,我们可以通过SkyWalking的trace精确地定位到是Kafka积压了,还是SSE服务内部处理耗时过长。

基础设施即代码:使用Terraform部署高可用架构

手动部署和管理一套包含VPC、负载均衡器、Auto Scaling Group和Managed Kafka的架构是极其繁琐且容易出错的。Terraform让我们能够用代码来定义、部署和管理这一切。

以下是核心资源的Terraform HCL代码片段,用于阐述设计思路。

# main.tf

# ---------------------------------
# Network Foundation
# ---------------------------------
resource "aws_vpc" "main" {
  cidr_block = "10.0.0.0/16"
}
# ... (Subnets, Route Tables, Internet Gateway, etc.)

# ---------------------------------
# Managed Kafka (MSK) Cluster
# ---------------------------------
resource "aws_msk_cluster" "main" {
  cluster_name           = "realtime-events-cluster"
  kafka_version          = "2.8.1"
  number_of_broker_nodes = 3 # 跨3个可用区部署, 保证高可用

  broker_node_group_info {
    instance_type  = "kafka.m5.large"
    client_subnets = [aws_subnet.private_a.id, aws_subnet.private_b.id, aws_subnet.private_c.id]
    storage_info {
      ebs_storage_info {
        volume_size = 100 # GiB
      }
    }
  }

  # 启用自动的主题创建, 或者通过Kafka Admin Client手动创建
}

# Kafka Topic定义, 关键在于replication_factor和min_insync_replicas
# 这通常在应用启动时通过AdminClient API创建, 或通过Kafka工具创建
# topic_config = {
#   "replication.factor" = "3"
#   "min.insync.replicas" = "2" # 核心配置! 确保数据至少写入2个副本才算成功
# }

# ---------------------------------
# SSE Service - Auto Scaling Group
# ---------------------------------
resource "aws_launch_template" "sse_service" {
  name_prefix   = "sse-service-"
  image_id      = "ami-xxxxxxxxxxxx" # 包含应用和SkyWalking agent的AMI
  instance_type = "t3.medium"
  
  # user_data用于在实例启动时配置服务
  user_data = base64encode(<<-EOF
    #!/bin/bash
    export KAFKA_BOOTSTRAP_SERVERS=${aws_msk_cluster.main.bootstrap_brokers_tls}
    # ... 其他环境变量
    java -javaagent:/opt/skywalking/agent/skywalking-agent.jar -jar /app/sse-gateway.jar
  EOF
  )
}

resource "aws_autoscaling_group" "sse_service" {
  name                = "sse-service-asg"
  desired_capacity    = 2
  max_size            = 10
  min_size            = 2 # 至少2个实例, 避免单点故障
  vpc_zone_identifier = [aws_subnet.private_a.id, aws_subnet.private_b.id]

  launch_template {
    id      = aws_launch_template.sse_service.id
    version = "$Latest"
  }
}

# ... (Load Balancer, Target Group, Security Groups, IAM Roles)

这段Terraform代码的核心思想是:

  • 高可用Kafka: MSK集群横跨3个AZ,replication.factor=3min.insync.replicas=2的Topic配置意味着即使一个AZ或Broker完全失效,数据依然安全,且写入操作在确认前必须得到至少两个副本的确认。这就是ISR机制在基础设施层面的体现。
  • 弹性的SSE服务: 使用Auto Scaling Group确保SSE服务始终有至少两个实例在运行。可以配置基于CPU利用率或活动连接数的伸缩策略,自动应对流量高峰。
  • 配置解耦: Kafka的连接地址等配置通过环境变量注入,而不是硬编码在代码或镜像中。

当前方案的局限性与未来迭代路径

尽管此架构提供了强大的消息投递保证,但它并非没有权衡。首先,引入Kafka显著增加了系统的端到端延迟,对于需要亚百毫秒级实时性的场景(如高频交易),可能需要评估其他方案。

其次,对于消费者端的错误处理,目前的方案是在异常发生时不提交offset,等待Kafka重投。如果某条消息本身是“有毒”的(例如,格式错误导致持续的解析异常),它会阻塞分区,反复投递。在生产环境中,必须配合一个健壮的死信队列(DLQ)机制,在重试几次后将问题消息转移到DLQ,以便后续进行人工分析和处理,同时保证正常消息的消费流程不受影响。

未来的一个优化方向是针对客户端重连的场景。当客户端因网络抖动短暂断开并立即重连时,SSE服务可以考虑在内存中缓存最近的少量消息。这样,新连接建立后,可以先从内存缓存中推送,而不是让客户端完全等待下一条来自Kafka的新消息,从而改善用户体验。


  目录