The Gatekeeper of a Recommendation System: Designing an Industrial-Grade Quality Scoring Architecture
In a recommendation system, the quality of the recommendation pool directly determines user experience. When the recommendation algorithm is updated, how do you ensure the new results are no worse than the old ones? This is a problem every industrial-grade system must face. Today we dissect a real quality-check module from a video recommendation system and see how it guards recommendation quality with multi-dimensional scoring.
The Essence of the Problem: Guarding the Quality of the Recommendation Pool
The core challenges a recommendation system faces:
- Algorithm iteration: every algorithm update can affect recommendation quality
- Silent degradation: some metrics may drop without anyone noticing
- Multi-dimensional evaluation: quality cannot be judged by a single metric; it needs a combined assessment
The solution is a multi-dimensional quality checker:
- Loss check: the retention rate of key query-rel-url triplets
- Distribution check: whether the distribution of quality factors is reasonable
- Percentage check: the share of highly relevant content
Core Design of the Industrial Implementation
In an industrial video search system, I found a quality-check module refined over years of iteration. Its design choices are remarkably pragmatic:
Design 1: Interface Abstraction
typedef IDataChangeChecker<TPoolRecords> IPoolChangeChecker;
Choice: define checkers through an interface abstraction
Trade-offs:
- A standardized interface makes it easy to add new checkers
- The composite pattern lets multiple checkers be stacked
- But it adds code complexity
Design 2: Probability Threshold
static constexpr double MIN_DEVIATION_PROBABILITY = 0.05;
Choice: use a 5% probability threshold to flag anomalies
Trade-offs:
- A hard threshold is rigid; a probability threshold is more flexible
- Historical data must be maintained to compute the probability
- But it adapts better to shifts in the data distribution
Design 3: Historical Comparison
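How the original C++ module turns a 5% constant into a probability is not shown; one plausible reading is that it asks how likely the observed deviation would be given past runs. A minimal sketch of that idea, assuming historical deviations are roughly normally distributed (the helper `deviation_probability` and the sample numbers are my own illustration, not from the source):

```python
import math

def deviation_probability(history: list[float], observed: float) -> float:
    """Two-sided tail probability of seeing a deviation at least this
    extreme, assuming past deviations are roughly normally distributed."""
    n = len(history)
    mean = sum(history) / n
    var = sum((x - mean) ** 2 for x in history) / (n - 1)
    std = math.sqrt(var) or 1e-9  # avoid division by zero on flat history
    z = abs(observed - mean) / std
    # Two-sided normal tail probability via the complementary error function.
    return math.erfc(z / math.sqrt(2))

# Past runs saw factor-mean deviations of roughly 1-2% (made-up history).
history = [0.010, 0.015, 0.012, 0.018, 0.011]
print(deviation_probability(history, 0.013))  # typical value -> high probability
print(deviation_probability(history, 0.080))  # extreme value -> near zero
```

A deviation would then be flagged when its probability falls below MIN_DEVIATION_PROBABILITY (0.05), which is how a probability threshold adapts to the data: the same absolute deviation can be normal in a noisy pool and alarming in a stable one.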
TCheckResult Check(..., const TReports& oldReports, ...);
Choice: compare against historical reports to detect distribution drift
Trade-offs:
- Can detect gradual degradation
- Requires extra storage for historical data
- Unfriendly to cold starts
Clean-Room Rewrite: A Python Implementation
To illustrate the design ideas, I reimplemented the core logic in Python:
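The Check(..., oldReports, ...) signature suggests that each run's summary statistics are persisted and the newest value is judged against a window of previous reports. A hedged sketch of that pattern, including the cold-start weakness noted above (the `Report` type, its single field, and the 3-sigma rule are my assumptions, not the source's):

```python
from dataclasses import dataclass

@dataclass
class Report:
    high_rel_percent: float  # share of records with relevance >= 0.7

def drifted(old_reports: list[Report], new_report: Report,
            max_sigma: float = 3.0) -> bool:
    """Flag the new report if it lies more than max_sigma sample standard
    deviations from the historical mean."""
    if len(old_reports) < 2:
        return False  # not enough history: the cold-start weakness
    values = [r.high_rel_percent for r in old_reports]
    mean = sum(values) / len(values)
    var = sum((v - mean) ** 2 for v in values) / (len(values) - 1)
    std = var ** 0.5
    if std == 0:
        return new_report.high_rel_percent != mean
    return abs(new_report.high_rel_percent - mean) > max_sigma * std

history = [Report(0.70), Report(0.72), Report(0.71), Report(0.69)]
print(drifted(history, Report(0.705)))  # False: within the normal band
print(drifted(history, Report(0.40)))   # True: gradual-degradation alarm
```

The appeal of this design is that a slow slide — say 1% per release — eventually exits the historical band even though each single release would pass a pairwise old-vs-new check.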
#!/usr/bin/env python3
"""
Video Recommendation Pool Quality Checker
"""
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List

@dataclass
class PoolRecord:
    query: str
    relevance: float
    url: str
    factors: List[float]

@dataclass
class CheckResult:
    passed: bool
    message: str
    severity: str  # "ok", "warning", "error"

class IPoolChangeChecker(ABC):
    """Abstract base class for pool quality checkers"""

    @abstractmethod
    def get_name(self) -> str:
        pass

    @abstractmethod
    def check(self, old_pool, new_pool, old_reports=None) -> CheckResult:
        pass

class QueryRelUrlLossChecker(IPoolChangeChecker):
    """Checks loss of query-rel-url triplets"""
    OK_LOSS_RATIO = 0.05
    WARN_LOSS_RATIO = 0.10

    def get_name(self) -> str:
        return "Query-Rel-Url Loss Check"

    def check(self, old_pool, new_pool, old_reports=None) -> CheckResult:
        old_set = {(r.query, round(r.relevance, 2), r.url) for r in old_pool}
        new_set = {(r.query, round(r.relevance, 2), r.url) for r in new_pool}
        if not old_set:
            return CheckResult(True, "No old pool data", "ok")
        loss_ratio = len(old_set - new_set) / len(old_set)
        if loss_ratio <= self.OK_LOSS_RATIO:
            return CheckResult(True, f"Loss ratio {loss_ratio:.2%} OK", "ok")
        elif loss_ratio <= self.WARN_LOSS_RATIO:
            return CheckResult(False, f"Loss ratio {loss_ratio:.2%} warning", "warning")
        else:
            return CheckResult(False, f"Loss ratio {loss_ratio:.2%} critical!", "error")

class FactorsDistributionChecker(IPoolChangeChecker):
    """Checks factor distribution using probability threshold"""
    MIN_DEVIATION_PROBABILITY = 0.05

    def get_name(self) -> str:
        return "Factor Distribution Check"

    def check(self, old_pool, new_pool, old_reports=None) -> CheckResult:
        if not old_pool or not new_pool:  # guard against dividing by an empty pool
            return CheckResult(True, "Insufficient data", "ok")
        factor_count = len(old_pool[0].factors)
        deviations = []
        for factor_idx in range(factor_count):
            old_mean = sum(r.factors[factor_idx] for r in old_pool) / len(old_pool)
            new_mean = sum(r.factors[factor_idx] for r in new_pool) / len(new_pool)
            if old_mean > 0:
                deviations.append(abs(new_mean - old_mean) / old_mean)
        avg_deviation = sum(deviations) / len(deviations) if deviations else 0
        if avg_deviation < self.MIN_DEVIATION_PROBABILITY:
            return CheckResult(True, f"Deviation {avg_deviation:.2%} OK", "ok")
        else:
            return CheckResult(False, f"Deviation {avg_deviation:.2%} exceeds threshold", "warning")

class RelPercentChecker(IPoolChangeChecker):
    """Checks relevance percentage distribution"""
    MIN_PERCENT_PROBABILITY = 0.05

    def get_name(self) -> str:
        return "Relevance Percent Check"

    def check(self, old_pool, new_pool, old_reports=None) -> CheckResult:
        if not old_pool:
            return CheckResult(True, "No old pool data", "ok")
        old_percent = sum(1 for r in old_pool if r.relevance >= 0.7) / len(old_pool)
        new_percent = sum(1 for r in new_pool if r.relevance >= 0.7) / len(new_pool) if new_pool else 0
        change = abs(new_percent - old_percent)
        if change < self.MIN_PERCENT_PROBABILITY:
            return CheckResult(True, f"Distribution stable: {new_percent:.2%}", "ok")
        else:
            return CheckResult(False, f"Distribution changed: {new_percent:.2%} vs {old_percent:.2%}", "warning")

class PoolQualityChecker:
    """Combines multiple checkers"""

    def __init__(self):
        self.checkers = [
            QueryRelUrlLossChecker(),
            FactorsDistributionChecker(),
            RelPercentChecker(),
        ]

    def check(self, old_pool, new_pool, old_reports=None):
        results = []
        for checker in self.checkers:
            result = checker.check(old_pool, new_pool, old_reports)
            results.append(result)
            print(f"[{checker.get_name()}] {result.severity}: {result.message}")
        return results

# Demo
old_pool = [
    PoolRecord("python", 0.9, "url1", [0.8, 0.7, 0.9]),
    PoolRecord("python", 0.8, "url2", [0.7, 0.6, 0.8]),
    PoolRecord("python", 0.7, "url3", [0.6, 0.5, 0.7]),
]
new_pool = [
    PoolRecord("python", 0.9, "url1", [0.8, 0.7, 0.9]),
    PoolRecord("python", 0.8, "url2", [0.7, 0.6, 0.8]),
    # url3 lost
]
checker = PoolQualityChecker()
checker.check(old_pool, new_pool)
Run output:
[Query-Rel-Url Loss Check] error: Loss ratio 33.33% critical!
[Factor Distribution Check] warning: Deviation 7.24% exceeds threshold
[Relevance Percent Check] ok: Distribution stable: 100.00%
Note that losing one of three records trips the loss checker, and the survivors' higher factor means trip the distribution checker; the high-relevance share stays at 100%, so the percent checker passes.
When to Use Multi-Dimensional Quality Checks
Good fits:
- Systems, such as recommenders, that require evaluation along several dimensions
- Guaranteeing no quality regression across algorithm iterations
- Settings where historical data is available for comparison
Poor fits:
- Simple scenarios with a single metric
- Scenarios with extreme real-time requirements
- The cold-start phase
Summary
Quality assurance in an industrial recommendation system is full of trade-offs:
- Interface abstraction vs. code complexity: standardization eases extension
- Probability thresholds vs. hard thresholds: flexibility vs. simplicity
- Historical comparison vs. real-time computation: accuracy vs. performance
In Python we can express a similar design more concisely, but the core trade-offs are the same: there is no perfect scheme, only the choice that fits the scenario.