基于向量检索的WAF日志语义分析与威胁溯源管道实现


传统的WAF日志审计,本质上是一场关键词的猫鼠游戏。我们用 grepawk 或者在SIEM里用 Lucene 语法,试图从海量的日志中匹配已知的攻击指纹。这种方式对于发现“.../etc/passwd”或者“' OR 1=1 --”这类经典攻击向量是有效的,但面对经过混淆、编码或利用业务逻辑漏洞的新型攻击时,往往力不从心。当攻击者用一个从未见过的payload绕过规则时,基于关键词的搜索就成了摆设。

我们团队遇到的实际痛点是,一次应急响应中,发现了一个利用特定JSON结构进行反序列化攻击的payload。我们想知道,过去几个月,是否存在过手法类似但payload不同的攻击尝试?用关键词根本无法回答这个问题。我们需要的是一种能够理解“攻击意图”的搜索,即语义搜索。

这个困境促使我们探索一种新的日志分析范式。我们的初步构想是:如果能将每一条WAF日志转换成一个能代表其“攻击性”和“攻击模式”的数学向量,那么“寻找类似攻击”的问题就转化为“在向量空间中寻找最近邻”的问题。这套架构的核心组件需要解决几个问题:日志的高效收集、向量化处理、向量存储与检索,以及一个可用的分析前端。

技术选型决策

在真实项目中,技术选型从来不是只看“先进性”,而是成本、维护性、开发效率和生态的综合考量。

  1. 日志聚合: Loki
    我们没有选择ELK栈。原因很简单:对于WAF日志这种结构相对简单但量级巨大的数据,Elasticsearch的倒排索引和全文搜索能力有些过度设计,成本也更高。Loki的设计哲学——“只索引元数据,不索引内容”——非常适合这个场景。它轻量、成本低,与我们已有的Prometheus/Grafana监控体系能无缝集成。我们需要的是一个可靠的“日志原始档案库”,Loki恰好满足。

  2. 向量数据库: Weaviate
    向量数据库选型我们对比了几个方案。Faiss虽然性能极致,但它只是一个库,需要大量工程来构建一个完整的服务。Milvus功能强大但架构相对复杂。我们最终选择了Weaviate,主要看中它的几个特点:

    • Docker化部署: 启动和维护极其简单,对中小型团队友好。
    • 内置模块化: 支持集成多种向量化模型(如Hugging Face),甚至可以做到数据入库时自动向量化,虽然我们为了更精细的控制选择自己处理向量化。
    • GraphQL API: 查询接口友好,支持标量过滤和向量搜索的混合查询,这对我们未来可能的扩展(例如,按时间范围、IP地址过滤后再进行向量搜索)至关重要。
  3. 后端与处理核心: FastAPI
    Python是AI/ML领域的首选语言,有大量的库可以用来生成向量嵌入(Embeddings)。FastAPI因其高性能(基于Starlette和Pydantic)、异步支持以及自动化的API文档生成,成为了我们的不二之选。在我们的场景里,FastAPI需要执行IO密集型任务(从Loki拉日志,向Weaviate写数据,响应API请求),其异步特性能够发挥巨大优势。

  4. 前端界面: React + CSS Modules
    我们需要一个简单的内部工具界面给安全分析师使用。React是团队最熟悉的前端框架。这里的关键是样式管理。为了避免全局CSS污染并保证组件的可维护性,我们决定采用CSS Modules。它能确保每个组件的样式都拥有局部作用域,对于这种需要快速迭代、功能单一的内部工具来说,这是一个非常务实且稳健的选择。

架构与数据流

整个系统的核心数据流清晰直接,我们用Mermaid来描述它:

