We face a classic data-engineering problem: the core business runs on a PostgreSQL database that handles heavy online transaction processing (OLTP), yet the analytics team needs to query that data in near real time. Running complex analytical queries directly against the production database is unacceptable, since it would seriously degrade core business performance, and traditional nightly batch ETL jobs have far too much latency to meet the business's data-freshness requirements. We need a solution that captures change data in real time and lands it in a data lake that supports fast analytical queries; at the same time, it must fit into our existing, predominantly .NET technology stack and be deployable on our unified Nomad cluster.
Initial Design and Technology Choices
The initial idea is a real-time data pipeline built on change data capture (CDC). The data flows roughly as follows: PostgreSQL (Source) -> Debezium -> Kafka -> .NET Ingestion Service -> Apache Hudi (Data Lake)
Each component in this architecture was chosen after weighing the trade-offs:
- Debezium + Kafka: the de facto standard combination for CDC. Debezium reads PostgreSQL's write-ahead log (WAL) through a logical replication slot and captures row-level INSERT, UPDATE, and DELETE operations with very low overhead, pushing the change events as structured JSON onto a Kafka topic. Kafka provides a reliable, scalable event bus. (A connector registration sketch follows this list.)
- Apache Hudi: we chose Hudi as the data-lake storage format rather than plain Parquet or ORC. The key reason is that Hudi brings ACID transactions, record-level updates (upserts), and deletes to the data lake, which is essential for CDC data coming from an OLTP database: we need to faithfully reproduce the source database's state changes in the lake. We picked the Copy on Write table type to optimize read performance.
- ASP.NET Core Worker Service: the most unconventional part of the design. Pipelines like this are usually built with Spark Streaming or Flink, but our team consists mainly of .NET developers, and introducing a large JVM stack would carry high learning and maintenance costs. Using a .NET Core Worker Service as the consumer lets us stay in a familiar environment and leverage C#'s strong typing, modern async programming model, and rich library ecosystem to build a lightweight, high-performance ingestion service.
- Clean Architecture: for a data-pipeline service that must run and be maintained for a long time, a clear architecture matters. Clean Architecture forces business logic (data transformation, validation) to be decoupled from external dependencies (Kafka, Hudi, configuration). This makes the core logic easy to test and minimizes the impact on core code if we later swap the message queue or the storage engine.
- HashiCorp Nomad: our infrastructure is already standardized on Nomad. Compared with Kubernetes, Nomad is simpler and more flexible for mixed workloads (long-running services, batch jobs, and so on). The .NET service is packaged as a Docker image and deployed to the Nomad cluster as a service-type job.
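For reference, registering the Debezium PostgreSQL connector is a single call to Kafka Connect's REST API. The sketch below is illustrative only: the Connect address, connector name, database coordinates and credentials are placeholders, and the exact config keys (for example topic.prefix versus database.server.name) depend on the Debezium version in use.
// Illustrative only: register a Debezium PostgreSQL connector via Kafka Connect's REST API.
// Host names, credentials and the connector name are placeholders.
using System.Net.Http.Json;

using var http = new HttpClient { BaseAddress = new Uri("http://connect:8083") };

var connector = new
{
    name = "orders-postgres-connector",
    config = new Dictionary<string, string>
    {
        ["connector.class"] = "io.debezium.connector.postgresql.PostgresConnector",
        ["plugin.name"] = "pgoutput",                    // logical decoding plugin
        ["database.hostname"] = "postgres.service.consul",
        ["database.port"] = "5432",
        ["database.user"] = "debezium",
        ["database.password"] = "***",
        ["database.dbname"] = "appdb",
        ["topic.prefix"] = "dbserver1",                  // topics become dbserver1.<schema>.<table>
        ["table.include.list"] = "public.orders"
    }
};

// POST /connectors creates the connector if it does not exist yet.
var response = await http.PostAsJsonAsync("/connectors", connector);
response.EnsureSuccessStatusCode();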
Architecture and Project Structure
We organize the .NET solution following Clean Architecture principles. It contains the following projects:
- Cdc.Hudi.Core: domain entities and interfaces. This is the most central and most stable part, with no dependency on any external framework.
- Cdc.Hudi.Application: application services and business logic. It orchestrates the domain entities and applies data-transformation rules, but never talks to Kafka or Hudi directly.
- Cdc.Hudi.Infrastructure: implementations of the interfaces defined in the core layer. All code that touches the outside world lives here: the Kafka consumer, the Hudi writer, logging, and so on.
- Cdc.Hudi.Worker: the ASP.NET Core Worker Service host project, responsible for wiring up all dependencies (DI container configuration) and starting the background service.
Here is the architecture as a diagram:
graph TD
subgraph "PostgreSQL"
A[OLTP Database] -- WAL --> B(Debezium Connector)
end
subgraph "Kafka Cluster"
B -- JSON Events --> C{Topic: db.public.orders}
end
subgraph "Nomad Cluster"
subgraph "Docker Container (ASP.NET Core Worker)"
D[Kafka Consumer] -- Raw Event --> E(Application Service)
E -- Transformed Data --> F[Hudi Writer]
subgraph "Clean Architecture Layers"
direction LR
L1[Infrastructure] --> L2(Application)
L2 --> L3(Core)
end
end
end
subgraph "Data Lake (S3/HDFS)"
F -- Spark-Submit --> G[Apache Hudi Table]
end
subgraph "Query Engines"
G -- SQL Query --> H[Presto / Trino / Spark SQL]
end
style L1 fill:#f9f,stroke:#333,stroke-width:2px
style L2 fill:#ccf,stroke:#333,stroke-width:2px
style L3 fill:#cfc,stroke:#333,stroke-width:2px
Step-by-Step Implementation: From Consumption to Writing
1. Core Domain Definitions (Cdc.Hudi.Core)
We first define the core data model and the writer interface. Assume we are synchronizing the orders table.
// Cdc.Hudi.Core/Entities/Order.cs
namespace Cdc.Hudi.Core.Entities;
public class Order
{
public Guid Id { get; set; }
public string OrderNumber { get; set; }
public decimal Amount { get; set; }
public DateTime CreatedAtUtc { get; set; }
public DateTime UpdatedAtUtc { get; set; }
public bool IsDeleted { get; set; } // Hudi's soft delete flag
}
// Cdc.Hudi.Core/Models/ChangeEvent.cs
// Represents a generic CDC event (Debezium's "payload" envelope)
namespace Cdc.Hudi.Core.Models;
public class ChangeEvent<T> where T : class
{
public T? Before { get; set; }
public T? After { get; set; }
public SourceMetadata Source { get; set; }
public string Op { get; set; } // "c" for create, "u" for update, "d" for delete
public long TsMs { get; set; }
}
public class SourceMetadata
{
public string Version { get; set; }
public string Connector { get; set; }
public string Name { get; set; }
public long TsMs { get; set; }
public string Db { get; set; }
public string Schema { get; set; }
public string Table { get; set; }
}
// Cdc.Hudi.Core/Interfaces/IHudiWriter.cs
namespace Cdc.Hudi.Core.Interfaces;
public interface IHudiWriter<T> where T : class
{
Task WriteAsync(IEnumerable<T> records, CancellationToken cancellationToken);
}
2. Application Layer Logic (Cdc.Hudi.Application)
The application service owns the business logic: for example, mapping Debezium's operation type onto our model's IsDeleted field, or performing data cleansing.
// Cdc.Hudi.Application/Services/OrderIngestionService.cs
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Core.Models;
using Microsoft.Extensions.Logging;
namespace Cdc.Hudi.Application.Services;
public class OrderIngestionService
{
private readonly IHudiWriter<Order> _hudiWriter;
private readonly ILogger<OrderIngestionService> _logger;
public OrderIngestionService(IHudiWriter<Order> hudiWriter, ILogger<OrderIngestionService> logger)
{
_hudiWriter = hudiWriter;
_logger = logger;
}
public async Task ProcessEventAsync(ChangeEvent<Order> changeEvent, CancellationToken cancellationToken)
{
if (changeEvent?.After is null && changeEvent?.Op != "d")
{
_logger.LogWarning("Received an event with no 'after' state and it is not a delete operation. Skipping.");
return;
}
Order finalRecord;
switch (changeEvent.Op)
{
case "c": // Create
case "u": // Update
finalRecord = changeEvent.After!;
finalRecord.IsDeleted = false;
break;
case "d": // Delete
finalRecord = changeEvent.Before!;
if (finalRecord is null)
{
_logger.LogError("Received a delete event with no 'before' state. Cannot process tombstone.");
return;
}
finalRecord.IsDeleted = true;
break;
default:
_logger.LogWarning("Unknown operation type '{Op}'. Skipping event.", changeEvent.Op);
return;
}
// In a real project, more transformations could happen here.
// For example, data enrichment, PII masking, etc.
finalRecord.UpdatedAtUtc = DateTime.UtcNow; // Set ingestion timestamp
try
{
// Batching is handled by the worker service, here we process one record at a time conceptually
await _hudiWriter.WriteAsync(new[] { finalRecord }, cancellationToken);
_logger.LogInformation("Successfully processed and wrote order {OrderId}", finalRecord.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Failed to write record for order {OrderId} to Hudi.", finalRecord.Id);
// Error handling strategy is crucial here. Re-queueing, dead-letter queue, etc.
throw;
}
}
}
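The decoupling pays off in tests: because OrderIngestionService depends only on the IHudiWriter<Order> abstraction, its mapping rules can be verified without Kafka or Spark. The sketch below assumes xUnit and a hand-rolled fake; the test project and type names are hypothetical.
// Cdc.Hudi.Application.Tests/OrderIngestionServiceTests.cs (illustrative sketch)
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Core.Models;
using Microsoft.Extensions.Logging.Abstractions;
using Xunit;

// Hand-rolled fake that simply records what the service tried to write.
public class FakeHudiWriter : IHudiWriter<Order>
{
    public List<Order> Written { get; } = new();
    public Task WriteAsync(IEnumerable<Order> records, CancellationToken cancellationToken)
    {
        Written.AddRange(records);
        return Task.CompletedTask;
    }
}

public class OrderIngestionServiceTests
{
    [Fact]
    public async Task DeleteEvent_IsWrittenAsSoftDelete()
    {
        var writer = new FakeHudiWriter();
        var service = new OrderIngestionService(writer, NullLogger<OrderIngestionService>.Instance);

        var evt = new ChangeEvent<Order>
        {
            Op = "d",
            Before = new Order { Id = Guid.NewGuid(), OrderNumber = "SO-1001" }
        };

        await service.ProcessEventAsync(evt, CancellationToken.None);

        var record = Assert.Single(writer.Written);
        Assert.True(record.IsDeleted); // the "d" op is translated into a soft-delete flag
    }
}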
3. Infrastructure Layer Implementation (Cdc.Hudi.Infrastructure)
This is the most involved part, containing every integration with external systems.
Kafka Consumer
We use the Confluent.Kafka library to consume messages and plug in a custom deserializer that understands Debezium's JSON envelope.
// Cdc.Hudi.Infrastructure/Kafka/DebeziumJsonDeserializer.cs
using System.Text.Json;
using Confluent.Kafka;
using Cdc.Hudi.Core.Models;
namespace Cdc.Hudi.Infrastructure.Kafka;
public class DebeziumJsonDeserializer<T> : IDeserializer<ChangeEvent<T>?> where T : class
{
    private readonly JsonSerializerOptions _options = new()
    {
        PropertyNameCaseInsensitive = true,
        // Debezium field names are snake_case; JsonNamingPolicy.SnakeCaseLower requires .NET 8+.
        PropertyNamingPolicy = JsonNamingPolicy.SnakeCaseLower,
    };
public ChangeEvent<T>? Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (isNull) return null;
try
{
            // Debezium wraps the actual payload in a "payload" field.
            // JsonDocument.Parse has no ReadOnlySpan<byte> overload, so copy the span once.
            using var jsonDoc = JsonDocument.Parse(data.ToArray());
if (!jsonDoc.RootElement.TryGetProperty("payload", out var payloadElement))
{
// This might be a tombstone record or an invalid format
return null;
}
return JsonSerializer.Deserialize<ChangeEvent<T>>(payloadElement.GetRawText(), _options);
}
catch (JsonException)
{
// Log the error and return null to skip the message.
// A more robust implementation would send this to a dead-letter queue.
Console.WriteLine($"Failed to deserialize message: {System.Text.Encoding.UTF8.GetString(data)}");
return null;
}
}
}
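To make the envelope handling concrete, the snippet below feeds an abbreviated, made-up Debezium event through the deserializer. With Debezium's default settings a real event also carries a schema section, timestamp columns arrive as epoch microseconds, and numeric columns are byte-encoded, so a production pipeline would either configure converters on the connector side or adapt the model; the values here are purely illustrative.
// Illustrative sketch: deserializing an abbreviated Debezium envelope.
using System.Text;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Infrastructure.Kafka;
using Confluent.Kafka;

var raw = """
{
  "payload": {
    "before": null,
    "after": {
      "id": "7f9c0e1a-2b3c-4d5e-8f90-123456789abc",
      "order_number": "SO-1001",
      "amount": 99.90,
      "created_at_utc": "2024-01-01T00:00:00Z",
      "updated_at_utc": "2024-01-01T00:00:00Z"
    },
    "source": { "connector": "postgresql", "db": "appdb", "schema": "public", "table": "orders" },
    "op": "c",
    "ts_ms": 1704067200000
  }
}
""";

var deserializer = new DebeziumJsonDeserializer<Order>();
var changeEvent = deserializer.Deserialize(Encoding.UTF8.GetBytes(raw), isNull: false, SerializationContext.Empty);
// changeEvent.Op == "c" and changeEvent.After.OrderNumber == "SO-1001"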
Hudi Writer
There is currently no mature, native .NET client for Hudi. In a real project, the most pragmatic approach is to use the hudi-spark-bundle shipped with Hudi and perform writes through the spark-submit command-line tool. Our C# code is responsible for:
- serializing a batch of .NET objects into a Parquet file, and
- invoking a spark-submit process with the necessary Hudi configuration.
// Cdc.Hudi.Infrastructure/Hudi/SparkHudiWriter.cs
using System.Diagnostics;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Parquet;          // Parquet.Net -- this writer assumes the 4.x API
using Parquet.Data;     // DataColumn
using Parquet.Schema;   // ParquetSchema, DataField (moved here in Parquet.Net 4.x)
namespace Cdc.Hudi.Infrastructure.Hudi;
// This implementation is a pragmatic approach for .NET to interact with Hudi.
public class SparkHudiWriter : IHudiWriter<Order>
{
private readonly ILogger<SparkHudiWriter> _logger;
private readonly string _sparkSubmitPath;
private readonly string _hudiBundleJarPath;
private readonly string _tempParquetPath;
private readonly string _hudiTablePath;
// Hudi configurations are read from appsettings.json
private const string HudiTableName = "orders_hudi";
private const string RecordKeyField = "id";
private const string PartitionPathField = "created_at_utc"; // Using date for partitioning
private const string PrecombineField = "updated_at_utc";
public SparkHudiWriter(ILogger<SparkHudiWriter> logger, IConfiguration config)
{
_logger = logger;
_sparkSubmitPath = config.GetValue<string>("Spark:SubmitPath") ?? "spark-submit";
        _hudiBundleJarPath = config.GetValue<string>("Spark:HudiBundleJarPath")
                             ?? throw new InvalidOperationException("Spark:HudiBundleJarPath is not configured.");
        _tempParquetPath = Path.Combine(Path.GetTempPath(), "hudi_ingest");
        _hudiTablePath = config.GetValue<string>("Hudi:TableBasePath")
                         ?? throw new InvalidOperationException("Hudi:TableBasePath is not configured.");
Directory.CreateDirectory(_tempParquetPath);
}
public async Task WriteAsync(IEnumerable<Order> records, CancellationToken cancellationToken)
{
var recordsList = records.ToList();
if (!recordsList.Any()) return;
var tempFilePath = Path.Combine(_tempParquetPath, $"{Guid.NewGuid()}.parquet");
try
{
// Step 1: Serialize .NET objects to a temporary Parquet file
await WriteToParquetFile(recordsList, tempFilePath);
// Step 2: Build and execute the spark-submit command
await ExecuteSparkSubmit(tempFilePath, cancellationToken);
}
finally
{
// Clean up the temporary file
if (File.Exists(tempFilePath))
{
File.Delete(tempFilePath);
}
}
}
private async Task WriteToParquetFile(List<Order> orders, string filePath)
{
        var dataFields = new DataField[]
        {
            // The record key is written as a string: it keeps the Hudi record key simple
            // and avoids relying on Parquet GUID support.
            new DataField<string>("id"),
            new DataField<string>("order_number"),
            new DataField<decimal>("amount"),
            new DataField<DateTime>("created_at_utc"),
            new DataField<DateTime>("updated_at_utc"),
            new DataField<bool>("is_deleted")
        };
        var schema = new ParquetSchema(dataFields); // Parquet.Net 4.x schema type
        using var fileStream = File.Create(filePath);
        using var parquetWriter = await ParquetWriter.CreateAsync(schema, fileStream);
        using var groupWriter = parquetWriter.CreateRowGroup();
        await groupWriter.WriteColumnAsync(new DataColumn(dataFields[0], orders.Select(o => o.Id.ToString()).ToArray()));
await groupWriter.WriteColumnAsync(new DataColumn(dataFields[1], orders.Select(o => o.OrderNumber).ToArray()));
await groupWriter.WriteColumnAsync(new DataColumn(dataFields[2], orders.Select(o => o.Amount).ToArray()));
await groupWriter.WriteColumnAsync(new DataColumn(dataFields[3], orders.Select(o => o.CreatedAtUtc).ToArray()));
await groupWriter.WriteColumnAsync(new DataColumn(dataFields[4], orders.Select(o => o.UpdatedAtUtc).ToArray()));
await groupWriter.WriteColumnAsync(new DataColumn(dataFields[5], orders.Select(o => o.IsDeleted).ToArray()));
}
private async Task ExecuteSparkSubmit(string sourceParquetFile, CancellationToken cancellationToken)
{
// A helper script is often better than a very long command line argument.
// Here we build it dynamically for clarity.
var scriptContent = $@"
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HudiNetCoreIngest').getOrCreate()
input_df = spark.read.parquet('{sourceParquetFile}')
hudi_options = {{
'hoodie.table.name': '{HudiTableName}',
'hoodie.datasource.write.recordkey.field': '{RecordKeyField}',
'hoodie.datasource.write.partitionpath.field': '{PartitionPathField}',
'hoodie.datasource.write.table.name': '{HudiTableName}',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': '{PrecombineField}',
'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.OverwriteWithLatestAvroPayload',
'hoodie.parquet.small.file.limit': '0', # Disable auto-sizing for simplicity
'hoodie.upsert.shuffle.parallelism': '2',
'hoodie.insert.shuffle.parallelism': '2'
}}
input_df.write.format('hudi') \
.options(**hudi_options) \
.mode('append') \
.save('{_hudiTablePath}')
spark.stop()
";
var pythonScriptPath = Path.Combine(_tempParquetPath, $"{Guid.NewGuid()}.py");
await File.WriteAllTextAsync(pythonScriptPath, scriptContent, cancellationToken);
var processInfo = new ProcessStartInfo
{
FileName = _sparkSubmitPath,
ArgumentList = {
"--master", "local[*]", // Or your cluster URL
"--jars", _hudiBundleJarPath,
"--driver-class-path", _hudiBundleJarPath,
"--conf", "spark.serializer=org.apache.spark.serializer.KryoSerializer",
"--conf", "spark.sql.hive.convertMetastoreParquet=false",
pythonScriptPath
},
RedirectStandardOutput = true,
RedirectStandardError = true,
UseShellExecute = false,
CreateNoWindow = true,
};
_logger.LogInformation("Executing spark-submit with script: {ScriptPath}", pythonScriptPath);
using var process = Process.Start(processInfo);
if (process is null)
{
throw new InvalidOperationException("Failed to start spark-submit process.");
}
        // Read stdout and stderr concurrently to avoid deadlocking when a pipe buffer fills up.
        var stdOutTask = process.StandardOutput.ReadToEndAsync(cancellationToken);
        var stdErrTask = process.StandardError.ReadToEndAsync(cancellationToken);
        await process.WaitForExitAsync(cancellationToken);
        var stdOut = await stdOutTask;
        var stdErr = await stdErrTask;
if (process.ExitCode != 0)
{
_logger.LogError("spark-submit failed with exit code {ExitCode}.\nSTDOUT: {StdOut}\nSTDERR: {StdErr}",
process.ExitCode, stdOut, stdErr);
throw new Exception($"Spark job failed. See logs for details.");
}
_logger.LogInformation("spark-submit completed successfully. STDOUT: {StdOut}", stdOut);
File.Delete(pythonScriptPath);
}
}
4. The Worker Service (Cdc.Hudi.Worker)
Finally, a BackgroundService ties everything together: it continuously pulls messages from Kafka and hands them to the application service (batching is discussed as a follow-up optimization at the end).
// Cdc.Hudi.Worker/KafkaEventProcessor.cs
using Confluent.Kafka;
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Models;
using Cdc.Hudi.Infrastructure.Kafka;
namespace Cdc.Hudi.Worker;
public class KafkaEventProcessor : BackgroundService
{
private readonly ILogger<KafkaEventProcessor> _logger;
private readonly IServiceScopeFactory _scopeFactory;
private readonly IConfiguration _config;
public KafkaEventProcessor(ILogger<KafkaEventProcessor> logger, IServiceScopeFactory scopeFactory, IConfiguration config)
{
_logger = logger;
_scopeFactory = scopeFactory;
_config = config;
}
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // consumer.Consume below blocks the calling thread; yield first so host startup
        // does not hang on this loop.
        await Task.Yield();
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _config.GetValue<string>("Kafka:BootstrapServers"),
GroupId = "hudi-ingestion-worker",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false // We need manual control
};
var topic = _config.GetValue<string>("Kafka:Topic");
using var consumer = new ConsumerBuilder<string, ChangeEvent<Order>?>(consumerConfig)
.SetValueDeserializer(new DebeziumJsonDeserializer<Order>())
.Build();
consumer.Subscribe(topic);
_logger.LogInformation("Subscribed to Kafka topic: {Topic}", topic);
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = consumer.Consume(stoppingToken);
if (consumeResult?.Message?.Value is null)
{
// Skip tombstone or deserialization failed messages
consumer.Commit(consumeResult);
continue;
}
var changeEvent = consumeResult.Message.Value;
// Process each message in a new DI scope
using (var scope = _scopeFactory.CreateScope())
{
var ingestionService = scope.ServiceProvider.GetRequiredService<OrderIngestionService>();
await ingestionService.ProcessEventAsync(changeEvent, stoppingToken);
}
consumer.Commit(consumeResult); // Commit offset after successful processing
}
catch (ConsumeException e)
{
_logger.LogError(e, "Error consuming from Kafka.");
}
catch (OperationCanceledException)
{
_logger.LogInformation("Worker service is stopping.");
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "An unhandled exception occurred during message processing. Waiting before retrying.");
// In a production system, you might want a more sophisticated back-off strategy.
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
}
}
consumer.Close();
}
}
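For completeness, this is roughly what the composition root in Cdc.Hudi.Worker/Program.cs could look like. It is a minimal sketch; lifetimes and extra registrations (health checks, metrics, and so on) are left to the reader.
// Cdc.Hudi.Worker/Program.cs -- minimal composition root sketch
using Cdc.Hudi.Application.Services;
using Cdc.Hudi.Core.Entities;
using Cdc.Hudi.Core.Interfaces;
using Cdc.Hudi.Infrastructure.Hudi;
using Cdc.Hudi.Worker;

var builder = Host.CreateApplicationBuilder(args);

// Infrastructure: the spark-submit based Hudi writer implements the Core interface.
builder.Services.AddSingleton<IHudiWriter<Order>, SparkHudiWriter>();

// Application: resolved per message inside the worker's DI scope.
builder.Services.AddScoped<OrderIngestionService>();

// The long-running Kafka consumer loop.
builder.Services.AddHostedService<KafkaEventProcessor>();

await builder.Build().RunAsync();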
Deploying to Nomad
We package the service as a Docker image and deploy it with a Nomad job file.
Dockerfile:
# Build the .NET worker. JsonNamingPolicy.SnakeCaseLower requires .NET 8, so target the 8.0 SDK.
FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY . .
RUN dotnet publish "Cdc.Hudi.Worker/Cdc.Hudi.Worker.csproj" -c Release -o /app/publish
# Runtime image: Java for spark-submit, plus the .NET runtime and Python for the Hudi writer
FROM openjdk:11-jre-slim
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl ca-certificates python3 python3-pip && \
    curl -sSL https://packages.microsoft.com/config/debian/11/packages-microsoft-prod.deb -o /tmp/ms.deb && \
    dpkg -i /tmp/ms.deb && rm /tmp/ms.deb && \
    apt-get update && \
    apt-get install -y --no-install-recommends dotnet-runtime-8.0 && \
    rm -rf /var/lib/apt/lists/*
ENV SPARK_VERSION=3.3.2
ENV HADOOP_VERSION=3
ENV SPARK_HOME=/opt/spark
ENV PATH=$PATH:$SPARK_HOME/bin
RUN curl -o spark.tgz "https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz" && \
    tar -xf spark.tgz && \
    mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} ${SPARK_HOME} && \
    rm spark.tgz
RUN pip3 install pyspark==${SPARK_VERSION}
# Copy the published .NET application from the build stage
WORKDIR /app
COPY --from=build /app/publish .
# TODO: Add your hudi-spark-bundle JAR to the image or mount it
# COPY hudi-spark3.3-bundle_2.12-0.12.2.jar /app/hudi-spark-bundle.jar
ENTRYPOINT ["dotnet", "Cdc.Hudi.Worker.dll"]
cdc-hudi-ingest.nomad.hcl:
job "cdc-hudi-ingest" {
datacenters = ["dc1"]
type = "service"
group "ingest-worker" {
count = 1 # Start with one instance, can be scaled up
restart {
attempts = 3
interval = "5m"
delay = "25s"
mode = "delay"
}
task "app" {
driver = "docker"
config {
image = "your-repo/cdc-hudi-worker:latest"
// Mount Hudi bundle JAR if not included in the image
// volumes = [
// "/path/to/hudi-spark-bundle.jar:/app/hudi-spark-bundle.jar:ro"
// ]
}
// Configuration passed as environment variables
      env {
        Kafka__BootstrapServers  = "kafka.service.consul:9092"
        Kafka__Topic             = "dbserver1.public.orders"
        Hudi__TableBasePath      = "s3a://your-data-lake-bucket/hudi_tables/orders"
        Spark__SubmitPath        = "/opt/spark/bin/spark-submit"
        Spark__HudiBundleJarPath = "/app/hudi-spark-bundle.jar"
        // AWS credentials for S3 access should be configured via Nomad's secrets or Vault.
        AWS_ACCESS_KEY_ID     = "..."
        AWS_SECRET_ACCESS_KEY = "..."
      }
resources {
cpu = 1024 # MHz
memory = 2048 # MB
}
}
}
}
Open Problems and Future Iterations
Invoking spark-submit via Process.Start is pragmatic, but it carries an inherent performance cost: every small batch of writes spins up a full Spark session, which is far from ideal under high throughput. One optimization path is to have the .NET service accumulate records into larger batches (for example, flushing once per minute or every 10,000 records) to amortize Spark's startup cost; a possible shape for such a buffer is sketched below.
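A minimal sketch of such a buffer follows. The thresholds are placeholders, the type name OrderBatchBuffer is hypothetical, and offset handling in the consumer loop would need to be adapted accordingly.
// Hypothetical batch buffer: accumulate records and flush when a size or age threshold is hit.
using Cdc.Hudi.Core.Entities;

public class OrderBatchBuffer
{
    private readonly List<Order> _buffer = new();
    private DateTime _firstAddedUtc = DateTime.UtcNow;
    private const int MaxBatchSize = 10_000;                       // placeholder threshold
    private static readonly TimeSpan MaxBatchAge = TimeSpan.FromMinutes(1); // placeholder threshold

    public void Add(Order record)
    {
        if (_buffer.Count == 0) _firstAddedUtc = DateTime.UtcNow;
        _buffer.Add(record);
    }

    public bool ShouldFlush =>
        _buffer.Count >= MaxBatchSize ||
        (_buffer.Count > 0 && DateTime.UtcNow - _firstAddedUtc >= MaxBatchAge);

    public IReadOnlyList<Order> Drain()
    {
        var snapshot = _buffer.ToArray();
        _buffer.Clear();
        return snapshot;
    }
}
// The consumer loop would call Add() per event, check ShouldFlush, pass the drained batch to
// IHudiWriter<Order>.WriteAsync, and only commit Kafka offsets after the batch write succeeds.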
A longer-term evolution is to explore Java/.NET interop technologies such as IKVM.NET or GraalVM's interoperability features, so that Hudi's Java write API could be called directly from within the .NET process. That would remove the process-launch overhead entirely and give a much tighter integration, at the cost of significantly higher implementation complexity.
Finally, the current design runs a single instance. If the Kafka topic's partition count grows, the consumer can be scaled out by raising the count of the ingest-worker group, but Hudi's concurrency control must then handle commits from multiple writers correctly. That likely means enabling Hudi's optimistic concurrency control with a lock provider backed by ZooKeeper or the Hive Metastore; a hedged configuration sketch follows.
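To make that concrete, here is a hedged sketch of the extra options that would be merged into the hudi_options dictionary generated by SparkHudiWriter to enable multi-writer support. The key names follow the Hudi 0.12.x multi-writer documentation and should be verified against the Hudi version actually deployed; the ZooKeeper address and lock path are placeholders.
// Hedged sketch: additional Hudi options for optimistic concurrency control with a
// ZooKeeper-based lock provider. Verify key names against your Hudi version.
var multiWriterOptions = new Dictionary<string, string>
{
    ["hoodie.write.concurrency.mode"] = "optimistic_concurrency_control",
    ["hoodie.cleaner.policy.failed.writes"] = "LAZY",
    ["hoodie.write.lock.provider"] = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider",
    ["hoodie.write.lock.zookeeper.url"] = "zookeeper.service.consul",   // placeholder
    ["hoodie.write.lock.zookeeper.port"] = "2181",
    ["hoodie.write.lock.zookeeper.lock_key"] = "orders_hudi",
    ["hoodie.write.lock.zookeeper.base_path"] = "/hudi/locks"            // placeholder
};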