集成Playwright、Zipkin与WebAssembly构建高保真综合监控管道


问题的起点很明确:前端E2E测试失败了,但我们不知道是哪个后端服务拖慢了整个响应。Playwright的截图和视频记录了用户看到的失败,而Zipkin的链路追踪图谱展示了后端微服务间的调用延迟。这两者之间存在一条巨大的鸿沟。用户体验的失败无法直接归因于后端具体的某个服务瓶颈,因为来自E2E测试的HTTP请求在进入我们的追踪系统时,已经是一个全新的、匿名的请求。

在真实项目中,这种断裂的可见性是不可接受的。我们需要一条完整的、从模拟用户操作开始,贯穿整个分布式系统的调用链。这意味着Playwright发起的每一次浏览器导航、每一次API请求,都必须携带一个统一的追踪上下文,让后端的Zipkin能够将其与后续的数据库查询、RPC调用串联起来。

初步的构想是利用追踪上下文传播标准,如B3 Propagation。如果能在Playwright脚本执行时生成一个父Trace ID,并将其注入到由浏览器实例发起的所有HTTP请求头中,理论上就可以实现这个目标。

第一步:搭建可追踪的后端服务环境

为了验证这个方案,需要一个最小化的微服务环境。这里我们用Go语言构建两个服务:一个api-gateway和一个user-serviceapi-gateway接收外部请求,然后通过HTTP调用user-service。同时,引入一个标准的Zipkin实例来收集和展示追踪数据。

docker-compose.yml是启动这一切的骨架。

version: '3.8'

services:
  zipkin:
    image: openzipkin/zipkin:2
    container_name: zipkin
    ports:
      - "9411:9411"

  api-gateway:
    build:
      context: ./api-gateway
    container_name: api-gateway
    ports:
      - "8080:8080"
    environment:
      - ZIPKIN_ENDPOINT=http://zipkin:9411/api/v2/spans
      - USER_SERVICE_URL=http://user-service:8081
    depends_on:
      - zipkin
      - user-service

  user-service:
    build:
      context: ./user-service
    container_name: user-service
    ports:
      - "8081:8081"
    environment:
      - ZIPKIN_ENDPOINT=http://zipkin:9411/api/v2/spans
    depends_on:
      - zipkin

接下来是api-gateway的核心代码。这里的关键是引入OpenTelemetry的SDK和Zipkin的exporter,并使用otelhttp中间件自动包裹http.Handler,从而实现追踪数据的生成和传播。

api-gateway/main.go:

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"time"

	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/zipkin"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
)

// initTracer 初始化并注册一个全局的TracerProvider
func initTracer(serviceName string, zipkinEndpoint string) (*sdktrace.TracerProvider, error) {
	exporter, err := zipkin.New(zipkinEndpoint, zipkin.WithLogger(log.New(os.Stderr, "zipkin-reporter", log.LstdFlags)))
	if err != nil {
		return nil, err
	}

	res, err := resource.Merge(
		resource.Default(),
		resource.NewWithAttributes(
			semconv.SchemaURL,
			semconv.ServiceName(serviceName),
		),
	)
	if err != nil {
		return nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(res),
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}, propagation.B3{}))

	return tp, nil
}

func main() {
	zipkinEndpoint := os.Getenv("ZIPKIN_ENDPOINT")
	if zipkinEndpoint == "" {
		log.Fatal("ZIPKIN_ENDPOINT environment variable not set")
	}

	tp, err := initTracer("api-gateway", zipkinEndpoint)
	if err != nil {
		log.Fatalf("failed to initialize tracer provider: %v", err)
	}
	defer func() {
		if err := tp.Shutdown(context.Background()); err != nil {
			log.Printf("Error shutting down tracer provider: %v", err)
		}
	}()

	userServiceURL := os.Getenv("USER_SERVICE_URL")
	if userServiceURL == "" {
		log.Fatal("USER_SERVICE_URL environment variable not set")
	}

	// 使用otelhttp包装默认的http.Client,使其能够自动传播追踪上下文
	client := &http.Client{
		Transport: otelhttp.NewTransport(http.DefaultTransport),
	}

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// 从请求中提取父span,并创建一个新的span
		ctx, span := otel.Tracer("gateway-tracer").Start(r.Context(), "handle-get-user")
		defer span.End()

		req, _ := http.NewRequestWithContext(ctx, "GET", userServiceURL+"/user/123", nil)
		
		// client.Do会因为Transport被包装而自动注入B3 headers
		res, err := client.Do(req)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		defer res.Body.Close()

		w.WriteHeader(res.StatusCode)
		w.Write([]byte("Request processed by gateway and user-service"))
	})

	// 使用otelhttp包装Handler,使其能够自动处理入口请求的追踪
	wrappedHandler := otelhttp.NewHandler(handler, "http-server")

	log.Println("API Gateway listening on :8080")
	if err := http.ListenAndServe(":8080", wrappedHandler); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