graph TD
    subgraph "外部流量"
        A[用户请求] --> B{WAF / ModSecurity}
    end

    subgraph "日志收集"
        B -- "原始日志" --> C[Promtail Agent]
        C -- "附加标签" --> D[Loki]
    end

    subgraph "核心处理管道 (FastAPI Service)"
        E[后台轮询任务] -- "1. 定期查询" --> D
        E -- "2. 新增日志" --> F[日志预处理与特征提取]
        F -- "3. 清洗后文本" --> G[SentenceTransformer模型]
        G -- "4. 生成向量" --> H[数据封装]
        H -- "5. 批量写入" --> I[Weaviate]
    end

    subgraph "分析与溯源"
        J[安全分析师] --> K[React前端界面]
        K -- "6. 发起相似性查询" --> L[FastAPI API Endpoint]
        L -- "7. 查询文本向量化" --> G
        L -- "8. 执行向量搜索" --> I
        I -- "9. 返回相似日志" --> L
        L -- "10. 响应结果" --> K
        K -- "11. 展示结果" --> J
    end

    style B fill:#f9f,stroke:#333,stroke-width:2px
    style D fill:#f9f,stroke:#333,stroke-width:2px
    style I fill:#f9f,stroke:#333,stroke-width:2px

步骤化实现

1. 基础设施搭建 (Loki & Weaviate)

我们使用 docker-compose 来管理这些基础设施,这对于开发和后续部署都极为方便。

docker-compose.yml:

version: '3.8'

services:
  loki:
    image: grafana/loki:2.8.0
    container_name: loki_service
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml
    networks:
      - monitoring

  promtail:
    image: grafana/promtail:2.8.0
    container_name: promtail_agent
    volumes:
      - ./waf_logs:/var/log/waf
      - ./promtail-config.yml:/etc/promtail/config.yml
    command: -config.file=/etc/promtail/config.yml
    networks:
      - monitoring
    depends_on:
      - loki

  weaviate:
    image: cr.weaviate.io/semitechnologies/weaviate:1.20.0
    container_name: weaviate_db
    ports:
      - "8080:8080"
    restart: on-failure:0
    environment:
      QUERY_DEFAULTS_LIMIT: 25
      AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
      PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
      DEFAULT_VECTORIZER_MODULE: 'none' # 我们自己生成向量
      ENABLE_MODULES: ''
      CLUSTER_HOSTNAME: 'node1'
    volumes:
      - ./weaviate_data:/var/lib/weaviate
    networks:
      - monitoring

networks:
  monitoring:
    driver: bridge

Promtail的配置是关键,它负责读取WAF日志文件,并打上标签。标签(如job, hostname)是Loki中唯一被索引的数据。

promtail-config.yml:

server:
  http_listen_port: 9080
  grpc_listen_port: 0

positions:
  filename: /tmp/positions.yaml

clients:
  - url: http://loki:3100/loki/api/v1/push

