Building a CDC Data Pipeline into Apache Hudi with ASP.NET Core and Clean Architecture, Deployed on Nomad


We faced a classic data engineering problem: our core PostgreSQL database carries a heavy OLTP workload, yet the analytics team needs to query that data in near real time. Running complex analytical queries directly against the production database is unacceptable, since it would seriously degrade core business performance, and traditional nightly batch ETL jobs have far too much latency to meet the data-freshness requirements. We needed a solution that captures change data in real time and lands it in a data lake suited to fast analytical queries, while also fitting into our predominantly .NET technology stack and deploying onto our shared Nomad cluster.

Initial Design and Technology Choices

The initial idea was a real-time pipeline based on change data capture (CDC). The data flows roughly as follows:
PostgreSQL (Source) -> Debezium -> Kafka -> .NET Ingestion Service -> Apache Hudi (Data Lake)

Each component in this architecture was chosen deliberately:

  1. Debezium + Kafka: the de facto standard pairing for CDC. Debezium reads PostgreSQL's write-ahead log through a logical replication slot, capturing row-level INSERT, UPDATE, and DELETE operations with very low overhead and publishing the change events as structured JSON to a Kafka topic. Kafka provides a reliable, scalable event bus.

  2. Apache Hudi: we chose Hudi as the data lake storage format rather than plain Parquet or ORC because Hudi provides ACID transactions on the lake together with record-level upserts and deletes. That matters for CDC data coming from an OLTP database, where we need to faithfully reproduce the source database's state changes in the lake. We picked the Copy-on-Write table type to optimize read performance.

  3. ASP.NET Core Worker Service: the most unconventional piece of the design. Pipelines like this are usually built with Spark Streaming or Flink, but our team consists mostly of .NET developers, and adopting a large JVM stack would bring a steep learning and maintenance cost. Using a .NET Worker Service as the consumer lets us build a lightweight, high-performance ingestion service in a familiar environment, leaning on C#'s strong typing, modern async programming model, and rich library ecosystem.

  4. Clean Architecture: a data pipeline service that must run and be maintained for years needs a clear architecture. Clean Architecture forces the business logic (transformation, validation) to be decoupled from external dependencies (Kafka, Hudi, configuration). The core logic becomes trivial to test, and swapping out the message queue or storage engine later has minimal impact on it.

  5. HashiCorp Nomad: our infrastructure is already standardized on Nomad. Compared with Kubernetes, Nomad is simpler and more flexible for mixed workloads (long-running services, batch jobs, and so on). The .NET service is packaged as a Docker image and deployed to the Nomad cluster as a service-type job.

Architecture and Project Structure

We organize the .NET solution along Clean Architecture lines. It contains the following projects:

  • Cdc.Hudi.Core: domain entities and interfaces. The innermost, most stable part; it depends on no external framework.
  • Cdc.Hudi.Application: application services and business logic. It orchestrates the domain entities and applies transformation rules, but never talks to Kafka or Hudi directly.
  • Cdc.Hudi.Infrastructure: implementations of the interfaces defined in Core. All code that touches the outside world lives here: the Kafka consumer, the Hudi writer, logging, and so on.
  • Cdc.Hudi.Worker: the ASP.NET Core Worker Service host project. It wires up all dependencies (DI container configuration) and starts the background service.

Here is the architecture as a diagram:

graph TD
    subgraph "PostgreSQL"
        A[OLTP Database] -- WAL --> B(Debezium Connector)
    end

    subgraph "Kafka Cluster"
        B -- JSON Events --> C{Topic: db.public.orders}
    end

    subgraph "Nomad Cluster"
        subgraph "Docker Container (ASP.NET Core Worker)"
            D[Kafka Consumer] -- Raw Event --> E(Application Service)
            E -- Transformed Data --> F[Hudi Writer]
            subgraph "Clean Architecture Layers"
                direction LR
                L1[Infrastructure] --> L2(Application)
                L2 --> L3(Core)
            end
        end
    end
    
    subgraph "Data Lake (S3/HDFS)"
        F -- Spark-Submit --> G[Apache Hudi Table]
    end

    subgraph "Query Engines"
        G -- SQL Query --> H[Presto / Trino / Spark SQL]
    end

    style L1 fill:#f9f,stroke:#333,stroke-width:2px
    style L2 fill:#ccf,stroke:#333,stroke-width:2px
    style L3 fill:#cfc,stroke:#333,stroke-width:2px

Step-by-Step Implementation: From Consumption to Write

1. Core Domain Definitions (Cdc.Hudi.Core)

We start by defining the core data model and the interfaces the outer layers must implement. Assume we are replicating the orders table.

// Cdc.Hudi.Core/Entities/Order.cs
namespace Cdc.Hudi.Core.Entities;

public class Order
{
    public Guid Id { get; set; }
    public string OrderNumber { get; set; } = string.Empty;
    public decimal Amount { get; set; }
    public DateTime CreatedAtUtc { get; set; }
    public DateTime UpdatedAtUtc { get; set; }
    public bool IsDeleted { get; set; } // Soft-delete flag carried into the Hudi table
}

// Cdc.Hudi.Core/Models/ChangeEvent.cs
// Represents a generic Debezium CDC event payload
namespace Cdc.Hudi.Core.Models;

public class ChangeEvent<T> where T : class
{
    public T? Before { get; set; }
    public T? After { get; set; }
    public SourceMetadata? Source { get; set; }
    public string Op { get; set; } = string.Empty; // "c" for create, "u" for update, "d" for delete
    public long TsMs { get; set; }
}

public class SourceMetadata
{
    public string Version { get; set; } = string.Empty;
    public string Connector { get; set; } = string.Empty;
    public string Name { get; set; } = string.Empty;
    public long TsMs { get; set; }
    public string Db { get; set; } = string.Empty;
    public string Schema { get; set; } = string.Empty;
    public string Table { get; set; } = string.Empty;
}

// Cdc.Hudi.Core/Interfaces/IHudiWriter.cs
namespace Cdc.Hudi.Core.Interfaces;

public interface IHudiWriter<T> where T : class
{
    Task WriteAsync(IEnumerable<T> records, CancellationToken cancellationToken);
}

2. Application Layer Logic (Cdc.Hudi.Application)

The application service owns the business logic, for example mapping Debezium operation types onto our model's IsDeleted flag, or performing data cleansing.

// Cdc.Hudi.Application/Services/OrderIngestionService.cs
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Core.Models;
using Microsoft.Extensions.Logging;

namespace Cdc.Hudi.Application.Services;

public class OrderIngestionService
{
    private readonly IHudiWriter<Order> _hudiWriter;
    private readonly ILogger<OrderIngestionService> _logger;

    public OrderIngestionService(IHudiWriter<Order> hudiWriter, ILogger<OrderIngestionService> logger)
    {
        _hudiWriter = hudiWriter;
        _logger = logger;
    }

    public async Task ProcessEventAsync(ChangeEvent<Order> changeEvent, CancellationToken cancellationToken)
    {
        if (changeEvent is null)
        {
            _logger.LogWarning("Received a null change event. Skipping.");
            return;
        }

        if (changeEvent.After is null && changeEvent.Op != "d")
        {
            _logger.LogWarning("Received an event with no 'after' state and it is not a delete operation. Skipping.");
            return;
        }

        Order finalRecord;
        
        switch (changeEvent.Op)
        {
            case "c": // Create
            case "u": // Update
                finalRecord = changeEvent.After!;
                finalRecord.IsDeleted = false;
                break;
            case "d": // Delete
                finalRecord = changeEvent.Before!;
                if (finalRecord is null)
                {
                    _logger.LogError("Received a delete event with no 'before' state. Cannot process tombstone.");
                    return;
                }
                finalRecord.IsDeleted = true;
                break;
            default:
                _logger.LogWarning("Unknown operation type '{Op}'. Skipping event.", changeEvent.Op);
                return;
        }

        // In a real project, more transformations could happen here.
        // For example, data enrichment, PII masking, etc.
        finalRecord.UpdatedAtUtc = DateTime.UtcNow; // Set ingestion timestamp

        try
        {
            // Records are written one at a time here; batch accumulation is discussed as a future optimization
            await _hudiWriter.WriteAsync(new[] { finalRecord }, cancellationToken);
            _logger.LogInformation("Successfully processed and wrote order {OrderId}", finalRecord.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to write record for order {OrderId} to Hudi.", finalRecord.Id);
            // Error handling strategy is crucial here. Re-queueing, dead-letter queue, etc.
            throw; 
        }
    }
}
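
Because OrderIngestionService depends only on the IHudiWriter<Order> abstraction, it can be unit-tested without Kafka or Spark, which is the payoff of the layering described earlier. Below is a sketch of such a test; the FakeHudiWriter test double and the xUnit framework are assumptions of this example (a standard test project with implicit usings), not part of the solution above.

// Cdc.Hudi.Application.Tests/OrderIngestionServiceTests.cs -- illustrative sketch
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Core.Models;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;

// Hand-rolled test double that just records what would have been written to Hudi.
public class FakeHudiWriter : IHudiWriter<Order>
{
    public List<Order> Written { get; } = new();

    public Task WriteAsync(IEnumerable<Order> records, CancellationToken cancellationToken)
    {
        Written.AddRange(records);
        return Task.CompletedTask;
    }
}

public class OrderIngestionServiceTests
{
    [Fact]
    public async Task Delete_event_is_written_as_soft_delete()
    {
        var writer = new FakeHudiWriter();
        var service = new OrderIngestionService(writer, NullLogger<OrderIngestionService>.Instance);

        var deleteEvent = new ChangeEvent<Order>
        {
            Op = "d",
            Before = new Order { Id = Guid.NewGuid(), OrderNumber = "SO-1001" }
        };

        await service.ProcessEventAsync(deleteEvent, CancellationToken.None);

        // The 'before' image should be written exactly once, flagged as deleted.
        var record = Assert.Single(writer.Written);
        Assert.True(record.IsDeleted);
    }
}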

3. Infrastructure Layer (Cdc.Hudi.Infrastructure)

This is the most involved part: all integrations with external systems live here.

Kafka Consumer

We consume messages with the Confluent.Kafka library and plug in a custom deserializer that understands Debezium's JSON envelope.

// Cdc.Hudi.Infrastructure/Kafka/DebeziumJsonDeserializer.cs
using System.Text.Json;
using Confluent.Kafka;
using Cdc.Hudi.Core.Models;

namespace Cdc.Hudi.Infrastructure.Kafka;

public class DebeziumJsonDeserializer<T> : IDeserializer<ChangeEvent<T>?> where T : class
{
    private readonly JsonSerializerOptions _options = new()
    {
        PropertyNameCaseInsensitive = true,
        // Debezium field names are snake_case (e.g. "ts_ms"); SnakeCaseLower requires .NET 8+.
        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
    };

    public ChangeEvent<T>? Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        if (isNull) return null;

        try
        {
            // Debezium wraps the actual payload in a "payload" field.
            using var jsonDoc = JsonDocument.Parse(data);
            if (!jsonDoc.RootElement.TryGetProperty("payload", out var payloadElement))
            {
                // This might be a tombstone record or an invalid format
                return null;
            }
            
            return JsonSerializer.Deserialize<ChangeEvent<T>>(payloadElement.GetRawText(), _options);
        }
        catch (JsonException)
        {
            // Log the error and return null to skip the message.
            // A more robust implementation would send this to a dead-letter queue.
            Console.WriteLine($"Failed to deserialize message: {System.Text.Encoding.UTF8.GetString(data)}");
            return null;
        }
    }
}
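
For orientation, this is roughly the kind of message the deserializer unwraps when the connector uses the default JSON converter (schemas enabled). The snippet below is a top-level-statements sketch: the envelope is trimmed (the schema block is omitted, and timestamp/decimal values are shown as readable literals, whereas the actual encoding depends on the connector's converter settings), and it simply invokes the deserializer directly.

// Illustrative only: a trimmed Debezium envelope fed straight into the deserializer.
using System.Text;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Infrastructure.Kafka;
using Confluent.Kafka;

var rawMessage = """
{
  "payload": {
    "before": null,
    "after": {
      "id": "5f0c2d8e-3a4b-4c1d-9e2f-0a1b2c3d4e5f",
      "order_number": "SO-1001",
      "amount": 249.90,
      "created_at_utc": "2024-01-01T08:30:00Z",
      "updated_at_utc": "2024-01-01T08:30:00Z"
    },
    "source": { "connector": "postgresql", "db": "app", "schema": "public", "table": "orders", "ts_ms": 1704097800000 },
    "op": "c",
    "ts_ms": 1704097800123
  }
}
""";

var deserializer = new DebeziumJsonDeserializer<Order>();
var changeEvent = deserializer.Deserialize(
    Encoding.UTF8.GetBytes(rawMessage),
    isNull: false,
    new SerializationContext(MessageComponentType.Value, "dbserver1.public.orders"));

// Prints "c -> SO-1001": the payload was unwrapped and mapped onto the Order entity.
Console.WriteLine($"{changeEvent?.Op} -> {changeEvent?.After?.OrderNumber}");
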
Hudi Writer

There is currently no mature, native .NET client for Hudi. In a real project, the most pragmatic approach is to rely on Hudi's hudi-spark-bundle and drive writes through the spark-submit command-line tool. Our C# code is responsible for:

  1. Serializing a batch of .NET objects into a Parquet file.
  2. Invoking a spark-submit process with the required Hudi configuration.

// Cdc.Hudi.Infrastructure/Hudi/SparkHudiWriter.cs
using System.Diagnostics;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Parquet;
using Parquet.Data;
using Parquet.Schema;

namespace Cdc.Hudi.Infrastructure.Hudi;

// This implementation is a pragmatic approach for .NET to interact with Hudi.
public class SparkHudiWriter : IHudiWriter<Order>
{
    private readonly ILogger<SparkHudiWriter> _logger;
    private readonly string _sparkSubmitPath;
    private readonly string _hudiBundleJarPath;
    private readonly string _tempParquetPath;
    private readonly string _hudiTablePath;
    
    // Table-level Hudi settings are kept as constants here; paths and Spark settings come from appsettings.json
    private const string HudiTableName = "orders_hudi";
    private const string RecordKeyField = "id";
    private const string PartitionPathField = "created_at_utc"; // Partition source column; a production setup would usually derive a date from it
    private const string PrecombineField = "updated_at_utc";

    public SparkHudiWriter(ILogger<SparkHudiWriter> logger, IConfiguration config)
    {
        _logger = logger;
        _sparkSubmitPath = config.GetValue<string>("Spark:SubmitPath") ?? "spark-submit";
        _hudiBundleJarPath = config.GetValue<string>("Spark:HudiBundleJarPath")
            ?? throw new InvalidOperationException("Spark:HudiBundleJarPath is not configured.");
        _tempParquetPath = Path.Combine(Path.GetTempPath(), "hudi_ingest");
        _hudiTablePath = config.GetValue<string>("Hudi:TableBasePath")
            ?? throw new InvalidOperationException("Hudi:TableBasePath is not configured.");
        
        Directory.CreateDirectory(_tempParquetPath);
    }

    public async Task WriteAsync(IEnumerable<Order> records, CancellationToken cancellationToken)
    {
        var recordsList = records.ToList();
        if (!recordsList.Any()) return;

        var tempFilePath = Path.Combine(_tempParquetPath, $"{Guid.NewGuid()}.parquet");

        try
        {
            // Step 1: Serialize .NET objects to a temporary Parquet file
            await WriteToParquetFile(recordsList, tempFilePath);
            
            // Step 2: Build and execute the spark-submit command
            await ExecuteSparkSubmit(tempFilePath, cancellationToken);
        }
        finally
        {
            // Clean up the temporary file
            if (File.Exists(tempFilePath))
            {
                File.Delete(tempFilePath);
            }
        }
    }

    private async Task WriteToParquetFile(List<Order> orders, string filePath)
    {
        var dataFields = new DataField[]
        {
            new DataField<Guid>("id"),
            new DataField<string>("order_number"),
            new DataField<decimal>("amount"),
            new DataField<DateTime>("created_at_utc"),
            new DataField<DateTime>("updated_at_utc"),
            new DataField<bool>("is_deleted")
        };

        var schema = new ParquetSchema(dataFields);

        using var fileStream = File.Create(filePath);
        using var parquetWriter = await ParquetWriter.CreateAsync(schema, fileStream);
        using var groupWriter = parquetWriter.CreateRowGroup();
        
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[0], orders.Select(o => o.Id).ToArray()));
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[1], orders.Select(o => o.OrderNumber).ToArray()));
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[2], orders.Select(o => o.Amount).ToArray()));
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[3], orders.Select(o => o.CreatedAtUtc).ToArray()));
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[4], orders.Select(o => o.UpdatedAtUtc).ToArray()));
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[5], orders.Select(o => o.IsDeleted).ToArray()));
    }

    private async Task ExecuteSparkSubmit(string sourceParquetFile, CancellationToken cancellationToken)
    {
        // A helper script is often better than a very long command line argument.
        // Here we build it dynamically for clarity.
        var scriptContent = $@"
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('HudiNetCoreIngest').getOrCreate()
input_df = spark.read.parquet('{sourceParquetFile}')

hudi_options = {{
    'hoodie.table.name': '{HudiTableName}',
    'hoodie.datasource.write.recordkey.field': '{RecordKeyField}',
    'hoodie.datasource.write.partitionpath.field': '{PartitionPathField}',
    'hoodie.datasource.write.table.name': '{HudiTableName}',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': '{PrecombineField}',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload',
    'hoodie.parquet.small.file.limit': '0', # Disable auto-sizing for simplicity
    'hoodie.upsert.shuffle.parallelism': '2',
    'hoodie.insert.shuffle.parallelism': '2'
}}

input_df.write.format('hudi') \
    .options(**hudi_options) \
    .mode('append') \
    .save('{_hudiTablePath}')

spark.stop()
";
        var pythonScriptPath = Path.Combine(_tempParquetPath, $"{Guid.NewGuid()}.py");
        await File.WriteAllTextAsync(pythonScriptPath, scriptContent, cancellationToken);
        
        var processInfo = new ProcessStartInfo
        {
            FileName = _sparkSubmitPath,
            ArgumentList = {
                "--master", "local[*]", // Or your cluster URL
                "--jars", _hudiBundleJarPath,
                "--driver-class-path", _hudiBundleJarPath,
                "--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
                "--conf", "spark.sql.hive.convertMetastoreParquet=false",
                pythonScriptPath
            },
            RedirectStandardOutput = true,
            RedirectStandardError = true,
            UseShellExecute = false,
            CreateNoWindow = true,
        };

        _logger.LogInformation("Executing spark-submit with script: {ScriptPath}", pythonScriptPath);
        using var process = Process.Start(processInfo);
        
        if (process is null)
        {
            throw new InvalidOperationException("Failed to start spark-submit process.");
        }

        // Read stdout and stderr concurrently to avoid deadlocking on full pipe buffers
        var stdOutTask = process.StandardOutput.ReadToEndAsync(cancellationToken);
        var stdErrTask = process.StandardError.ReadToEndAsync(cancellationToken);
        await process.WaitForExitAsync(cancellationToken);
        var stdOut = await stdOutTask;
        var stdErr = await stdErrTask;

        if (process.ExitCode != 0)
        {
            _logger.LogError("spark-submit failed with exit code {ExitCode}.\nSTDOUT: {StdOut}\nSTDERR: {StdErr}", 
                process.ExitCode, stdOut, stdErr);
            throw new Exception($"Spark job failed. See logs for details.");
        }
        
        _logger.LogInformation("spark-submit completed successfully. STDOUT: {StdOut}", stdOut);
        File.Delete(pythonScriptPath);
    }
}

4. The Worker Service (Cdc.Hudi.Worker)

Finally, a BackgroundService ties everything together. It continuously pulls messages from Kafka and dispatches each one to the application service; batch accumulation is left as a later optimization (see the final section).

// Cdc.Hudi.Worker/KafkaEventProcessor.cs
using Confluent.Kafka;
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Models;
using Cdc.Hudi.Infrastructure.Kafka;

namespace Cdc.Hudi.Worker;

public class KafkaEventProcessor : BackgroundService
{
    private readonly ILogger<KafkaEventProcessor> _logger;
    private readonly IServiceScopeFactory _scopeFactory;
    private readonly IConfiguration _config;

    public KafkaEventProcessor(ILogger<KafkaEventProcessor> logger, IServiceScopeFactory scopeFactory, IConfiguration config)
    {
        _logger = logger;
        _scopeFactory = scopeFactory;
        _config = config;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
            GroupId = "hudi-ingestion-worker",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            EnableAutoCommit = false // We need manual control
        };
        
        var topic = _config.GetValue<string>("Kafka:Topic")
            ?? throw new InvalidOperationException("Kafka:Topic is not configured.");

        using var consumer = new ConsumerBuilder<string, ChangeEvent<Order>?>(consumerConfig)
            .SetValueDeserializer(new DebeziumJsonDeserializer<Order>())
            .Build();
            
        consumer.Subscribe(topic);
        _logger.LogInformation("Subscribed to Kafka topic: {Topic}", topic);

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                var consumeResult = consumer.Consume(stoppingToken);
                if (consumeResult is null) continue;

                if (consumeResult.Message?.Value is null)
                {
                    // Skip tombstones and messages that failed deserialization
                    consumer.Commit(consumeResult);
                    continue;
                }
                
                var changeEvent = consumeResult.Message.Value;

                // Process each message in a new DI scope
                using (var scope = _scopeFactory.CreateScope())
                {
                    var ingestionService = scope.ServiceProvider.GetRequiredService<OrderIngestionService>();
                    await ingestionService.ProcessEventAsync(changeEvent, stoppingToken);
                }

                consumer.Commit(consumeResult); // Commit offset after successful processing
            }
            catch (ConsumeException e)
            {
                _logger.LogError(e, "Error consuming from Kafka.");
            }
            catch (OperationCanceledException)
            {
                _logger.LogInformation("Worker service is stopping.");
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "An unhandled exception occurred during message processing. Waiting before retrying.");
                // In a production system, you might want a more sophisticated back-off strategy.
                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }
        }

        consumer.Close();
    }
}
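
The Worker project's Program.cs is where the layers get wired together. A minimal composition root could look like the sketch below; the class names match the code above, it relies on the Worker SDK's implicit usings, and the chosen lifetimes are one reasonable option rather than the only one.

// Cdc.Hudi.Worker/Program.cs -- minimal composition root sketch
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Infrastructure.Hudi;
using Cdc.Hudi.Worker;

var builder = Host.CreateApplicationBuilder(args);

// Infrastructure: the Spark-based Hudi writer satisfies the Core interface.
builder.Services.AddSingleton<IHudiWriter<Order>, SparkHudiWriter>();

// Application: resolved per message inside the DI scope created by KafkaEventProcessor.
builder.Services.AddScoped<OrderIngestionService>();

// Worker: the long-running Kafka consumer loop.
builder.Services.AddHostedService<KafkaEventProcessor>();

builder.Build().Run();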

Deploying to Nomad

We package the service as a Docker image and deploy it through a Nomad job file.

Dockerfile:

# .NET 8 SDK (JsonNamingPolicy.SnakeCaseLower used by the deserializer requires .NET 8)
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY . .
RUN dotnet publish "Cdc.Hudi.Worker/Cdc.Hudi.Worker.csproj" -c Release -o /app/publish

# Runtime image: Java for spark-submit, Python for the driver script, plus the .NET runtime
FROM openjdk:11-jre-slim
RUN apt-get update && \
    apt-get install -y python3 python3-pip curl libicu-dev && \
    rm -rf /var/lib/apt/lists/*

# The JRE base image ships no .NET runtime; install it explicitly so the ENTRYPOINT works.
RUN curl -sSL https://dot.net/v1/dotnet-install.sh | bash /dev/stdin \
        --channel 8.0 --runtime aspnetcore --install-dir /usr/share/dotnet
ENV DOTNET_ROOT=/usr/share/dotnet
ENV PATH=$PATH:/usr/share/dotnet

ENV SPARK_VERSION=3.3.2
ENV HADOOP_VERSION=3
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin

RUN curl -o spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    tar -xf spark.tgz && \
    mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME} && \
    rm spark.tgz

RUN pip3 install pyspark==${SPARK_VERSION}

# Copy .NET application
WORKDIR /app
COPY --from=build /app/publish .

# TODO: Add your hudi-spark-bundle JAR to the image or mount it
# COPY hudi-spark3.3-bundle_2.12-0.12.2.jar /app/hudi-spark-bundle.jar

ENTRYPOINT ["dotnet", "Cdc.Hudi.Worker.dll"]

cdc-hudi-ingest.nomad.hcl:

job "cdc-hudi-ingest" {
  datacenters = ["dc1"]
  type        = "service"

  group "ingest-worker" {
    count = 1 # Start with one instance, can be scaled up

    restart {
      attempts = 3
      interval = "5m"
      delay    = "25s"
      mode     = "delay"
    }

    task "app" {
      driver = "docker"

      config {
        image = "your-repo/cdc-hudi-worker:latest"
        // Mount Hudi bundle JAR if not included in the image
        // volumes = [
        //   "/path/to/hudi-spark-bundle.jar:/app/hudi-spark-bundle.jar:ro"
        // ]
      }
      
      // Configuration passed as environment variables
      env {
        "Kafka__BootstrapServers" = "kafka.service.consul:9092"
        "Kafka__Topic"            = "dbserver1.public.orders"
        "Hudi__TableBasePath"     = "s3a://your-data-lake-bucket/hudi_tables/orders"
        "Spark__SubmitPath"       = "/opt/spark/bin/spark-submit"
        "Spark__HudiBundleJarPath"= "/app/hudi-spark-bundle.jar"
        // AWS credentials for S3 access should be configured via Nomad's secrets or vault.
        "AWS_ACCESS_KEY_ID"       = "..."
        "AWS_SECRET_ACCESS_KEY"   = "..."
      }

      resources {
        cpu    = 1024 # MHz
        memory = 2048 # MB
      }
    }
  }
}

Open Issues and Future Iterations

Invoking spark-submit through Process.Start is pragmatic, but it carries inherent overhead: starting a full Spark session for every small batch of records is not ideal for high-throughput scenarios. One optimization path is to have the .NET service accumulate records into larger batches (say, flushing once per minute or every 10,000 records) so the Spark startup cost is amortized, as sketched below.
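
A rough sketch of what that accumulation could look like inside the consumer loop. It assumes hudiWriter is the resolved IHudiWriter<Order>, inlines the transformation for brevity, and uses illustrative thresholds; it is not a drop-in replacement for the ExecuteAsync shown earlier.

// Size/time-based batch buffer inside ExecuteAsync (sketch).
// Offsets are committed only after the whole batch has been handed to the Hudi writer.
var batch = new List<Order>();
var lastFlush = DateTime.UtcNow;
const int BatchSize = 10_000;
var flushInterval = TimeSpan.FromMinutes(1);
ConsumeResult<string, ChangeEvent<Order>?>? lastResult = null;

while (!stoppingToken.IsCancellationRequested)
{
    // Short poll timeout so the time-based flush can fire even when the topic is idle.
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result?.Message?.Value is { } evt)
    {
        lastResult = result;
        var record = evt.Op == "d" ? evt.Before : evt.After;
        if (record is not null)
        {
            record.IsDeleted = evt.Op == "d";
            batch.Add(record);
        }
    }

    var flushDue = batch.Count >= BatchSize || DateTime.UtcNow - lastFlush >= flushInterval;
    if (flushDue && batch.Count > 0 && lastResult is not null)
    {
        await hudiWriter.WriteAsync(batch, stoppingToken); // one spark-submit per batch
        consumer.Commit(lastResult);                       // commit up to the last buffered offset
        batch.Clear();
        lastFlush = DateTime.UtcNow;
    }
}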

A longer-term direction is to explore Java/.NET interop technologies such as IKVM or GraalVM's interoperability features, so that Hudi's Java write API can be called directly inside the .NET process. That would eliminate the process-invocation overhead entirely and give a tighter integration, but it also adds significant implementation complexity.

Finally, the current deployment runs a single consumer instance. If the Kafka topic's partition count grows, the consumer can be scaled horizontally by raising the count of the ingest-worker group, but Hudi's concurrency control must then correctly handle commits from multiple writers, which likely means enabling Hudi's optimistic concurrency control with a lock provider backed by ZooKeeper or the Hive Metastore.
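
For reference, multi-writer mode would mean merging additional settings into the hudi_options generated by SparkHudiWriter. The key names below follow the Hudi 0.12.x concurrency-control documentation and should be verified against the bundle version actually shipped; the ZooKeeper address is a placeholder, not part of this deployment.

// Illustrative only: options to merge into hudi_options before running concurrent writers.
var multiWriterOptions = new Dictionary<string, string>
{
    ["hoodie.write.concurrency.mode"] = "optimistic_concurrency_control",
    ["hoodie.cleaner.policy.failed.writes"] = "LAZY",
    ["hoodie.write.lock.provider"] = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    ["hoodie.write.lock.zookeeper.url"] = "zookeeper.service.consul", // placeholder address
    ["hoodie.write.lock.zookeeper.port"] = "2181",
    ["hoodie.write.lock.zookeeper.base_path"] = "/hudi/locks",
    ["hoodie.write.lock.zookeeper.lock_key"] = "orders_hudi"
};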