user-service的代码结构几乎一样,只是它是一个纯粹的被调用方,它接收请求、处理并返回。它的initTracer函数是完全相同的,这保证了整个技术栈的追踪实现是统一的。

第二步:从Playwright注入追踪上下文

这是整个方案的核心。我们需要在Playwright测试脚本中,为每一个测试用例(或每一个逻辑用户操作)生成唯一的追踪ID,并在浏览器发起请求时将其作为HTTP Header附加。B3多Header格式(X-B3-TraceId, X-B3-SpanId, X-B3-Sampled)是Zipkin事实上的标准。

安装必要的依赖:

npm install --save-dev playwright @types/node

一个典型的Playwright测试脚本如下:
e2e/user-journey.spec.ts

import { test, expect, Page } from '@playwright/test';
import { randomBytes } from 'crypto';

// 生成B3兼容的Trace ID (16个字节的十六进制字符串)
const generateTraceId = (): string => randomBytes(16).toString('hex');

// 生成B3兼容的Span ID (8个字节的十六进制字符串)
const generateSpanId = (): string => randomBytes(8).toString('hex');

// 创建一个注入追踪上下文的辅助函数
// 这里的关键是使用 page.setExtraHTTPHeaders
// 它会在该Page实例发出的所有HTTP请求中都添加指定的头部
async function withTracing(page: Page, testFunction: () => Promise<void>) {
  const traceId = generateTraceId();
  const spanId = generateSpanId();
  const sampled = '1'; // '1'表示采样,'0'表示不采样

  console.log(`[Tracing] Starting test with TraceId: ${traceId}`);

  // 这是一个Playwright的关键API,它为页面上的所有后续请求设置额外的HTTP头
  await page.setExtraHTTPHeaders({
    'X-B3-TraceId': traceId,
    'X-B3-SpanId': spanId,
    'X-B3-Sampled': sampled,
    // 如果下游服务需要,还可以添加 X-B3-ParentSpanId
  });

  try {
    await testFunction();
  } finally {
    // 测试结束后,最好清空这些头,避免影响同一个worker中的下一个测试文件
    await page.setExtraHTTPHeaders({});
  }
}

test.describe('User Profile Journey with Distributed Tracing', () => {
  test('should load user data successfully', async ({ page }) => {
    // 将我们的测试逻辑包装在 withTracing 函数中
    await withTracing(page, async () => {
      // 这里的 navigation 请求和后续的 fetch/XHR 请求
      // 都会自动带上我们设置的 B3 headers
      const response = await page.goto('http://localhost:8080/api/user');

      // 断言确保页面加载成功
      expect(response?.ok()).toBeTruthy();
      
      const body = await response?.text();
      expect(body).toContain('Request processed by gateway and user-service');

      // 在这里可以添加更多的用户交互和断言
      // 比如点击按钮、填写表单等
      // 所有这些操作触发的API请求都会被追踪
      console.log(`[Tracing] Test completed. Check Zipkin for trace.`);
    });
  });
});

现在,当我们运行 npx playwright test 时,Playwright会启动浏览器,执行page.gotopage.setExtraHTTPHeaders确保了这个请求的HTTP头部包含了X-B3-TraceId等信息。api-gatewayotelhttp中间件会自动识别这些头部,并创建一个以Playwright生成的spanIdParentSpanId的新的Span。当api-gateway调用user-service时,它会继续传播这个追踪上下文。