scrape_configs:
  - job_name: waf
    static_configs:
      - targets:
          - localhost
        labels:
          job: waf_logs
          __path__: /var/log/waf/*.log

2. FastAPI 核心服务

这是整个系统的“大脑”。我们将其拆分为几个部分:数据同步、API服务、向量化逻辑。

项目结构:

.
├── app
│   ├── __init__.py
│   ├── main.py         # FastAPI应用入口
│   ├── config.py       # 配置管理
│   ├── schemas.py      # Pydantic模型
│   └── services
│       ├── __init__.py
│       ├── loki_client.py   # Loki交互逻辑
│       ├── vectorizer.py    # 向量化服务
│       └── weaviate_client.py # Weaviate交互逻辑
├── Dockerfile
└── requirements.txt

配置 (app/config.py):

# app/config.py
import os

from pydantic import BaseSettings

class Settings(BaseSettings):
    # Loki configuration
    LOKI_API_URL: str = os.getenv("LOKI_API_URL", "http://loki_service:3100")

    # Weaviate configuration
    WEAVIATE_URL: str = os.getenv("WEAVIATE_URL", "http://weaviate_db:8080")
    WEAVIATE_CLASS_NAME: str = "WAFLog"

    # Vectorizer configuration
    # 使用一个轻量级且效果不错的中文/多语言模型
    EMBEDDING_MODEL: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"

    # Background task settings
    SYNC_INTERVAL_SECONDS: int = 60
    LOG_FETCH_LIMIT: int = 1000

settings = Settings()

Weaviate 客户端与 Schema 定义 (app/services/weaviate_client.py):

# app/services/weaviate_client.py
import weaviate
import logging
from tenacity import retry, stop_after_attempt, wait_fixed

from app.config import settings

logger = logging.getLogger(__name__)

class WeaviateClient:
    def __init__(self, url: str):
        self.client = weaviate.Client(url)
        self.class_name = settings.WEAVIATE_CLASS_NAME

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(2))
    def ensure_schema(self):
        """确保Weaviate中的集合(Class)存在,如果不存在则创建。"""
        if self.client.schema.exists(self.class_name):
            logger.info(f"Schema '{self.class_name}' already exists.")
            return

        waf_log_schema = {
            "class": self.class_name,
            "description": "Stores WAF log entries and their vector embeddings",
            "vectorizer": "none", # 明确告知Weaviate我们自己提供向量
            "properties": [
                {
                    "name": "content",
                    "dataType": ["text"],
                    "description": "The raw content of the WAF log",
                },
                {
                    "name": "timestamp",
                    "dataType": ["date"],
                    "description": "Timestamp of the log entry",
                },
                {
                    "name": "source_ip",
                    "dataType": ["string"],
                    "description": "Source IP address from the log",
                },
            ],
        }
        self.client.schema.create_class(waf_log_schema)
        logger.info(f"Schema '{self.class_name}' created successfully.")

    def batch_insert(self, logs: list[dict]):
        """批量插入日志及其向量"""
        if not logs:
            return

        with self.client.batch as batch:
            batch.batch_size = 100 # 根据机器性能调整
            for log_data in logs:
                properties = {
                    "content": log_data["content"],
                    "timestamp": log_data["timestamp"],
                    "source_ip": log_data["source_ip"],
                }
                batch.add_data_object(
                    data_object=properties,
                    class_name=self.class_name,
                    vector=log_data["vector"]
                )
        logger.info(f"Successfully inserted {len(logs)} log entries into Weaviate.")

    def search_similar(self, vector: list[float], limit: int = 10) -> list[dict]:
        """根据向量搜索相似的日志"""
        near_vector = {"vector": vector}
        
        # 这里的字段列表是需要从Weaviate返回的元数据
        properties_to_return = ["content", "timestamp", "source_ip", "_additional {distance}"]

        result = (
            self.client.query
            .get(self.class_name, properties_to_return)
            .with_near_vector(near_vector)
            .with_limit(limit)
            .do()
        )
        return result.get("data", {}).get("Get", {}).get(self.class_name, [])

# 在应用启动时实例化的单例
weaviate_client = WeaviateClient(settings.WEAVIATE_URL)

后台同步任务 (app/main.py):
我们需要一个后台任务来周期性地从Loki拉取新日志,进行处理后存入Weaviate。fastapi-utilsRepeatedTask 是一个简单的实现,对于生产环境,可能会考虑使用 CeleryARQ

# app/main.py (部分代码)
import asyncio
import logging
from fastapi import FastAPI, HTTPException
from fastapi_utils.tasks import repeat_every

from app.config import settings
from app.services.loki_client import loki_client
from app.services.vectorizer import vectorizer
from app.services.weaviate_client import weaviate_client
from app.schemas import SearchQuery, SearchResult

# ... (日志配置)

app = FastAPI(title="WAF Log Semantic Search Service")

@app.on_event("startup")
async def startup_event():
    """应用启动时执行初始化"""
    logging.info("Initializing application...")
    weaviate_client.ensure_schema()
    # 预加载模型,避免第一次请求时耗时过长
    vectorizer.load_model()
    # 启动后台同步任务
    asyncio.create_task(sync_logs_task())
    logging.info("Application initialization complete.")

async def sync_logs_task():
    """包装同步任务以便在后台运行"""
    @repeat_every(seconds=settings.SYNC_INTERVAL_SECONDS, wait_first=True, logger=logging)
    async def repeated_sync() -> None:
        try:
            # 1. 从Loki获取新日志
            new_logs = await loki_client.fetch_new_logs()
            if not new_logs:
                logging.info("No new logs found in Loki.")
                return

            # 2. 对日志进行向量化
            contents = [log['content'] for log in new_logs]
            vectors = vectorizer.embed_batch(contents)

            # 3. 准备写入Weaviate的数据
            data_to_insert = []
            for log, vector in zip(new_logs, vectors):
                data_to_insert.append({**log, "vector": vector})

            # 4. 批量写入
            weaviate_client.batch_insert(data_to_insert)

        except Exception as e:
            logging.error(f"Error during log sync: {e}", exc_info=True)

# API Endpoint for searching
@app.post("/api/v1/search", response_model=list[SearchResult])
async def search_similar_logs(query: SearchQuery):
    """接收查询文本,返回语义相似的WAF日志"""
    if not query.text.strip():
        raise HTTPException(status_code=400, detail="Query text cannot be empty.")
    
    try:
        # 1. 将查询文本向量化
        query_vector = vectorizer.embed(query.text)
        
        # 2. 在Weaviate中执行向量搜索
        results = weaviate_client.search_similar(query_vector, limit=query.limit)
        
        # 3. 格式化返回结果
        response_data = []
        for item in results:
            response_data.append(
                SearchResult(
                    content=item.get('content'),
                    timestamp=item.get('timestamp'),
                    source_ip=item.get('source_ip'),
                    distance=item.get('_additional', {}).get('distance')
                )
            )
        return response_data
    except Exception as e:
        logging.error(f"Error during search: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error during search.")

单元测试思路:

  • loki_client,可以 mock httpx 客户端,测试它是否能正确构造Loki API的URL和参数,并能解析返回的JSON。
  • vectorizer,可以测试 embed 方法对于给定的输入是否返回了正确维度的向量。
  • weaviate_client,可以 mock weaviate.Client,验证 batch_insertsearch_similar 方法是否用正确的参数调用了底层库。
  • 对API endpoint,使用 TestClient 测试各种输入(正常、空、超长)的响应状态码和内容。

3. 前端分析界面 (React + CSS Modules)

我们只需要一个简单的界面:一个输入框,一个搜索按钮,一个结果列表。CSS Modules的用法是关键。

组件文件 ThreatSearch.js:

// src/components/ThreatSearch.js
import React, { useState } from 'react';
import styles from './ThreatSearch.module.css'; // 关键:导入模块化CSS

const API_ENDPOINT = '/api/v1/search';

function ThreatSearch() {
  const [query, setQuery] = useState('');
  const [results, setResults] = useState([]);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState(null);

  const handleSearch = async () => {
    if (!query.trim()) return;

    setIsLoading(true);
    setError(null);
    setResults([]);

    try {
      const response = await fetch(API_ENDPOINT, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ text: query, limit: 15 }),
      });

      if (!response.ok) {
        throw new Error(`API error: ${response.status} ${response.statusText}`);
      }

      const data = await response.json();
      setResults(data);
    } catch (err) {
      setError(err.message);
    } finally {
      setIsLoading(false);
    }
  };

  return (
    <div className={styles.searchContainer}>
      <h1 className={styles.title}>WAF日志语义溯源</h1>
      <div className={styles.inputGroup}>
        <textarea
          className={styles.queryInput}
          value={query}
          onChange={(e) => setQuery(e.target.value)}
          placeholder="粘贴一条可疑日志或描述一种攻击模式..."
        />
        <button
          className={styles.searchButton}
          onClick={handleSearch}
          disabled={isLoading}
        >
          {isLoading ? '搜索中...' : '查找相似威胁'}
        </button>
      </div>

      {error && <div className={styles.error}>{error}</div>}

      <div className={styles.resultsArea}>
        {results.length > 0 && (
          <ul className={styles.resultsList}>
            {results.map((result, index) => (
              <li key={index} className={styles.resultItem}>
                <div className={styles.resultMeta}>
                  <span><strong>时间:</strong> {new Date(result.timestamp).toLocaleString()}</span>
                  <span><strong>源IP:</strong> {result.source_ip}</span>
                  <span className={styles.distance}><strong>距离:</strong> {result.distance.toFixed(4)}</span>
                </div>
                <pre className={styles.resultContent}>{result.content}</pre>
              </li>
            ))}
          </ul>
        )}
      </div>
    </div>
  );
}

export default ThreatSearch;

对应的CSS模块文件 ThreatSearch.module.css:

/* src/components/ThreatSearch.module.css */
/* 每个类名都会被编译成一个唯一的哈希值,如 ThreatSearch_searchContainer__aB3d_ */
.searchContainer {
  font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
  max-width: 900px;
  margin: 40px auto;
  padding: 20px;
  background-color: #f7f9fc;
  border-radius: 8px;
  box-shadow: 0 4px 12px rgba(0, 0, 0, 0.08);
}

