传统的WAF日志审计,本质上是一场关键词的猫鼠游戏。我们用 grep、awk 或者在SIEM里用 Lucene 语法,试图从海量的日志中匹配已知的攻击指纹。这种方式对于发现“.../etc/passwd”或者“' OR 1=1 --”这类经典攻击向量是有效的,但面对经过混淆、编码或利用业务逻辑漏洞的新型攻击时,往往力不从心。当攻击者用一个从未见过的payload绕过规则时,基于关键词的搜索就成了摆设。
我们团队遇到的实际痛点是,一次应急响应中,发现了一个利用特定JSON结构进行反序列化攻击的payload。我们想知道,过去几个月,是否存在过手法类似但payload不同的攻击尝试?用关键词根本无法回答这个问题。我们需要的是一种能够理解“攻击意图”的搜索,即语义搜索。
这个困境促使我们探索一种新的日志分析范式。我们的初步构想是:如果能将每一条WAF日志转换成一个能代表其“攻击性”和“攻击模式”的数学向量,那么“寻找类似攻击”的问题就转化为“在向量空间中寻找最近邻”的问题。这套架构的核心组件需要解决几个问题:日志的高效收集、向量化处理、向量存储与检索,以及一个可用的分析前端。
技术选型决策
在真实项目中,技术选型从来不是只看“先进性”,而是成本、维护性、开发效率和生态的综合考量。
日志聚合: Loki
我们没有选择ELK栈。原因很简单:对于WAF日志这种结构相对简单但量级巨大的数据,Elasticsearch的倒排索引和全文搜索能力有些过度设计,成本也更高。Loki的设计哲学——“只索引元数据,不索引内容”——非常适合这个场景。它轻量、成本低,与我们已有的Prometheus/Grafana监控体系能无缝集成。我们需要的是一个可靠的“日志原始档案库”,Loki恰好满足。向量数据库: Weaviate
向量数据库选型我们对比了几个方案。Faiss虽然性能极致,但它只是一个库,需要大量工程来构建一个完整的服务。Milvus功能强大但架构相对复杂。我们最终选择了Weaviate,主要看中它的几个特点:- Docker化部署: 启动和维护极其简单,对中小型团队友好。
- 内置模块化: 支持集成多种向量化模型(如Hugging Face),甚至可以做到数据入库时自动向量化,虽然我们为了更精细的控制选择自己处理向量化。
- GraphQL API: 查询接口友好,支持标量过滤和向量搜索的混合查询,这对我们未来可能的扩展(例如,按时间范围、IP地址过滤后再进行向量搜索)至关重要。
后端与处理核心: FastAPI
Python是AI/ML领域的首选语言,有大量的库可以用来生成向量嵌入(Embeddings)。FastAPI因其高性能(基于Starlette和Pydantic)、异步支持以及自动化的API文档生成,成为了我们的不二之选。在我们的场景里,FastAPI需要执行IO密集型任务(从Loki拉日志,向Weaviate写数据,响应API请求),其异步特性能够发挥巨大优势。前端界面: React + CSS Modules
我们需要一个简单的内部工具界面给安全分析师使用。React是团队最熟悉的前端框架。这里的关键是样式管理。为了避免全局CSS污染并保证组件的可维护性,我们决定采用CSS Modules。它能确保每个组件的样式都拥有局部作用域,对于这种需要快速迭代、功能单一的内部工具来说,这是一个非常务实且稳健的选择。
架构与数据流
整个系统的核心数据流清晰直接,我们用Mermaid来描述它:
graph TD
subgraph "外部流量"
A[用户请求] --> B{WAF / ModSecurity}
end
subgraph "日志收集"
B -- "原始日志" --> C[Promtail Agent]
C -- "附加标签" --> D[Loki]
end
subgraph "核心处理管道 (FastAPI Service)"
E[后台轮询任务] -- "1. 定期查询" --> D
E -- "2. 新增日志" --> F[日志预处理与特征提取]
F -- "3. 清洗后文本" --> G[SentenceTransformer模型]
G -- "4. 生成向量" --> H[数据封装]
H -- "5. 批量写入" --> I[Weaviate]
end
subgraph "分析与溯源"
J[安全分析师] --> K[React前端界面]
K -- "6. 发起相似性查询" --> L[FastAPI API Endpoint]
L -- "7. 查询文本向量化" --> G
L -- "8. 执行向量搜索" --> I
I -- "9. 返回相似日志" --> L
L -- "10. 响应结果" --> K
K -- "11. 展示结果" --> J
end
style B fill:#f9f,stroke:#333,stroke-width:2px
style D fill:#f9f,stroke:#333,stroke-width:2px
style I fill:#f9f,stroke:#333,stroke-width:2px
步骤化实现
1. 基础设施搭建 (Loki & Weaviate)
我们使用 docker-compose 来管理这些基础设施,这对于开发和后续部署都极为方便。
docker-compose.yml:
version: '3.8'
services:
loki:
image: grafana/loki:2.8.0
container_name: loki_service
ports:
- "3100:3100"
command: -config.file=/etc/loki/local-config.yaml
networks:
- monitoring
promtail:
image: grafana/promtail:2.8.0
container_name: promtail_agent
volumes:
- ./waf_logs:/var/log/waf
- ./promtail-config.yml:/etc/promtail/config.yml
command: -config.file=/etc/promtail/config.yml
networks:
- monitoring
depends_on:
- loki
weaviate:
image: cr.weaviate.io/semitechnologies/weaviate:1.20.0
container_name: weaviate_db
ports:
- "8080:8080"
restart: on-failure:0
environment:
QUERY_DEFAULTS_LIMIT: 25
AUTHENTICATION_ANONYMOUS_ACCESS_ENABLED: 'true'
PERSISTENCE_DATA_PATH: '/var/lib/weaviate'
DEFAULT_VECTORIZER_MODULE: 'none' # 我们自己生成向量
ENABLE_MODULES: ''
CLUSTER_HOSTNAME: 'node1'
volumes:
- ./weaviate_data:/var/lib/weaviate
networks:
- monitoring
networks:
monitoring:
driver: bridge
Promtail的配置是关键,它负责读取WAF日志文件,并打上标签。标签(如job, hostname)是Loki中唯一被索引的数据。
promtail-config.yml:
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki:3100/loki/api/v1/push
scrape_configs:
- job_name: waf
static_configs:
- targets:
- localhost
labels:
job: waf_logs
__path__: /var/log/waf/*.log
2. FastAPI 核心服务
这是整个系统的“大脑”。我们将其拆分为几个部分:数据同步、API服务、向量化逻辑。
项目结构:
.
├── app
│ ├── __init__.py
│ ├── main.py # FastAPI应用入口
│ ├── config.py # 配置管理
│ ├── schemas.py # Pydantic模型
│ └── services
│ ├── __init__.py
│ ├── loki_client.py # Loki交互逻辑
│ ├── vectorizer.py # 向量化服务
│ └── weaviate_client.py # Weaviate交互逻辑
├── Dockerfile
└── requirements.txt
配置 (app/config.py):
# app/config.py
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
# Loki configuration
LOKI_API_URL: str = os.getenv("LOKI_API_URL", "http://loki_service:3100")
# Weaviate configuration
WEAVIATE_URL: str = os.getenv("WEAVIATE_URL", "http://weaviate_db:8080")
WEAVIATE_CLASS_NAME: str = "WAFLog"
# Vectorizer configuration
# 使用一个轻量级且效果不错的中文/多语言模型
EMBEDDING_MODEL: str = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
# Background task settings
SYNC_INTERVAL_SECONDS: int = 60
LOG_FETCH_LIMIT: int = 1000
settings = Settings()
Weaviate 客户端与 Schema 定义 (app/services/weaviate_client.py):
# app/services/weaviate_client.py
import weaviate
import logging
from tenacity import retry, stop_after_attempt, wait_fixed
from app.config import settings
logger = logging.getLogger(__name__)
class WeaviateClient:
def __init__(self, url: str):
self.client = weaviate.Client(url)
self.class_name = settings.WEAVIATE_CLASS_NAME
@retry(stop=stop_after_attempt(5), wait=wait_fixed(2))
def ensure_schema(self):
"""确保Weaviate中的集合(Class)存在,如果不存在则创建。"""
if self.client.schema.exists(self.class_name):
logger.info(f"Schema '{self.class_name}' already exists.")
return
waf_log_schema = {
"class": self.class_name,
"description": "Stores WAF log entries and their vector embeddings",
"vectorizer": "none", # 明确告知Weaviate我们自己提供向量
"properties": [
{
"name": "content",
"dataType": ["text"],
"description": "The raw content of the WAF log",
},
{
"name": "timestamp",
"dataType": ["date"],
"description": "Timestamp of the log entry",
},
{
"name": "source_ip",
"dataType": ["string"],
"description": "Source IP address from the log",
},
],
}
self.client.schema.create_class(waf_log_schema)
logger.info(f"Schema '{self.class_name}' created successfully.")
def batch_insert(self, logs: list[dict]):
"""批量插入日志及其向量"""
if not logs:
return
with self.client.batch as batch:
batch.batch_size = 100 # 根据机器性能调整
for log_data in logs:
properties = {
"content": log_data["content"],
"timestamp": log_data["timestamp"],
"source_ip": log_data["source_ip"],
}
batch.add_data_object(
data_object=properties,
class_name=self.class_name,
vector=log_data["vector"]
)
logger.info(f"Successfully inserted {len(logs)} log entries into Weaviate.")
def search_similar(self, vector: list[float], limit: int = 10) -> list[dict]:
"""根据向量搜索相似的日志"""
near_vector = {"vector": vector}
# 这里的字段列表是需要从Weaviate返回的元数据
properties_to_return = ["content", "timestamp", "source_ip", "_additional {distance}"]
result = (
self.client.query
.get(self.class_name, properties_to_return)
.with_near_vector(near_vector)
.with_limit(limit)
.do()
)
return result.get("data", {}).get("Get", {}).get(self.class_name, [])
# 在应用启动时实例化的单例
weaviate_client = WeaviateClient(settings.WEAVIATE_URL)
后台同步任务 (app/main.py):
我们需要一个后台任务来周期性地从Loki拉取新日志,进行处理后存入Weaviate。fastapi-utils 的 RepeatedTask 是一个简单的实现,对于生产环境,可能会考虑使用 Celery 或 ARQ。
# app/main.py (部分代码)
import asyncio
import logging
from fastapi import FastAPI, HTTPException
from fastapi_utils.tasks import repeat_every
from app.config import settings
from app.services.loki_client import loki_client
from app.services.vectorizer import vectorizer
from app.services.weaviate_client import weaviate_client
from app.schemas import SearchQuery, SearchResult
# ... (日志配置)
app = FastAPI(title="WAF Log Semantic Search Service")
@app.on_event("startup")
async def startup_event():
"""应用启动时执行初始化"""
logging.info("Initializing application...")
weaviate_client.ensure_schema()
# 预加载模型,避免第一次请求时耗时过长
vectorizer.load_model()
# 启动后台同步任务
asyncio.create_task(sync_logs_task())
logging.info("Application initialization complete.")
async def sync_logs_task():
"""包装同步任务以便在后台运行"""
@repeat_every(seconds=settings.SYNC_INTERVAL_SECONDS, wait_first=True, logger=logging)
async def repeated_sync() -> None:
try:
# 1. 从Loki获取新日志
new_logs = await loki_client.fetch_new_logs()
if not new_logs:
logging.info("No new logs found in Loki.")
return
# 2. 对日志进行向量化
contents = [log['content'] for log in new_logs]
vectors = vectorizer.embed_batch(contents)
# 3. 准备写入Weaviate的数据
data_to_insert = []
for log, vector in zip(new_logs, vectors):
data_to_insert.append({**log, "vector": vector})
# 4. 批量写入
weaviate_client.batch_insert(data_to_insert)
except Exception as e:
logging.error(f"Error during log sync: {e}", exc_info=True)
# API Endpoint for searching
@app.post("/api/v1/search", response_model=list[SearchResult])
async def search_similar_logs(query: SearchQuery):
"""接收查询文本,返回语义相似的WAF日志"""
if not query.text.strip():
raise HTTPException(status_code=400, detail="Query text cannot be empty.")
try:
# 1. 将查询文本向量化
query_vector = vectorizer.embed(query.text)
# 2. 在Weaviate中执行向量搜索
results = weaviate_client.search_similar(query_vector, limit=query.limit)
# 3. 格式化返回结果
response_data = []
for item in results:
response_data.append(
SearchResult(
content=item.get('content'),
timestamp=item.get('timestamp'),
source_ip=item.get('source_ip'),
distance=item.get('_additional', {}).get('distance')
)
)
return response_data
except Exception as e:
logging.error(f"Error during search: {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Internal server error during search.")
单元测试思路:
- 对
loki_client,可以mockhttpx客户端,测试它是否能正确构造Loki API的URL和参数,并能解析返回的JSON。 - 对
vectorizer,可以测试embed方法对于给定的输入是否返回了正确维度的向量。 - 对
weaviate_client,可以mockweaviate.Client,验证batch_insert和search_similar方法是否用正确的参数调用了底层库。 - 对API endpoint,使用
TestClient测试各种输入(正常、空、超长)的响应状态码和内容。
3. 前端分析界面 (React + CSS Modules)
我们只需要一个简单的界面:一个输入框,一个搜索按钮,一个结果列表。CSS Modules的用法是关键。
组件文件 ThreatSearch.js:
// src/components/ThreatSearch.js
import React, { useState } from 'react';
import styles from './ThreatSearch.module.css'; // 关键:导入模块化CSS
const API_ENDPOINT = '/api/v1/search';
function ThreatSearch() {
const [query, setQuery] = useState('');
const [results, setResults] = useState([]);
const [isLoading, setIsLoading] = useState(false);
const [error, setError] = useState(null);
const handleSearch = async () => {
if (!query.trim()) return;
setIsLoading(true);
setError(null);
setResults([]);
try {
const response = await fetch(API_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ text: query, limit: 15 }),
});
if (!response.ok) {
throw new Error(`API error: ${response.status} ${response.statusText}`);
}
const data = await response.json();
setResults(data);
} catch (err) {
setError(err.message);
} finally {
setIsLoading(false);
}
};
return (
<div className={styles.searchContainer}>
<h1 className={styles.title}>WAF日志语义溯源</h1>
<div className={styles.inputGroup}>
<textarea
className={styles.queryInput}
value={query}
onChange={(e) => setQuery(e.target.value)}
placeholder="粘贴一条可疑日志或描述一种攻击模式..."
/>
<button
className={styles.searchButton}
onClick={handleSearch}
disabled={isLoading}
>
{isLoading ? '搜索中...' : '查找相似威胁'}
</button>
</div>
{error && <div className={styles.error}>{error}</div>}
<div className={styles.resultsArea}>
{results.length > 0 && (
<ul className={styles.resultsList}>
{results.map((result, index) => (
<li key={index} className={styles.resultItem}>
<div className={styles.resultMeta}>
<span><strong>时间:</strong> {new Date(result.timestamp).toLocaleString()}</span>
<span><strong>源IP:</strong> {result.source_ip}</span>
<span className={styles.distance}><strong>距离:</strong> {result.distance.toFixed(4)}</span>
</div>
<pre className={styles.resultContent}>{result.content}</pre>
</li>
))}
</ul>
)}
</div>
</div>
);
}
export default ThreatSearch;
对应的CSS模块文件 ThreatSearch.module.css:
/* src/components/ThreatSearch.module.css */
/* 每个类名都会被编译成一个唯一的哈希值,如 ThreatSearch_searchContainer__aB3d_ */
.searchContainer {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
max-width: 900px;
margin: 40px auto;
padding: 20px;
background-color: #f7f9fc;
border-radius: 8px;
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.08);
}
.title {
color: #2c3e50;
text-align: center;
margin-bottom: 24px;
}
.inputGroup {
display: flex;
gap: 10px;
}
.queryInput {
flex-grow: 1;
min-height: 80px;
padding: 12px;
border: 1px solid #dcdfe6;
border-radius: 4px;
font-size: 14px;
resize: vertical;
}
.searchButton {
padding: 10px 20px;
background-color: #3498db;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
font-size: 16px;
transition: background-color 0.2s;
}
.searchButton:disabled {
background-color: #a9cce3;
cursor: not-allowed;
}
.resultItem {
background-color: #ffffff;
border: 1px solid #e4e7ed;
padding: 15px;
margin-bottom: 10px;
border-radius: 4px;
list-style-type: none;
}
.resultMeta {
display: flex;
justify-content: space-between;
font-size: 12px;
color: #606266;
margin-bottom: 8px;
padding-bottom: 8px;
border-bottom: 1px dashed #e4e7ed;
}
.distance {
font-weight: bold;
color: #e67e22;
}
.resultContent {
white-space: pre-wrap;
word-break: break-all;
background-color: #ecf0f1;
padding: 10px;
border-radius: 3px;
color: #34495e;
font-family: 'Courier New', Courier, monospace;
}
通过这种方式,我们完全不用担心这个组件的样式会影响到应用的其他部分。
方案的局限性与未来迭代
这个方案虽然解决了我们最初的问题,但在真实生产环境中,它并非银弹。
首先,向量模型的质量决定了系统的上限。我们使用的 paraphrase-multilingual-MiniLM-L12-v2 是一个通用的语义模型,对于理解SQL注入、XSS等攻击语法的“意图”有一定效果,但它并非为此专门训练。一个更优的路径是,在大量已标注的攻击样本上,对一个基础模型进行微调(fine-tuning),使其更能捕捉安全领域的语义。
其次,处理流程是批量的,非实时。repeat_every 任务引入了分钟级的延迟。对于需要亚秒级响应的实时阻断场景,这个架构无能为力。向实时架构演进,需要将Loki的拉取模式改为推送模式,例如WAF直接将日志写入Kafka这类消息队列,FastAPI服务变为一个流式处理消费者。
再者,可扩展性问题。当日志量达到每日数十亿条时,单个FastAPI实例的向量化处理能力会成为瓶颈。届时,需要将向量化任务拆分为一个独立的、可水平扩展的服务集群,甚至使用Spark或Ray这类分布式计算框架来完成。Weaviate本身可以通过分片来扩展,但这也会带来运维复杂度的提升。
最后,这个系统只提供了“溯源”能力,它本身不产生告警。未来的一个方向是,可以实现自动化的聚类分析。例如,定期扫描新入库的向量,如果发现一个与所有已知攻击簇都相距甚远的新向量簇,这可能就标志着一种新型的、未知的攻击活动,可以主动产生告警。这就从一个被动的分析工具,向一个主动的威胁发现系统迈进了一步。