最终,在Zipkin的UI中,我们不再是看到孤立的后端调用。我们会看到一个完整的链路,它的根Span(Root Span)就是由Playwright发起的那个GET /api/user请求,下面清晰地挂着api-gatewayuser-service内部的执行跨度。问题解决了,可见性鸿沟被填平。

第三步:应对海量追踪数据的挑战

方案成功后,我们将其集成到了CI/CD流水线中,每个PR都会触发几十个E2E测试用例。问题随之而来:Zipkin接收到的追踪数据量暴增。绝大多数的追踪都是成功的、低延迟的,对于排错而言是“噪音”。我们真正关心的是那些执行时间超长的、或者包含错误标签的链路。

在所有服务中都配置采样率是一种办法,但过于粗暴。它可能会把我们恰好需要的那条错误链路给丢弃掉。我们需要一个更智能的过滤机制——一个位于服务和Zipkin Collector之间的“预处理器”,它能根据链路的特征(如耗时、错误状态)来决定是否将其丢给后端存储。

这个预处理器必须具备极高的性能,因为它不能成为新的瓶颈。它的部署也必须足够轻量。这正是WebAssembly (WASM) 的用武之地。我们可以用Rust编写一个高性能的过滤逻辑,然后编译成WASM模块。这个WASM模块可以运行在各种环境中,比如一个轻量的代理服务,一个Serverless函数,甚至一个支持WASI的边缘节点。

我们来定义这个过滤逻辑:

  1. 解析传入的Zipkin V2 JSON格式的Span列表。
  2. 检查整个trace(所有具有相同traceId的span)中是否有任何一个span包含"error": true标签。
  3. 计算整个trace的总耗时(最后一个span的timestamp + duration 减去根span的timestamp)。
  4. 如果trace包含错误,或者总耗时超过一个预设的阈值(比如500ms),则将该trace的所有span转发出去。否则,直接丢弃。

用Rust实现这个逻辑:
trace-filter/src/lib.rs

use wasm_bindgen::prelude::*;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
struct Span {
    trace_id: String,
    id: String,
    name: Option<String>,
    parent_id: Option<String>,
    timestamp: Option<u64>,
    duration: Option<u64>,
    tags: Option<HashMap<String, String>>,
}

// wasm-bindgen宏将这个函数暴露给JavaScript宿主环境
#[wasm_bindgen]
pub fn filter_traces(spans_json: &str, duration_threshold_micros: u64) -> String {
    let spans: Result<Vec<Span>, _> = serde_json::from_str(spans_json);
    let mut filtered_spans: Vec<Span> = Vec::new();

    match spans {
        Ok(spans) => {
            // 按traceId对span进行分组
            let mut traces: HashMap<String, Vec<Span>> = HashMap::new();
            for span in spans {
                traces.entry(span.trace_id.clone()).or_default().push(span);
            }

            for (_, trace_spans) in traces {
                let mut has_error = false;
                let mut root_span_timestamp: u64 = u64::MAX;
                let mut max_end_timestamp: u64 = 0;

                for span in &trace_spans {
                    // 检查是否存在错误标签
                    if let Some(tags) = &span.tags {
                        if tags.get("error").map_or(false, |v| v == "true") {
                            has_error = true;
                        }
                    }

                    // 寻找根span的时间戳和整个trace的最晚结束时间
                    if let Some(ts) = span.timestamp {
                        if span.parent_id.is_none() {
                            root_span_timestamp = root_span_timestamp.min(ts);
                        }
                        let end_ts = ts + span.duration.unwrap_or(0);
                        max_end_timestamp = max_end_timestamp.max(end_ts);
                    }
                }
                
                let total_duration = if root_span_timestamp != u64::MAX && max_end_timestamp > root_span_timestamp {
                    max_end_timestamp - root_span_timestamp
                } else {
                    0
                };

                // 判断是否应该保留这个trace
                if has_error || total_duration > duration_threshold_micros {
                    filtered_spans.extend(trace_spans);
                }
            }
        }
        Err(_) => {
            // 如果JSON解析失败,可以选择返回空或者错误信息
            return "[]".to_string();
        }
    }
    
    // 将过滤后的spans序列化回JSON字符串
    serde_json::to_string(&filtered_spans).unwrap_or_else(|_| "[]".to_string())
}