.title {
  color: #2c3e50;
  text-align: center;
  margin-bottom: 24px;
}

.inputGroup {
  display: flex;
  gap: 10px;
}

.queryInput {
  flex-grow: 1;
  min-height: 80px;
  padding: 12px;
  border: 1px solid #dcdfe6;
  border-radius: 4px;
  font-size: 14px;
  resize: vertical;
}

.searchButton {
  padding: 10px 20px;
  background-color: #3498db;
  color: white;
  border: none;
  border-radius: 4px;
  cursor: pointer;
  font-size: 16px;
  transition: background-color 0.2s;
}

.searchButton:disabled {
  background-color: #a9cce3;
  cursor: not-allowed;
}

.resultItem {
  background-color: #ffffff;
  border: 1px solid #e4e7ed;
  padding: 15px;
  margin-bottom: 10px;
  border-radius: 4px;
  list-style-type: none;
}

.resultMeta {
  display: flex;
  justify-content: space-between;
  font-size: 12px;
  color: #606266;
  margin-bottom: 8px;
  padding-bottom: 8px;
  border-bottom: 1px dashed #e4e7ed;
}

.distance {
  font-weight: bold;
  color: #e67e22;
}

.resultContent {
  white-space: pre-wrap;
  word-break: break-all;
  background-color: #ecf0f1;
  padding: 10px;
  border-radius: 3px;
  color: #34495e;
  font-family: 'Courier New', Courier, monospace;
}

