Strangling a PHP Monolith in a Unified Monorepo: Migrating to a High-Performance ScyllaDB, Rust, and Go Architecture


A PHP monolith that has been in production for years carries the core business logic, and its real-time user-profile write module is hitting a serious performance wall. The module ingests high-concurrency user behavior events and writes the updates to a backing MySQL database. As traffic has grown, MySQL write latency and connection pressure have become the system's main bottleneck: any database jitter cascades directly into an avalanche. Replacing the entire PHP application outright is too risky and would take too long, so carving out this high-load, write-heavy path is the only viable option.

Our goal is to move this critical path onto a new stack with horizontal scalability and low latency, without a large-scale rewrite. The entire evolution must happen inside a single code repository (Monorepo) to keep the engineering consistent and manageable.

Defining the Problem and the Architectural Trade-offs

The core problem is replacing one high-concurrency, write-heavy API endpoint. The current POST /api/v1/profile/update endpoint is implemented in PHP and talks directly to MySQL. The new architecture needs a high-performance database and a language that can fully exploit the hardware. ScyllaDB, with its Cassandra-compatible API and Seastar-based performance, is the natural database choice.

For the service implementation language, however, we face a choice: Go or Rust?

Option A: Build the new service in Go

Go's concurrency model, built on goroutines and channels, is well suited to high-concurrency network services. Its simple syntax, fast compilation, and strong standard library (net/http in particular) make development and iteration very fast. In real projects, Go's engineering maturity and large ecosystem are significant advantages.

Advantages:

  1. High development velocity: the syntax is simple and quick to learn, so the team becomes productive fast.
  2. Mature concurrency: goroutine scheduling overhead is tiny, so huge numbers of concurrent connections are handled with ease.
  3. Complete ecosystem: database drivers (gocql), monitoring libraries, and RPC frameworks all have mature, production-grade options.

Disadvantages:

  1. Performance ceiling: Go performs very well, but for CPU-bound computation and workloads that demand fine-grained control over memory layout, its ceiling sits below Rust's. The GC can also introduce unpredictable latency (although the modern Go GC is very good).
  2. Error handling: the if err != nil pattern is explicit, but it gets verbose in complex business logic.

Option B: Build the new service in Rust with the Rocket framework

Rust offers performance comparable to C++ while guaranteeing memory and thread safety at compile time through its ownership system and borrow checker. For a core service that must run stably for years and is highly sensitive to latency and resource consumption, that is very attractive. Rocket is a capable, ergonomic web framework that lets us keep a good developer experience while benefiting from Rust's performance.

Advantages:

  1. Top-tier performance with memory safety: no GC and zero-cost abstractions extract the most from the hardware, while null-pointer dereferences, data races, and similar runtime errors are ruled out.
  2. Powerful type system: the rich type system and pattern matching catch more logic errors at compile time.
  3. Predictable latency: without GC pauses, tail latencies (p99, p999) are far more stable and controllable.

Disadvantages:

  1. Steep learning curve: ownership and lifetimes demand more from developers.
  2. Longer compile times: heavy type checking and optimization make builds slower than Go's.
  3. Younger ecosystem: it is evolving quickly, but library choice in some niches is still thinner than Go's.

Final Decision: A Hybrid Architecture, Each Tool in Its Place

A common mistake is to treat technology selection as an either/or decision. After evaluating both options, we decided not to pick a single technology but to adopt a hybrid architecture in which Go and Rust each do what they do best.

  1. A Go service as the strangler proxy: we build a lightweight Go service as the entry point to the whole system. Its sole responsibility is to route traffic to the legacy PHP monolith or the new Rust service based on request headers, paths, or dynamic configuration. Go's net/http/httputil.ReverseProxy is well suited to this task, with ample stability and performance. This proxy layer is the key to applying the strangler fig pattern.

  2. Rust (Rocket) as the high-performance core service: the new service that actually executes the business logic and talks to ScyllaDB is implemented in Rust. Here we are after maximum performance, stability, and resource efficiency.

  3. Unified Monorepo management: all code, including the legacy php-monolith, the new strangler-proxy-go proxy, and the profile-writer-rust service, lives in a single Monorepo. This simplifies CI/CD, dependency management, and cross-team collaboration.

graph TD
    subgraph Monorepo
        direction LR
        subgraph apps
            A[php-monolith]
            B[strangler-proxy-go]
            C[profile-writer-rust]
        end
        subgraph packages
            D[proto-definitions]
        end
    end

    Client[Client] --> Ingress

    subgraph Kubernetes Cluster
        Ingress --> B
        B -- Route: /api/v1/** --> A
        B -- Route: /api/v1/profile/update rewritten to /api/v2 --> C
    end

    A --> MySQL[(MySQL)]
    C --> ScyllaDB[(ScyllaDB)]

    B -- Shadowing (optional) --> C

With this architecture we can migrate traffic gradually and safely. Initially, all traffic goes to PHP. We then enable shadowing: each production request is duplicated and sent to the new Rust service, whose response is discarded; it is used only to validate the new system's performance and correctness. Once validation passes, real traffic is shifted to the new service step by step.

Core Implementation Overview

First, the Monorepo directory layout, using the classic apps / packages pattern.

.
├── Makefile
├── apps
│   ├── php-monolith
│   │   └── ... (Legacy PHP code)
│   ├── profile-writer-rust
│   │   ├── Cargo.toml
│   │   └── src
│   │       └── main.rs
│   └── strangler-proxy-go
│       ├── go.mod
│       ├── go.sum
│       └── main.go
└── packages
    └── ... (Shared libraries, e.g., Protobuf)

A Makefile or Taskfile provides a unified command interface, such as make build-all, make test-rust, and make run-proxy.
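A minimal sketch of what that top-level Makefile could look like; the target names mirror the commands above, while the port numbers and env values are placeholders, not part of the repository:

```makefile
# Hypothetical top-level Makefile; targets mirror the commands mentioned above.
.PHONY: build-all test-rust run-proxy

build-all:
	cd apps/strangler-proxy-go && go build ./...
	cd apps/profile-writer-rust && cargo build --release

test-rust:
	cd apps/profile-writer-rust && cargo test

run-proxy:
	cd apps/strangler-proxy-go && \
	LEGACY_TARGET_URL=http://localhost:8000 \
	NEW_TARGET_URL=http://localhost:8001 \
	go run .
```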

1. ScyllaDB Data Model

We design a simple table for the user profile. Choosing user_id as the partition key ensures that all of a user's data lands on the same node, which keeps queries efficient. last_updated_ts serves as the clustering key and determines the ordering.

-- Filename: schema.cql

CREATE KEYSPACE IF NOT EXISTS user_profiles WITH REPLICATION = {
    'class': 'NetworkTopologyStrategy',
    'replication_factor': 3
};

USE user_profiles;

CREATE TABLE IF NOT EXISTS profiles (
    user_id uuid,
    last_updated_ts timestamp,
    attributes map<text, text>,
    PRIMARY KEY (user_id, last_updated_ts)
) WITH CLUSTERING ORDER BY (last_updated_ts DESC);

The map<text, text> column gives us enough flexibility to store unstructured user tags and attributes.
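Because the clustering order on last_updated_ts is descending, the most recent snapshot for a user can be read cheaply. A hypothetical read-path query (not part of the write service itself) would look like this:

```sql
-- Hypothetical read path: the DESC clustering order means LIMIT 1 returns
-- the newest profile snapshot for the given user.
SELECT user_id, last_updated_ts, attributes
FROM user_profiles.profiles
WHERE user_id = ?
LIMIT 1;
```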

2. Go Strangler Proxy

This proxy is the hub of the architecture. A production-grade proxy needs more than routing: it also needs configuration, logging, health checks, and graceful shutdown.

// File: apps/strangler-proxy-go/main.go
package main

import (
	"bytes"
	"context"
	"io"
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/kelseyhightower/envconfig"
)

// Config holds configuration for the proxy.
// In a real project, this would be more sophisticated, possibly loaded from a file or config service.
type Config struct {
	ListenAddr      string `envconfig:"LISTEN_ADDR" default:":8080"`
	LegacyTargetURL string `envconfig:"LEGACY_TARGET_URL" required:"true"`
	NewTargetURL    string `envconfig:"NEW_TARGET_URL" required:"true"`
	// MigrationMode controls how the profile update path is routed:
	//   "legacy"  (default) send everything to the PHP monolith
	//   "shadow"  serve from legacy, but copy each request to the new service
	//   "migrate" route canary-flagged users to the new service
	//   "full"    send all profile updates to the new service
	MigrationMode string `envconfig:"MIGRATION_MODE" default:"legacy"`
}

func main() {
	var cfg Config
	err := envconfig.Process("", &cfg)
	if err != nil {
		log.Fatalf("Failed to process config: %v", err)
	}

	legacyURL, err := url.Parse(cfg.LegacyTargetURL)
	if err != nil {
		log.Fatalf("Invalid legacy target URL: %v", err)
	}
	newURL, err := url.Parse(cfg.NewTargetURL)
	if err != nil {
		log.Fatalf("Invalid new target URL: %v", err)
	}

	legacyProxy := httputil.NewSingleHostReverseProxy(legacyURL)
	newProxy := httputil.NewSingleHostReverseProxy(newURL)
	// Clients keep calling /api/v1/profile/update, while the new Rust service
	// exposes the endpoint under /api/v2. Rewrite the path when forwarding to it.
	origDirector := newProxy.Director
	newProxy.Director = func(req *http.Request) {
		origDirector(req)
		req.URL.Path = strings.Replace(req.URL.Path, "/api/v1/profile/update", "/api/v2/profile/update", 1)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// The core routing logic. In a real system, this would be much more complex,
		// potentially based on user ID hashes, headers, or a dynamic configuration system.
		// Here, we focus on the profile update path.
		if strings.HasPrefix(r.URL.Path, "/api/v1/profile/update") {
			log.Printf("Routing profile update with mode: %s", cfg.MigrationMode)
			
			switch cfg.MigrationMode {
			case "shadow":
				// Shadowing: send a copy to the new service but ignore its response.
				// This is critical for testing the new service with production traffic
				// without impacting users.
				go shadowRequest(r, newProxy)
				legacyProxy.ServeHTTP(w, r)
			case "migrate":
				// For simplicity, we use a header for canary routing.
				// A real implementation would use something more robust like user ID hashing.
				if r.Header.Get("X-Canary-User") == "true" {
					log.Printf("Canary request routed to new service for %s", r.RemoteAddr)
					newProxy.ServeHTTP(w, r)
				} else {
					legacyProxy.ServeHTTP(w, r)
				}
			case "full":
				newProxy.ServeHTTP(w, r)
			default: // "legacy" mode
				legacyProxy.ServeHTTP(w, r)
			}
			return
		}

		// All other requests go to the legacy system.
		legacyProxy.ServeHTTP(w, r)
	})
	
	server := &http.Server{
		Addr:    cfg.ListenAddr,
		Handler: mux,
	}

	// Graceful shutdown logic
	go func() {
		log.Printf("Starting proxy server on %s", cfg.ListenAddr)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Could not listen on %s: %v\n", cfg.ListenAddr, err)
		}
	}()

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
	<-stop

	log.Println("Shutting down server...")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := server.Shutdown(ctx); err != nil {
		log.Fatalf("Server forced to shutdown: %v", err)
	}
	log.Println("Server gracefully stopped")
}

// shadowRequest replays a cloned request against the target handler. The clone
// carries its own copy of the body, so it cannot race with the primary handler
// over the original request stream, and the response is discarded.
func shadowRequest(shadowReq *http.Request, body []byte, target http.Handler) {
	// Give the shadow request its own body; the caller has already consumed
	// and restored the original stream for the primary handler.
	shadowReq.Body = io.NopCloser(bytes.NewReader(body))
	shadowReq.ContentLength = int64(len(body))

	// The ResponseWriter for a shadow request is a "noop" writer, as we don't
	// care about the response.
	noopWriter := &noopResponseWriter{}
	target.ServeHTTP(noopWriter, shadowReq)
	log.Printf("[Shadow] Shadow request for %s completed with status %d", shadowReq.URL.Path, noopWriter.statusCode)
}

type noopResponseWriter struct {
	header http.Header
	statusCode int
}

func (n *noopResponseWriter) Header() http.Header {
	if n.header == nil {
		n.header = make(http.Header)
	}
	return n.header
}
func (n *noopResponseWriter) Write(b []byte) (int, error) { return len(b), nil }
func (n *noopResponseWriter) WriteHeader(statusCode int) { n.statusCode = statusCode }

This Go proxy already covers the basics needed in production: configurable routing, shadow traffic, and graceful shutdown.
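The X-Canary-User header used in the migrate branch is deliberately simplistic; as the code comment notes, a production rollout would normally hash a stable user identifier into a bucket so that each user consistently lands on the same backend as the percentage ramps up. A minimal sketch of that idea, assuming a hypothetical X-User-ID header set by an upstream auth layer and an externally supplied rollout percentage:

```go
package canary

import (
	"hash/fnv"
	"net/http"
)

// RouteToNew hashes a stable user identifier into [0, 100) and compares it
// against a rollout percentage, so a given user is always routed to the same
// backend for a given percentage. The X-User-ID header is an assumption about
// what an upstream auth layer would provide.
func RouteToNew(r *http.Request, rolloutPercent uint32) bool {
	userID := r.Header.Get("X-User-ID")
	if userID == "" {
		return false // unknown users stay on the legacy path
	}
	h := fnv.New32a()
	h.Write([]byte(userID))
	return h.Sum32()%100 < rolloutPercent
}
```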

3. Rust High-Performance Write Service (Profile Writer)

Now for the core write service. We use the Rocket framework together with the official scylla-rust-driver.

Cargo.toml:

# File: apps/profile-writer-rust/Cargo.toml
[package]
name = "profile-writer-rust"
version = "0.1.0"
edition = "2021"

[dependencies]
rocket = { version = "0.5.0", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
scylla = "0.10.0"
tokio = { version = "1", features = ["full"] }
uuid = { version = "1.3", features = ["v4", "serde"] }
chrono = "0.4"
dotenv = "0.15"
config = "0.13"

main.rs:

```rust
// File: apps/profile-writer-rust/src/main.rs
#[macro_use]
extern crate rocket;

use rocket::serde::json::Json;
use rocket::{State, http::Status};
use scylla::{Session, SessionBuilder};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use uuid::Uuid;

// Data structure for the incoming request payload.
// Using Serde for automatic deserialization and validation.
#[derive(Deserialize)]
struct ProfileUpdateRequest {
    user_id: Uuid,
    attributes: HashMap<String, String>,
}

// Represents the application state, primarily the ScyllaDB session.
// Arc is used for thread-safe sharing of the session across Rocket's worker threads.
struct AppState {
    db_session: Arc<Session>,
}

// The core API endpoint.
#[post("/api/v2/profile/update", format = "json", data = "<request>")]
async fn update_profile(
    state: &State<AppState>,
    request: Json<ProfileUpdateRequest>,
) -> Result<(), (Status, String)> {
    let session = state.db_session.clone();
    let user_id = request.user_id;
    let attributes = &request.attributes;

    // In a real application, we might use a prepared statement for performance.
    // Prepared statements are parsed and cached by the database, reducing overhead on subsequent executions.
    let cql_query = "INSERT INTO user_profiles.profiles (user_id, last_updated_ts, attributes) VALUES (?, toTimestamp(now()), ?)";

    // The driver is fully async. We `.await` the result.
    match session.query(cql_query, (user_id, attributes)).await {
        Ok(_) => {
            // Log successful insertion. In a real system, you'd use a structured logger.
            println!("Successfully updated profile for user {}", user_id);
            Ok(())
        }
        Err(e) => {
            // Proper error handling is crucial.
            // Here we log the error and return a 500 status to the client.
            eprintln!("Failed to write to ScyllaDB for user {}: {}", user_id, e);
            Err((Status::InternalServerError, format!("Database error: {}", e)))
        }
    }
}

// This endpoint is essential for load balancers and orchestration systems like Kubernetes.
#[get("/healthz")]
fn health_check() -> Status {
    Status::Ok
}

#[launch]
async fn rocket() -> _ {
    // Load configuration from environment variables or a config file.
    // A robust configuration setup is vital for production.
    dotenv::dotenv().ok();
    let db_nodes = env::var("SCYLLA_NODES").expect("SCYLLA_NODES must be set");

    // Establishing a connection to the ScyllaDB cluster.
    // This session is created once at startup and reused for all requests.
    let session: Session = match SessionBuilder::new()
        .known_nodes(&db_nodes.split(',').collect::<Vec<_>>())
        .build()
        .await
    {
        Ok(session) => session,
        Err(e) => panic!("Failed to connect to ScyllaDB: {}", e),
    };

    println!("ScyllaDB session established.");

    let app_state = AppState {
        db_session: Arc::new(session),
    };

    // Configure and launch the Rocket server.
    rocket::build()
        .manage(app_state)
        .mount("/", routes![update_profile, health_check])
}
```
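The comment inside update_profile mentions prepared statements. With the scylla driver, the statement can be prepared once at startup and reused for every request, avoiding repeated CQL parsing. A rough sketch under that assumption; the helper names and the idea of storing the PreparedStatement in AppState next to the session are illustrative, not part of the code above:

```rust
use scylla::prepared_statement::PreparedStatement;
use scylla::transport::errors::QueryError;
use scylla::Session;
use std::collections::HashMap;
use uuid::Uuid;

// Prepared once at startup, e.g. stored in AppState alongside the session.
async fn prepare_profile_insert(session: &Session) -> Result<PreparedStatement, QueryError> {
    session
        .prepare(
            "INSERT INTO user_profiles.profiles (user_id, last_updated_ts, attributes) \
             VALUES (?, toTimestamp(now()), ?)",
        )
        .await
}

// Per request: execute the cached statement instead of re-parsing the CQL text.
async fn write_profile(
    session: &Session,
    prepared: &PreparedStatement,
    user_id: Uuid,
    attributes: &HashMap<String, String>,
) -> Result<(), QueryError> {
    session.execute(prepared, (user_id, attributes)).await?;
    Ok(())
}
```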