使用wasm-pack build --target nodejs命令可以将其编译成一个Node.js可以调用的WASM包。然后我们可以用一个简单的Node.js HTTP服务器来承载这个WASM模块,作为我们的智能代理。

这个架构的Mermaid图如下:

sequenceDiagram
    participant P as Playwright Runner
    participant B as Browser
    participant GW as API Gateway
    participant US as User Service
    participant F as WASM Filter Proxy
    participant Z as Zipkin

    P->>B: page.goto("http://...") with B3 Headers
    B->>GW: GET /api/user (Headers: X-B3-TraceId, ...)
    GW->>US: GET /user/123 (propagates B3 headers)
    US-->>GW: 200 OK
    GW-->>B: 200 OK
    B-->>P: Response
    
    Note right of US: Services send spans asynchronously
    GW->>F: POST /api/v2/spans (Span Data)
    US->>F: POST /api/v2/spans (Span Data)
    
    F->>F: WASM module filters spans
    alt Trace is slow or has errors
        F->>Z: POST /api/v2/spans (Forward filtered spans)
    else Trace is normal
        F-->>F: Discard spans
    end

第四步:用Prettier保障代码一致性

现在,这个项目包含Go后端代码、TypeScript测试代码、Rust的WASM代码,以及YAML配置文件。一个常见的错误是,不同语言和框架的混合会导致代码风格的混乱,增加代码审查和维护的成本。

Prettier是解决这个问题的标准工具。通过一个.prettierrc.json配置文件和一系列插件,我们可以对项目中的绝大部分代码进行统一的格式化。

.prettierrc.json:

{
  "semi": true,
  "singleQuote": true,
  "trailingComma": "all",
  "arrowParens": "always",
  "printWidth": 80,
  "tabWidth": 2,
  "overrides": [
    {
      "files": "*.go",
      "options": {
        "parser": "go" 
      }
    },
    {
      "files": "*.rs",
      "options": {
        "parser": "rust" 
      }
    }
  ]
}

注:Prettier对Go和Rust的支持可能需要外部插件或通过prettier-plugin-exec调用gofmtrustfmt来实现。这里的配置是一个概念示意。

package.json中加入脚本:

"scripts": {
  "format": "prettier --write \"**/*.{ts,js,json,yml,md}\"",
  "check-format": "prettier --check \"**/*.{ts,js,json,yml,md}\""
}

check-format命令加入CI流程,可以强制所有提交的代码都符合统一的规范。这看似是一个小事,但在一个由多种技术栈构成的复杂系统中,这种一致性是维持长期可维护性的基石。它降低了认知负荷,让工程师可以专注于业务逻辑本身,而不是代码风格的细枝末节。

方案的局限与未来路径

这个方案有效地将前端综合监控与后端分布式追踪连接起来,并通过WASM实现了高性能的边缘过滤。但它并非没有局限性。

首先,基于入口的过滤(在我们这个例子里是WASM代理)无法实现真正的“尾部采样”。尾部采样要求在收集到一条trace的所有span之后再做采样决策。我们的WASM过滤器虽然是按traceId分组处理,但它依赖于一个时间窗口内接收到所有相关span。对于一个耗时非常长、span分散在很长时间窗口里的trace,它可能会被错误地拆分处理。一个更健壮的实现需要一个流式处理引擎,如Flink,或者一个专门为此设计的采集器,如OpenTelemetry Collector及其tail_sampling处理器。

其次,WASM模块的逻辑目前是硬编码的。一个生产级的系统需要支持动态配置过滤阈值、规则,甚至支持通过远程配置热更新过滤逻辑,而无需重新部署整个代理。

未来的迭代方向可能包括:将WASM过滤器迁移到OpenTelemetry Collector的自定义处理器中,以利用其成熟的管道模型和生态系统;或者进一步探索使用eBPF技术在内核层面无侵入地捕获和关联请求,实现更低开销的上下文传播。


  目录