通过这种方式,我们完全不用担心这个组件的样式会影响到应用的其他部分。

方案的局限性与未来迭代

这个方案虽然解决了我们最初的问题,但在真实生产环境中,它并非银弹。

首先,向量模型的质量决定了系统的上限。我们使用的 paraphrase-multilingual-MiniLM-L12-v2 是一个通用的语义模型,对于理解SQL注入、XSS等攻击语法的“意图”有一定效果,但它并非为此专门训练。一个更优的路径是,在大量已标注的攻击样本上,对一个基础模型进行微调(fine-tuning),使其更能捕捉安全领域的语义。

其次,处理流程是批量的,非实时repeat_every 任务引入了分钟级的延迟。对于需要亚秒级响应的实时阻断场景,这个架构无能为力。向实时架构演进,需要将Loki的拉取模式改为推送模式,例如WAF直接将日志写入Kafka这类消息队列,FastAPI服务变为一个流式处理消费者。

再者,可扩展性问题。当日志量达到每日数十亿条时,单个FastAPI实例的向量化处理能力会成为瓶颈。届时,需要将向量化任务拆分为一个独立的、可水平扩展的服务集群,甚至使用Spark或Ray这类分布式计算框架来完成。Weaviate本身可以通过分片来扩展,但这也会带来运维复杂度的提升。

最后,这个系统只提供了“溯源”能力,它本身不产生告警。未来的一个方向是,可以实现自动化的聚类分析。例如,定期扫描新入库的向量,如果发现一个与所有已知攻击簇都相距甚远的新向量簇,这可能就标志着一种新型的、未知的攻击活动,可以主动产生告警。这就从一个被动的分析工具,向一个主动的威胁发现系统迈进了一步。


  目录