总体分析

两阶段推荐系统架构

1
召回层 (Recall) → 排序层 (Ranking) → 重排层 (Reranking)

召回策略

  • ItemCF召回:基于物品相似度的协同过滤
    • 相似度计算:改进的余弦相似度 + 时间衰减
    • 优点:个性化强,能发现长尾物品
  • 热度召回:基于时间窗口的热门推荐
    • 24小时时间窗口过滤
    • 优点:实时性强,覆盖热门内容

排序模型

  • 模型类型:LightGBM排序模型(LGBMRanker)
  • 训练方式:Pairwise排序学习
  • 特征工程
    • 用户特征:设备、地区、OS等
    • 物品特征:类别、字数、创建时间
    • 交叉特征:时间差等

技术亮点

  • 负采样策略解决类别不平衡
  • 时间衰减因子提高推荐实时性
  • MRR指标评估推荐质量
  • 多路召回融合提高覆盖率

评估指标

  • MRR(Mean Reciprocal Rank):衡量推荐排名的质量
  • 召回率:衡量召回阶段的效果
  • 特征重要性:分析模型决策依据

代码解读

导入库和基础设置

实现功能:

  1. 导入库:包括系统库和数据分析库(pandas,numpy),还有机器学习库
  2. 忽略所有警告(需要import warnings)
  3. 设置路径
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 导入系统库
import collections
import gc
import math
import os
import pickle
import random
import sys
import time
import warnings
from collections import defaultdict
from datetime import datetime
from operator import itemgetter

# 导入数据科学和机器学习库
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tqdm import tqdm # 进度条库

warnings.filterwarnings('ignore') # 忽略所有警告信息

# 设置路径
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) # 获取项目根目录
data_path = os.path.join(BASE_DIR, 'data/') # 数据路径
save_path = os.path.join(BASE_DIR, 'output/') # 输出路径

数据加载

1
2
3
4
5
6
7
# 加载训练集和测试集A,并合并
train_click = pd.read_csv(data_path + 'train_click_log.csv')
testA_click = pd.read_csv(data_path + 'testA_click_log.csv')
train_click = pd.concat([train_click, testA_click]) # 将测试集A用于训练

test_click = pd.read_csv(data_path + 'testB_click_log.csv') # 真正的测试集
articles = pd.read_csv(data_path + 'articles.csv') # 文章元数据

train_click:train的20w用户训练集+testA的5w用户测试集(总共为25w用户训练)

test_click:testB的5w用户测试集(用于真正的测试)

articles:还没弄懂干什么……

基础工具函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def get_all_click_df(train=True, test=True):
"""
获取全量点击数据,并进行去重处理

参数:
train: 是否包含训练集
test: 是否包含测试集

返回:
去重后的完整点击数据
"""
if train:
all_click = train_click.copy()
if test:
all_click = test_click.copy()
if train and test:
trn_click = train_click.copy()
tst_click = test_click.copy()
all_click = trn_click.append(tst_click) # 合并所有数据

# 基于用户、文章和时间戳进行去重
all_click = all_click.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))
return all_click

ItemCF协同过滤召回

其中包含

  • def get_past_click()

  • def get_user_item_time(click_df)

    • def make_item_time_pair(df)
  • def itemcf_sim(df)

  • def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def itemcf_recall(topk=10):
"""
基于物品的协同过滤召回算法

核心思想: 如果用户喜欢物品A,也可能会喜欢与A相似的物品B

参数:
topk: 每个用户召回的物品数量
"""
ts = time.time() # 记录开始时间

def get_past_click():
"""
分离用户的历史点击和最后一次点击

用于构建训练集和验证集
训练时使用历史点击预测最后一次点击
"""
train = train_click.sort_values(['user_id', 'click_timestamp']).reset_index().copy()
list1 = [] # 存储用户最后一次点击
train_indexs = [] # 存储最后一次点击的索引

# 遍历训练集用户
for user_id in tqdm(train['user_id'].unique()):
user = train[train['user_id'] == user_id]
row = user.tail(1) # 获取最后一次点击
train_indexs.append(row.index.values[0])

# 过滤掉只点击了一次的用户(测试集A中的噪声用户)
if len(user) >= 2:
list1.append(row.values.tolist()[0])

# 创建训练集最后一次点击DataFrame
train_last_click = pd.DataFrame(list1, columns=['index', 'user_id', 'article_id', 'click_timestamp',
'click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type'])
train_last_click = train_last_click.drop(columns=['index'])

# 获取训练集历史点击(排除最后一次)
train_past_clicks = train[~train.index.isin(train_indexs)]
train_past_clicks = train_past_clicks.drop(columns=['index'])

# 处理测试集
test = test_click.sort_values(['user_id', 'click_timestamp']).reset_index().copy()
list2 = []
for user_id in tqdm(test['user_id'].unique()):
user = test[test['user_id'] == user_id]
row = user.tail(1) # 获取最后一次点击
list2.append(row.values.tolist()[0])

test_last_click = pd.DataFrame(list2, columns=['index', 'user_id', 'article_id', 'click_timestamp',
'click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type'])
test_last_click = test_last_click.drop(columns=['index'])

# 合并所有历史点击数据(用于计算相似度)
all_click_df = pd.concat([train_past_clicks, test_click])
all_click_df = all_click_df.reset_index().drop(columns=['index'])
all_click_df = all_click_df.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

return all_click_df, train_past_clicks, train_last_click, test_last_click

def get_user_item_time(click_df):
"""
构建用户-物品-时间的字典结构

返回格式: {user1: [(item1, time1), (item2, time2)...], ...}

参数:
click_df: 点击数据DataFrame
"""
click_df = click_df.sort_values('click_timestamp')

def make_item_time_pair(df):
"""将用户的点击记录转换为(物品ID, 时间戳)元组列表"""
return list(zip(df['click_article_id'], df['click_timestamp']))

# 按用户分组,构建物品-时间列表
user_item_time_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']] \
.apply(lambda x: make_item_time_pair(x)) \
.reset_index().rename(columns={0: 'item_time_list'})

# 转换为字典
user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))

return user_item_time_dict

def itemcf_sim(df):
"""
计算物品间的协同过滤相似度

使用改进的相似度计算公式:
1. 考虑时间衰减:1/log(len(item_list)+1)
2. 归一化:余弦相似度
"""
user_item_time_dict = get_user_item_time(df)

# 初始化相似度矩阵
i2i_sim = {}
item_cnt = defaultdict(int) # 记录每个物品的点击次数

# 遍历所有用户的点击历史
for user, item_time_list in tqdm(user_item_time_dict.items()):
# 遍历用户点击的每个物品
for i, i_click_time in item_time_list:
item_cnt[i] += 1
i2i_sim.setdefault(i, {})

# 计算物品i与其他物品的相似度
for j, j_click_time in item_time_list:
if(i == j):
continue
i2i_sim[i].setdefault(j, 0)

# 相似度累加,加入时间衰减因子
i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

# 归一化相似度(余弦相似度)
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

# 保存相似度矩阵到本地
pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))

return i2i_sim_


def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num):
"""
基于物品相似度进行推荐

步骤:
1. 获取用户历史点击的物品
2. 对每个历史物品,获取其相似物品
3. 累加相似度得分
4. 排序并返回topN

参数:
user_id: 用户ID
user_item_time_dict: 用户-物品-时间字典
i2i_sim: 物品相似度矩阵
sim_item_topk: 每个物品考虑的相似物品数
recall_item_num: 召回物品数量
"""
# 获取用户历史点击的物品
user_hist_items = user_item_time_dict[user_id]
user_hist_items_ = {user_id for user_id, _ in user_hist_items} # 转换为集合便于查询

item_rank = {}
# 遍历用户的历史点击物品
for loc, (i, click_time) in enumerate(user_hist_items):
# 获取物品i的topk相似物品
for j, wij in i2i_sim[i][:sim_item_topk]:
if j in user_hist_items_: # 排除用户已经点击过的物品
continue
item_rank.setdefault(j, 0)
item_rank[j] += wij # 累加相似度得分

# 按得分排序,取前recall_item_num个
item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]

return item_rank

# 获取处理后的数据
all_click_df, train_past_clicks, train_last_click, test_last_click = get_past_click()

# 计算物品相似度矩阵
i2i_sim = itemcf_sim(all_click_df)

# 开始召回流程
user_recall_items_dict = collections.defaultdict(dict) # 存储召回结果

# 获取用户-物品-时间字典
user_item_time_dict = get_user_item_time(all_click_df)

# 加载相似度矩阵
i2i_sim = pickle.load(open(save_path + 'itemcf_i2i_sim.pkl', 'rb'))

# 对每个物品的相似度列表进行排序
for i in tqdm(i2i_sim.keys()):
i2i_sim[i] = sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)

# 设置参数
sim_item_topk = topk # 每个物品考虑的相似物品数
recall_item_num = topk # 召回数量

# 为每个用户生成召回结果
for user in tqdm(all_click_df['user_id'].unique()):
user_recall_items_dict[user] = item_based_recommend(user, user_item_time_dict, i2i_sim,
sim_item_topk, recall_item_num)

# 将召回结果转换为DataFrame格式
user_item_score_list = []
for user, items in tqdm(user_recall_items_dict.items()):
for item, score in items:
user_item_score_list.append([user, item, score])

recall_df = pd.DataFrame(user_item_score_list, columns=['user_id', 'click_article_id', 'pred_score'])
recall_df.to_csv(save_path + 'recall_df.csv', index=False) # 保存召回结果

# 分离训练集和测试集的召回结果
tst_recall = recall_df[recall_df['user_id'].isin(test_last_click['user_id'].unique())]
train_recall = recall_df[recall_df['user_id'].isin(train_last_click['user_id'].unique())]

# 处理测试集召回
test_recall = tst_recall.copy()
test_recall = test_recall.sort_values(by=['user_id', 'pred_score'])
test_recall = test_recall.drop(columns=['pred_score']) # 移除预测分数
test_recall.to_csv(save_path + 'itemcf_test_recall.csv', index=False)

# 处理训练集召回
train_recall.to_csv(save_path + 'itemcf_train_recall.csv', index=False)

print('Itemcf Recall Finished! Cost time: {}'.format(time.time() - ts))
return train_past_clicks, train_last_click, test_last_click

第182~183行:对每个物品的相似度列表进行排序

1
2
for i in tqdm(i2i_sim.keys()):  
i2i_sim[i] = sorted(i2i_sim[i].items(), key=lambda x: x[1], reverse=True)
  1. .items()

    • 字典方法,返回一个键值对视图(可迭代对象),每个元素是 (key, value) 元组。
  2. sorted(..., key=lambda x: x[1], reverse=True)

    • sorted():Python 内置函数,对可迭代对象排序,返回新列表

    • key=lambda x: x[1]

      • lambda x: x[1] 是一个匿名函数,表示“取每个元组的第 1 个元素(即索引为 1 的值)”。
      • 因为 x(item_j, sim_score),所以 x[1] 就是相似度得分
      • 排序将基于相似度数值大小进行。
    • reverse=True

      • 表示降序排序(从高到低),即最相似的排在前面。

第204行:分离训练集和测试集的召回结果

1
tst_recall = recall_df[recall_df['user_id'].isin(test_last_click['user_id'].unique())]
  1. isin() 是 pandas 的方法,用于判断 recall_df 中每个 user_id 是否出现在给定的集合中。
    • 返回一个 布尔 Series(True/False),长度与 recall_df 相同。
  2. recall_df[...]
    • 使用布尔索引,只保留 user_id 属于测试集用户的那些行
  3. ✅ 结果 tst_recall:只包含在测试集中出现过的用户的召回结果。

分离用户历史点击和最后一次点击

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def get_past_click():
"""
分离用户的历史点击和最后一次点击

用于构建训练集和验证集
训练时使用历史点击预测最后一次点击
"""
train = train_click.sort_values(['user_id', 'click_timestamp']).reset_index().copy()
list1 = [] # 存储用户最后一次点击
train_indexs = [] # 存储最后一次点击的索引

# 遍历训练集用户
for user_id in tqdm(train['user_id'].unique()):
user = train[train['user_id'] == user_id]
row = user.tail(1) # 获取最后一次点击
train_indexs.append(row.index.values[0])

# 过滤掉只点击了一次的用户(测试集A中的噪声用户)
if len(user) >= 2:
list1.append(row.values.tolist()[0])

# 创建训练集最后一次点击DataFrame
train_last_click = pd.DataFrame(list1, columns=['index', 'user_id', 'article_id', 'click_timestamp',
'click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type'])
train_last_click = train_last_click.drop(columns=['index'])

# 获取训练集历史点击(排除最后一次)
train_past_clicks = train[~train.index.isin(train_indexs)]
train_past_clicks = train_past_clicks.drop(columns=['index'])

# 处理测试集
test = test_click.sort_values(['user_id', 'click_timestamp']).reset_index().copy()
list2 = []
for user_id in tqdm(test['user_id'].unique()):
user = test[test['user_id'] == user_id]
row = user.tail(1) # 获取最后一次点击
list2.append(row.values.tolist()[0])

test_last_click = pd.DataFrame(list2, columns=['index', 'user_id', 'article_id', 'click_timestamp',
'click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type'])
test_last_click = test_last_click.drop(columns=['index'])

# 合并所有历史点击数据(用于计算相似度)
all_click_df = pd.concat([train_past_clicks, test_click])
all_click_df = all_click_df.reset_index().drop(columns=['index'])
all_click_df = all_click_df.drop_duplicates((['user_id', 'click_article_id', 'click_timestamp']))

return all_click_df, train_past_clicks, train_last_click, test_last_click

第8行:train = train_click.sort_values([‘user_id’, ‘click_timestamp’]).reset_index().copy()

  1. .sort_values([‘user_id’, ‘click_timestamp’])

    • 首先按 user_id排序,然后在每个用户内部按 click_timestamp排序

    • 这样可以将每个用户的数据按时间顺序排列

  2. .reset_index()
    • 重置 DataFrame 的索引为默认的整数索引(0, 1, 2, …)
    • 除非使用 reset_index(drop=False),否则会丢弃旧索引
    • 排序后使用这个功能可以获得干净的索引
  3. .copy()
    • 创建 DataFrame 的深拷贝
    • 确保对 train的修改不会影响原始的 train_clickDataFrame
    • 防止 pandas 中的 SettingWithCopyWarning 警告

第13~20行:遍历训练集用户

1
2
3
4
5
6
7
8
for user_id in tqdm(train['user_id'].unique()):
user = train[train['user_id'] == user_id]
row = user.tail(1) # 获取最后一次点击
train_indexs.append(row.index.values[0])

# 过滤掉只点击了一次的用户(测试集A中的噪声用户)
if len(user) >= 2:
list1.append(row.values.tolist()[0])

user = train[train[‘user_id’] == user_id]

功能:筛选出当前用户的所有行

优缺点:这个写法虽然直观,但效率可能不高,特别是当数据量大时。

改进方法:可以使用groupby + 聚合等

1
2
3
4
5
6
7
8
9
10
11
12
# 方法1:使用 groupby + 聚合(推荐)
last_clicks = train.groupby('user_id').tail(1)
train_indexs = last_clicks.index.tolist()

# 方法2:使用 groupby + idxmax(如果按时间排序)
# 假设已经按时间排序
last_indices = train.groupby('user_id')['click_timestamp'].idxmax()
train_indexs = last_indices.tolist()

# 方法3:使用 drop_duplicates(如果已排序)
last_clicks = train.drop_duplicates(subset=['user_id'], keep='last')
train_indexs = last_clicks.index.tolist()

list1.append(row.values.tolist()[0])

  1. row.values - 将 pandas Series(行)转换为 numpy 数组
  2. tolist() - 将 numpy 数组转换为 Python 列表,例如:[[1, 101, 1609459200]]
  3. [0] - 获取列表的第一个元素(也就是唯一的那一行)

第29行:train_past_clicks = train[~train.index.isin(train_indexs)]

  1. train.index.isin(train_indexs)

    • 检查 trainDataFrame 的索引是否在 train_indexs列表中

    • 返回一个布尔序列,True 表示索引在列表中,False 表示不在

  2. ~符号

    • 取反操作符(NOT)
    • 将 True 变为 False,False 变为 True
    • 相当于 “不在列表中”
  3. train[…]

    • 使用布尔序列筛选 DataFrame
    • 只选择布尔值为 True 的行

第30行:all_click_df = all_click_df.reset_index().drop(columns=[‘index’])

  1. .reset_index()
    • 将 DataFrame 的当前索引重置为默认的整数索引(0, 1, 2, …)
    • 原来的索引会变成新的列,默认列名为 ‘index’
  2. .drop(columns=[‘index’]):
    • 删除名为 ‘index’ 的列
    • 也就是删除刚刚从索引转换来的那列
  3. 将结果重新赋值给 all_click_df

第38行:all_click_df = all_click_df.drop_duplicates(([‘user_id’, ‘click_article_id’, ‘click_timestamp’]))

  1. subset=[‘user_id’, ‘click_article_id’, ‘click_timestamp’]
    • 指定判断重复行的列组合
    • 只有这三列的值完全相同的行才会被视为重复
    • 其他列的值可以不同
  2. drop_duplicates()默认行为
    • 保留第一个出现的重复行
    • 删除后续的重复行
    • 默认对所有列进行比较(如果不指定 subset

根据点击时间获取用户的点击文章序列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def get_user_item_time(click_df):
"""
构建用户-物品-时间的字典结构

返回格式: {user1: [(item1, time1), (item2, time2)...], ...}

参数:
click_df: 点击数据DataFrame
"""
click_df = click_df.sort_values('click_timestamp')

def make_item_time_pair(df):
"""将用户的点击记录转换为(物品ID, 时间戳)元组列表"""
return list(zip(df['click_article_id'], df['click_timestamp']))

# 按用户分组,构建物品-时间列表
user_item_time_df = click_df.groupby('user_id')[['click_article_id', 'click_timestamp']] \
.apply(lambda x: make_item_time_pair(x)) \
.reset_index().rename(columns={0: 'item_time_list'})

# 转换为字典
user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))

return user_item_time_dict

第14行:list(zip(df[‘click_article_id’], df[‘click_timestamp’]))

  1. zip(...)
    • Python 内置函数,将两个可迭代对象“配对”。
    • 例如:zip([101, 102], [1609, 1610]) → 生成 (101, 1609), (102, 1610)
  2. list(...)
    • 将 zip 对象转为实际的列表(因为 zip 返回的是迭代器)。

第17~19行:user_item_time_df = click_df.groupby(‘user_id’)[[‘click_article_id’, ‘click_timestamp’]] \
.apply(lambda x: make_item_time_pair(x)) \
.reset_index().rename(columns={0: ‘item_time_list’})

  1. click_df.groupby('user_id')

    • groupby('user_id')
      • pandas 的核心方法,user_id 分组,将同一个用户的点击记录归到一起。
      • 返回一个 GroupBy 对象。
  2. [['click_article_id', 'click_timestamp']]

    • 选择分组后每个组中只保留这两列,减少后续处理的数据量。
  3. .apply(lambda x: make_item_time_pair(x))
    • .apply(func)
      • 对每个分组(即每个用户的子 DataFrame)应用自定义函数
      • 这里的 lambda x: make_item_time_pair(x) 等价于直接传 make_item_time_pair(可简写为 .apply(make_item_time_pair))。
      • 每个用户的结果是一个 [(item, time), ...] 列表。
  4. .reset_index()
    • user_id 从索引变回普通列,方便后续操作。
  5. .rename(columns={0: 'item_time_list'})
    • 将默认列名 0(因为 apply 返回的是匿名列)重命名为 'item_time_list'

第22行:user_item_time_dict = dict(zip(user_item_time_df[‘user_id’], user_item_time_df[‘item_time_list’]))

  1. zip(...)
    • 将两列(user_iditem_time_list)一一配对。
  2. dict(...)
    • 将配对结果转为字典。
    • 例如:{ 1: [(101, 1609), (102, 1610)], 2: [(105, 1620)], … }

计算物品相似度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def itemcf_sim(df):
"""
计算物品间的协同过滤相似度

使用改进的相似度计算公式:
1. 考虑时间衰减:1/log(len(item_list)+1)
2. 归一化:余弦相似度
"""
user_item_time_dict = get_user_item_time(df)

# 初始化相似度矩阵
i2i_sim = {}
item_cnt = defaultdict(int) # 记录每个物品的点击次数

# 遍历所有用户的点击历史
for user, item_time_list in tqdm(user_item_time_dict.items()):
# 遍历用户点击的每个物品
for i, i_click_time in item_time_list:
item_cnt[i] += 1
i2i_sim.setdefault(i, {})

# 计算物品i与其他物品的相似度
for j, j_click_time in item_time_list:
if(i == j):
continue
i2i_sim[i].setdefault(j, 0)

# 相似度累加,加入时间衰减因子
i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

# 归一化相似度(余弦相似度)
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])

# 保存相似度矩阵到本地
pickle.dump(i2i_sim_, open(save_path + 'itemcf_i2i_sim.pkl', 'wb'))

return i2i_sim_

第20行:i2i_sim.setdefault(i, {})

  • 如果 i 不在 i2i_sim 中,则初始化为一个空字典 {}

  • 等价于:

    1
    2
    if i not in i2i_sim:
    i2i_sim[i] = {}

第26行:i2i_sim[i].setdefault(j, 0)

  • 确保 i2i_sim[i][j] 存在,初始为 0。

第29行: i2i_sim[i][j] += 1 / math.log(len(item_time_list) + 1)

  • len(item_time_list):该用户总共点击了多少个物品(即行为序列长度)。
  • 为什么叫“时间衰减”?
    • 实际上这里不是基于真实时间差,而是基于用户活跃度(点击越多,说明越“泛”,其共现信息越不可靠)。
    • 这是一种启发式惩罚:如果一个用户点击了 100 个物品,那么任意两个物品的共现可能只是偶然;而如果只点了 2 个,共现更可能是强关联。
    • 因此,用 1 / log(N + 1) 作为共现权重,N 越大,权重越小。
    • +1 是为了避免 log(0)log(1)=0 导致除零错误。

✅ 举例:

  • 用户 A 点了 2 个物品 → 权重 = 1 / log(2+1) ≈ 1 / 1.1 ≈ 0.91
  • 用户 B 点了 100 个物品 → 权重 = 1 / log(101) ≈ 1 / 4.6 ≈ 0.22 → 同样的共现,在活跃用户那里贡献更小。

第32~35行:归一化(余弦相似度)

1
2
3
4
i2i_sim_ = i2i_sim.copy()
for i, related_items in i2i_sim.items():
for j, wij in related_items.items():
i2i_sim_[i][j] = wij / math.sqrt(item_cnt[i] * item_cnt[j])
  • 目的:将原始共现得分转换为余弦相似度

  • 标准 ItemCF 余弦相似度公式

    其中:

    • N(i)∩N(j)∣:同时点击过 i 和 j 的用户数(这里是加权后的值 wij
    • N(i)∣:点击过 i 的用户总数(即 item_cnt[i]
  • 这里做了什么?

    • 分子 wij:是加权共现次数(考虑了用户活跃度)
    • 分母:item_cnt[i]×item_cnt[j]
    • 结果是一个 0~1 之间的相似度分数,可比性更强。

⚠️ 注意:严格来说,分母也应使用加权后的物品流行度,但此处简化为原始点击次数,是常见近似做法。

第38行:pickle.dump(i2isim, open(save_path + ‘itemcf_i2i_sim.pkl’, ‘wb’))

  • pickle.dump()
    • Python 内置模块 pickle 用于序列化对象(把 Python 字典保存到磁盘)。
    • 'wb':以二进制写模式打开文件。
  • save_path
    • 应该是函数外部定义的路径变量(代码中未展示,需确保已定义)。
    • 例如:save_path = './model/'

基于商品的召回i2i

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num):
"""
基于物品相似度进行推荐

步骤:
1. 获取用户历史点击的物品
2. 对每个历史物品,获取其相似物品
3. 累加相似度得分
4. 排序并返回topN

参数:
user_id: 用户ID
user_item_time_dict: 用户-物品-时间字典
i2i_sim: 物品相似度矩阵
sim_item_topk: 每个物品考虑的相似物品数
recall_item_num: 召回物品数量
"""
# 获取用户历史点击的物品
user_hist_items = user_item_time_dict[user_id]
#user_hist_items_ = {user_id for user_id, _ in user_hist_items} # 转换为集合便于查询 user_id命名错误
user_hist_items_ = {item_id for item_id, _ in user_hist_items}

item_rank = {}
# 遍历用户的历史点击物品
for loc, (i, click_time) in enumerate(user_hist_items):
# 获取物品i的topk相似物品
for j, wij in i2i_sim[i][:sim_item_topk]:
if j in user_hist_items_: # 排除用户已经点击过的物品
continue
item_rank.setdefault(j, 0)
item_rank[j] += wij # 累加相似度得分 可能有用户历史中的多个物品都和某个物品相似

# 按得分排序,取前recall_item_num个
item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]

return item_rank

第20/21行:userhist_items = {userid for user_id, in user_hist_items}

  • 作用:
    • 将用户历史点击的 物品 ID 提取出来,存入一个 set
    • 使用 set 是为了O(1) 快速判断某个物品是否已被点击过,避免重复推荐。
  • ✅ 示例:user_hist_items_ = {101, 105, 110}

热度召回

其中包括

def getitem_topk_click(hot_articles, hot_articles_dict, click_time, past_click_articles, k)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def hot_recall(topk=10, train_past_clicks=None, test_last_click=None):
"""
基于热度的召回策略

核心思想: 推荐在用户点击时间附近最热门的文章
创新点: 考虑24小时时间窗口,避免推荐过时内容
"""
ts = time.time()

# 获取训练集和测试集点击数据
train_click_df = get_all_click_df(test=False)
test_click_df = get_all_click_df(train=False)

# 获取ItemCF召回结果(用于获取最后一次点击时间)
train_past_clicks, _, test_last_click = itemcf_recall()

# 按时间排序
train_click_df = train_click_df.sort_values(['user_id', 'click_timestamp'])
test_click_df = test_click_df.sort_values(['user_id', 'click_timestamp'])

# 合并文章元数据(将文章元信息“挂载”到每条点击记录上)
articles_copy = articles.rename(columns={'article_id': 'click_article_id'})
train_click_df = train_click_df.merge(articles_copy, on='click_article_id', how='left')
test_click_df = test_click_df.merge(articles_copy, on='click_article_id', how='left')

# 获取用户最后一次点击时间
train_last_click = train_past_clicks.groupby('user_id').agg({'click_timestamp': 'max'}).reset_index()
train_last_click_time = train_last_click.set_index('user_id')['click_timestamp'].to_dict()
test_last_click_time = test_last_click.set_index('user_id')['click_timestamp'].to_dict()

def get_item_topk_click_(hot_articles, hot_articles_dict, click_time, past_click_articles, k):
"""
获取用户的topk热门文章,考虑时间窗口

过滤条件:
1. 排除用户已点击过的文章
2. 文章创建时间在用户点击时间±24小时内
"""
topk_click = []
min_time = click_time - 24 * 60 * 60 * 1000 # 24小时前(毫秒)
max_time = click_time + 24 * 60 * 60 * 1000 # 24小时后(毫秒)

for article_id in hot_articles['article_id'].unique():
if article_id in past_click_articles: # 排除已点击
continue
if not min_time <= hot_articles_dict[article_id] <= max_time: # 时间窗口检查
continue
topk_click.append(article_id)
if len(topk_click) == k: # 达到指定数量
break
return topk_click

# 计算训练集热门文章(按点击次数排序)
train_hot_articles = pd.DataFrame(train_click_df['click_article_id'].value_counts().index.to_list(),
columns=['article_id'])
train_hot_articles = train_hot_articles.merge(articles).drop(columns=['category_id', 'words_count'])
train_hot_articles_dict = train_hot_articles.set_index('article_id')['created_at_ts'].to_dict()

# 计算测试集热门文章
test_hot_articles = pd.DataFrame(test_click_df['click_article_id'].value_counts().index.to_list(),
columns=['article_id'])
test_hot_articles = test_hot_articles.merge(articles).drop(columns=['category_id', 'words_count'])
test_hot_articles_dict = test_hot_articles.set_index('article_id')['created_at_ts'].to_dict()

# 为训练集用户生成热门召回
train_list = []
for user_id in tqdm(train_past_clicks['user_id'].unique()):
user = train_past_clicks.loc[train_past_clicks['user_id'] == user_id] # 可以换成groupby 执行更快
click_time = train_last_click_time[user_id] # 用户最后一次点击时间
past_click_articles = user['click_article_id'].values # 用户历史点击
item_topk_click = get_item_topk_click_(train_hot_articles, train_hot_articles_dict,
click_time, past_click_articles, k=topk)
for id in item_topk_click:
rows = [user_id, id]
train_list.append(rows)

hot_train_recall = pd.DataFrame(train_list, columns=['user_id', 'article_id'])
hot_train_recall.to_csv(save_path + 'hot_train_recall.csv', index=False)

# 为测试集用户生成热门召回
test_list = []
for user_id in tqdm(test_click_df['user_id'].unique()):
user = test_click_df.loc[test_click_df['user_id'] == user_id]
click_time = test_last_click_time[user_id]
past_click_articles = user['click_article_id'].values
item_topk_click = get_item_topk_click_(test_hot_articles, test_hot_articles_dict,
click_time, past_click_articles, k=topk)
for id in item_topk_click:
rows = [user_id, id]
test_list.append(rows)

hot_test_recall = pd.DataFrame(test_list, columns=['user_id', 'article_id'])
hot_test_recall.to_csv(save_path + 'hot_test_recall.csv', index=False)

print('Hot Recall Finished! Cost time: {}'.format(time.time() - ts))

第22行:articles_copy = articles.rename(columns={‘article_id’: ‘click_article_id’})

  1. 作用:重命名列以对齐键名
  2. rename(...) 的作用
    • articles 中的 'article_id' 列重命名为 'click_article_id'
    • 目的:让两个表在后续 merge 时有相同的连接键(join key)

第23/24行:train_click_df = train_click_df.merge(articles_copy, on=’click_article_id’, how=’left’)

  1. merge(...) 是 pandas 的核心函数,用于表连接(类似 SQL JOIN)
    • on='click_article_id'
      • 指定连接键:两个表都必须有这一列。
      • 现在 train_click_dfarticles_copy 都有 click_article_id,可以匹配。
    • how='left'
      • 表示左连接(left join)
        • 保留 train_click_df 中的所有行;
        • 如果某 click_article_idarticles_copy 中找不到,则对应的文章字段填 NaN
        • 不会丢掉任何点击记录。

第27行:train_last_click = train_past_clicks.groupby(‘user_id’).agg({‘click_timestamp’: ‘max’}).reset_index()

  1. groupby('user_id')
    • 将数据按 user_id 分组,每个用户的所有点击记录被归到一组。
  2. .agg({'click_timestamp': 'max'})
    • 对每个分组,对 'click_timestamp' 列应用聚合函数 'max'(取最大值)。
    • 因为时间戳越大表示越晚,所以 max(click_timestamp) 就是该用户最后一次点击的时间
  3. .reset_index()
    • user_id索引转回普通列

第28行:train_last_click_time = train_last_click.set_index(‘user_id’)[‘click_timestamp’].to_dict()

  1. .set_index('user_id')

    • user_id 列设为 DataFrame 的行索引
  2. ['click_timestamp']

    • 选取 click_timestamp 这一列,得到一个 pandas Series,其索引是 user_id,值是时间戳。
  3. .to_dict()

    • 将 Series 转换为 Python 原生字典:

      1
      2
      3
      4
      5
      {
      1: 1609466400,
      2: 1609470000,
      ...
      }

第54~55行:

1
2
3
4
train_hot_articles = pd.DataFrame(
train_click_df['click_article_id'].value_counts().index.to_list(),
columns=['article_id']
)
  1. train_click_df['click_article_id']

    • 从训练点击日志 DataFrame 中取出 'click_article_id' 列(Series)。

    • 示例:

      1
      [101, 105, 101, 110, 105, 101, ...]
  2. .value_counts()

    • pandas 方法:统计每个唯一值出现的次数,并自动按频次降序排列
    • 返回一个 Series,索引是 article_id,值是点击次数。
  3. .index

    • 获取上述 Series 的索引,即文章 ID 列表(已按点击频次从高到低排序)。

    • 示例:

      1
      Int64Index([101, 105, 110], dtype='int64')
  4. .to_list()

    • 将 Index 转换为 Python 原生列表:[101, 105, 110]
  5. pd.DataFrame(..., columns=['article_id'])

    • 用这个列表创建一个新的 DataFrame,列名为 'article_id'

第56行:train_hot_articles = train_hot_articles.merge(articles).drop(columns=[‘category_id’, ‘words_count’])

  1. train_hot_articles.merge(articles)
    • merge() 是 pandas 的表连接函数(类似 SQL 的 JOIN)。
    • 默认行为:
      • 自动根据两个表中同名的列进行连接(这里就是 'article_id')。
      • 默认是 inner join(只保留两个表都存在的 article_id)。
    • 等价于:train_hot_articles.merge(articles, on=’article_id’, how=’inner’)

召回结果整合

def get_test_recall(itemcf=False, hot=False)

def get_train_recall(itemcf=False, hot=False, train_last_click=None)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
def get_test_recall(itemcf=False, hot=False):
"""
整合测试集的召回结果

支持多种召回策略的结果合并
"""
if itemcf:
itemcf_test_recall = pd.read_csv(save_path + 'itemcf_test_recall.csv')
itemcf_test_recall = itemcf_test_recall.rename(columns={'click_article_id': 'article_id'})

if hot:
hot_test_recall = pd.read_csv(save_path + 'hot_test_recall.csv')

if itemcf:
test = itemcf_test_recall.copy()
if hot:
test = pd.concat([test, hot_test_recall]) # 合并不同召回策略的结果

test_recall = test.drop_duplicates(['user_id', 'article_id']) # 去重
test_recall.to_csv(save_path + 'test_recall.csv', index=False)
print('Test Recall Finished!')
return test_recall

def get_train_recall(itemcf=False, hot=False, train_last_click=None):
"""
整合训练集的召回结果,并生成标签

标签定义:
- 正样本(1): 用户最后一次点击的文章
- 负样本(0): 召回但用户未点击的文章
"""
if itemcf:
itemcf_train_recall = pd.read_csv(save_path + 'itemcf_train_recall.csv')
itemcf_train_recall = itemcf_train_recall.rename(columns={'click_article_id': 'article_id'})
# 标记正样本
itemcf_train_recall = itemcf_train_recall.merge(train_last_click, on=['user_id', 'article_id'], how='left')
itemcf_train_recall['label'] = itemcf_train_recall['click_timestamp'].apply(
lambda x: 0.0 if np.isnan(x) else 1.0
)
print('Train ItemCF RECALL:{}%'.format(
(itemcf_train_recall['label'].value_counts()[1]) / len(train_last_click['user_id'].unique()) * 100
))

if hot:
hot_train_recall = pd.read_csv(save_path + 'hot_train_recall.csv')
hot_train_recall['label'] = hot_train_recall.merge(train_last_click, on=['user_id', 'article_id'], how='left')['click_timestamp'].apply(
lambda x: 0.0 if np.isnan(x) else 1.0
)
print('Train Hot RECALL:{}%'.format(
(hot_train_recall['label'].value_counts()[1]) / len(train_last_click['user_id'].unique()) * 100
))

if itemcf:
train = itemcf_train_recall.copy()
if hot:
train = pd.concat([train, hot_train_recall]) # 合并召回结果

train = train.drop_duplicates(['user_id', 'article_id']) # 去重
train['pred_score'] = train['pred_score'].fillna(-100) # 填充缺失的预测分数

# 计算总召回率
print('Train Total RECALL:{}%'.format(
(train['label'].value_counts()[1]) / len(train_last_click['user_id'].unique()) * 100
))
print('Train Total Recall Finished!')
train.to_csv(save_path + 'train_recall.csv', index=False)

# 负采样函数
def neg_sample(train=None):
"""
负采样策略

由于正负样本比例极度不平衡(通常1:1000以上),
需要进行负采样以平衡数据集
"""
ts = time.time()

def neg_sample_recall_data(recall_items_df, sample_rate=0.001):
"""
执行负采样

采用两种采样策略:
1. 按用户采样:保证每个用户都有负样本
2. 按物品采样:保证每个物品都被采样
"""
pos_data = recall_items_df[recall_items_df['label'] == 1] # 正样本
neg_data = recall_items_df[recall_items_df['label'] == 0] # 负样本

print('pos_data_num:', len(pos_data), 'neg_data_num:', len(neg_data),
'pos/neg:', len(pos_data)/len(neg_data))

def neg_sample_func(group_df):
"""分组采样函数"""
neg_num = len(group_df)
sample_num = max(int(neg_num * sample_rate), 1) # 最少采1个
sample_num = min(sample_num, 5) # 最多采5个
return group_df.sample(n=sample_num, replace=False)

# 按用户进行负采样
neg_data_user_sample = neg_data.groupby('user_id', group_keys=False).apply(neg_sample_func)
# 按物品进行负采样
neg_data_item_sample = neg_data.groupby('article_id', group_keys=False).apply(neg_sample_func)

# 合并两种采样结果
neg_data_new = pd.concat([neg_data_user_sample, neg_data_item_sample])
# 去重
neg_data_new = neg_data_new.sort_values(['user_id', 'pred_score']) \
.drop_duplicates(['user_id', 'article_id'], keep='last')

# 合并正负样本
data_new = pd.concat([pos_data, neg_data_new], ignore_index=True)

return data_new

train = neg_sample_recall_data(train)
print('Negative Data Sample Finished! Cost time: {}'.format(time.time() - ts))
return train

# 执行负采样
train = neg_sample(train)
return train

第36行:与真实标签左连接(关键步骤!)

1
itemcf_train_recall = itemcf_train_recall.merge(train_last_click, on=['user_id', 'article_id'], how='left')
  1. on=['user_id', 'article_id']

    • 连接条件:同一个用户 + 同一个文章 ID
    • 即:检查召回的 (user, item) 是否出现在 train_last_click(真实最后一次点击)中。
  2. how='left'

    • 保留所有召回结果;
    • 如果某 (user, item) 不在 train_last_click 中,则 click_timestamp 等字段为 NaN
    • 如果,则填充真实的 click_timestamp
  3. 结果示例:

    | user_id | article_id | score | click_timestamp |
    | ———- | ————— | ——- | ————————————————————- |
    | 1 | 205 | 0.9 | 1609470000 ← 被召回且是真实点击(正样本) |
    | 1 | 301 | 0.8 | NaN ← 被召回但不是真实点击(负样本) |
    | 2 | 150 | 0.7 | NaN ← … |

    只有当召回的物品恰好是该用户的最后一次点击时,click_timestamp 才非空。

第37~39行:生成二值标签(label)

1
itemcf_train_recall['label'] = itemcf_train_recall['click_timestamp'].apply(lambda x: 0.0 if np.isnan(x) else 1.0)
  1. np.isnan(x):判断 click_timestamp 是否为 NaN

  2. 逻辑:

    • 如果是 NaN → 说明该召回物品不是用户的真实最后一次点击 → label = 0
    • 如果非 NaN → 说明命中了真实点击label = 1

    ✅ 这样就为每个召回结果打上了 0/1 标签。

第40~42行:计算并打印召回率(Recall)

1
2
3
print('Train ItemCF RECALL:{}%'.format(
(itemcf_train_recall['label'].value_counts()[1]) / len(train_last_click['user_id'].unique()) * 100
))
  1. 分子:itemcf_train_recall['label'].value_counts()[1]

    • 统计 label=1 的数量 → 成功召回的用户数(注意:每个用户最多贡献 1 次命中,因为 train_last_click 每用户只有一个正样本)

    ✅ 假设用户1的最后一次点击是205,只要205出现在他的召回列表中,就算命中1次。

  2. 分母:len(train_last_click['user_id'].unique())

    • train_last_click 中的用户总数 → 总测试用户数
  3. 公式本质:

    📌 这是 Hit Rate @K(或称为 Recall@K)的一种形式,常用于 Top-K 推荐评估。

第59行:train[‘pred_score’] = train[‘pred_score’].fillna(-100) # 填充缺失的预测分数

  • 作用:train DataFrame 中 pred_score 列的所有缺失值(NaN)替换为 -100
  • train['pred_score']:选取 pred_score 列(通常表示模型对某 (user, item) 的打分或相似度)。
  • .fillna(-100):pandas 方法,将该列中所有 NaN(缺失值)替换为 -100
  • 赋值回原列,确保后续操作不会因 NaN 出错。

第100行:neg_data_user_sample = neg_data.groupby(‘user_id’, group_keys=False).apply(neg_sample_func)

  1. 目的:对每个用户的负样本单独进行采样,确保每个用户都有一定数量的负样本被保留下来

    • neg_data(所有负样本)中:

      • user_id 分组
      • 每个用户的负样本子集,调用 neg_sample_func 进行采样(比如最多保留 5 个)
      • 最终得到一个每个用户都包含少量负样本的新 DataFrame

      ✅ 核心思想:避免某些活跃用户“垄断”负样本,也防止不活跃用户完全没有负样本,保证训练时每个用户都能贡献梯度。

  2. .groupby('user_id', group_keys=False)

    • groupby('user_id')

      • neg_datauser_id 分成多个小组。
      • 例如:
        • 用户 1 的负样本 → Group A
        • 用户 2 的负样本 → Group B
    • group_keys=False

      • 作用:禁止在 apply 结果中自动添加分组键(user_id)作为多级索引(MultiIndex)
      • 为什么重要?
        • 默认 group_keys=True 时,apply 返回的 DataFrame 会带有 (user_id, original_index) 的复合索引。
        • 这会导致后续 pd.concat() 或去重时索引混乱。
        • 设为 False 后,返回的是干净的普通 DataFrame,索引是连续的或原始的(取决于 neg_sample_func)。

      最佳实践:在 groupby().apply() 后要合并数据时,通常设 group_keys=False

  3. .apply(neg_sample_func)

    • apply()

      • 每一个分组(即每个用户的负样本子集),调用函数 neg_sample_func

      • 相当于循环:

        1
        2
        3
        4
        5
        results = []
        for user_id, group_df in neg_data.groupby('user_id'):
        sampled = neg_sample_func(group_df)
        results.append(sampled)
        neg_data_user_sample = pd.concat(results, ignore_index=True)
    • neg_sample_func(group_df):回顾定义的负采样函数

      1
      2
      3
      4
      5
      def neg_sample_func(group_df):
      neg_num = len(group_df)
      sample_num = max(int(neg_num * sample_rate), 1) # 至少1个
      sample_num = min(sample_num, 5) # 最多5个
      return group_df.sample(n=sample_num, replace=False)
      • 输入:某个用户的全部负样本(DataFrame)
      • 逻辑:
        • 计算该用户有多少负样本(neg_num
        • 按比例 sample_rate(如 0.001)计算要采多少个
        • 限制在 [1, 5] 范围内
        • 随机无放回抽样 sample_num 个(在 Pandas 的 DataFrame.sample() 方法中,参数 replace 控制采样时是否允许重复(即“放回”还是“不放回”)。)
      • 输出:采样后的子 DataFrame

      📌 举例:

      • 用户 1 有 1000 个负样本 → int(1000 * 0.001) = 1 → 采 1 个
      • 用户 2 有 3 个负样本 → int(3 * 0.001) = 0max(0,1)=1 → 采 1 个
      • 用户 3 有 10000 个 → 10 → 但 min(10,5)=5 → 采 5 个

第107~108行:对合并后的负样本去重

1
2
neg_data_new = neg_data_new.sort_values(['user_id', 'pred_score']) \
.drop_duplicates(['user_id', 'article_id'], keep='last')
  • 去重:同一个 (user, item) 可能在两种采样中都被选中,保留一个(keep='last'pred_score 排序后保留最后一个,即分数较高的?需注意排序逻辑)。
  • .sort_values(['user_id', 'pred_score'])默认是升序排列,所有后面的ItemCF相似度越高

⚠️ 注意:sort_values(['user_id', 'pred_score']) 中,若 pred_score 是 ItemCF 相似度,则分数越高越相关keep='last' 会保留同用户同物品中分数最高的那个——这其实是合理的,因为高分负样本更“难”(Hard Negative)。

排序模型训练与预测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def train_and_predict(itemcf=False, itemcf_topk=10, hot=False, hot_topk=10, offline=True):
"""
完整的训练和预测流程

主要步骤:
1. 召回阶段
2. 特征工程
3. 模型训练
4. 评估预测
"""
ts = time.time()

# 召回阶段
if itemcf:
train_past_clicks, train_last_click, test_last_click = itemcf_recall(itemcf_topk)
if hot:
hot_recall(hot_topk, train_past_clicks, test_last_click)

# 获取用户最后点击时间
train_past_clicks = train_past_clicks.groupby('user_id').agg({'click_timestamp': 'max'})
print(train_past_clicks.head())

# 获取训练集召回结果
train = get_train_recall(itemcf, hot, train_last_click)
train = train.sort_values('user_id').drop(columns=['click_timestamp']).reset_index(drop=True)
print(train.head())

# 特征工程
# 合并用户特征
train = train.drop(columns=['click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type']) \
.merge(train_last_click.drop(columns=['article_id', 'click_timestamp']))
# 合并文章特征
train = train.merge(articles, on='article_id', how='left')
# 合并最后点击时间
train = train.merge(train_past_clicks, on='user_id', how='left')
# 创建时间差特征
train['delta_time'] = train['created_at_ts'] - train['click_timestamp']

# 准备训练数据
X = train.copy()
print(X.head())
y = train['label']

# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=66)
X_eval, X_off, y_eval, y_off = train_test_split(X_test, y_test, test_size=0.5, random_state=66)

# 获取用户组信息(用于LGBMRanker)
g_train = X_train.groupby(['user_id'], as_index=False).count()['label'].values
g_eval = X_eval.groupby(['user_id'], as_index=False).count()['label'].values

# 特征列定义
lgb_cols = ['click_environment', 'click_deviceGroup', 'click_os', 'click_country',
'click_region', 'click_referrer_type', 'category_id', 'created_at_ts',
'words_count', 'click_timestamp', 'delta_time']

# 初始化LightGBM排序模型
lgb_ranker = lgb.LGBMRanker(
boosting_type='gbdt', # 梯度提升决策树
num_leaves=31, # 叶子节点数
reg_alpha=0.0, # L1正则化系数
reg_lambda=1, # L2正则化系数
max_depth=-1, # 不限制深度
n_estimators=1000, # 树的数量
subsample=0.7, # 样本采样率
colsample_bytree=0.7, # 特征采样率
subsample_freq=1, # 采样频率
learning_rate=0.01, # 学习率
min_child_weight=50, # 叶子节点最小样本权重
random_state=66, # 随机种子
n_jobs=-1 # 使用所有CPU核心
)

# 训练模型
lgb_ranker.fit(
X_train[lgb_cols], y_train,
group=g_train, # 用户组信息(用于pairwise排序)
eval_set=[(X_eval[lgb_cols], y_eval)], # 验证集
eval_group=[g_eval] # 验证集用户组
)

# 输出特征重要性
def print_feature_importance(columns, scores):
"""打印特征重要性排序"""
print('--------------------------------')
result = list(zip(columns, scores))
result.sort(key=lambda v: v[1], reverse=True) # 按重要性降序排序
for col, score in result:
print('{}: {}'.format(col, score))
print('--------------------------------')

print_feature_importance(lgb_cols, lgb_ranker.feature_importances_)

# 离线评估
X_off['pred_score'] = lgb_ranker.predict(X_off[lgb_cols], num_iteration=lgb_ranker.best_iteration_)

# 清理特征列
X_off = X_off.drop(columns=['category_id', 'created_at_ts', 'words_count', 'click_environment',
'click_deviceGroup', 'click_os', 'click_country', 'click_region',
'click_referrer_type', 'click_timestamp', 'delta_time'])

# 排序和排名
recall_df = X_off.copy()
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

# 生成提交格式
del recall_df['pred_score'], recall_df['label']
submit = recall_df[recall_df['rank'] <= 5].set_index(['user_id', 'rank']).unstack(-1).reset_index()
max_article = int(recall_df['rank'].value_counts().index.max())
submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

# 重命名列
submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
3: 'article_3', 4: 'article_4', 5: 'article_5'})
submit = submit.fillna(-1) # 填充缺失值

# 计算MRR指标
sums = 0
for user_id in tqdm(submit['user_id'].unique()):
user = submit.loc[submit['user_id'] == user_id]
art_id = train_last_click.loc[train_last_click['user_id'] == user_id, 'article_id'].values[0]
for i in range(1, max_article):
if user['article_{}'.format(i)].values[0] == art_id:
sums += 1 / i # MRR计算:1/排名
print('MRR:{}'.format(sums / len(submit['user_id'].unique())))

# 在线预测(生成最终提交文件)
if not offline:
test_recall = get_test_recall(itemcf, hot)
test_recall = test_recall.merge(test_last_click.drop(columns=['article_id']))
test_recall = test_recall.merge(articles, on='article_id', how='left')
test_recall['delta_time'] = test_recall['created_at_ts'] - test_recall['click_timestamp']

# 预测得分
test_recall['pred_score'] = lgb_ranker.predict(test_recall[lgb_cols], num_iteration=lgb_ranker.best_iteration_)
result = test_recall.sort_values(by=['user_id', 'pred_score'], ascending=(True, False))

# 清理特征列
result = result.drop(columns=['category_id', 'created_at_ts', 'words_count', 'click_environment',
'click_deviceGroup', 'click_os', 'click_country', 'click_region',
'click_referrer_type', 'click_timestamp', 'delta_time'])

def submit_f(recall_df, topk=10, model_name=None):
"""
生成最终提交文件

格式要求:
user_id, article_1, article_2, article_3, article_4, article_5
"""
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

# 验证每个用户都有足够的推荐结果
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() >= topk

del recall_df['pred_score']
# 转换为宽格式
submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()

submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

# 重命名列
submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
3: 'article_3', 4: 'article_4', 5: 'article_5'})

# 保存结果
save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d-%H-%M') + '.csv'
submit.to_csv(save_name, index=False, header=True)

# 生成提交文件
submit_f(result, topk=5, model_name='lgb_ranker')
print('Submit Finished! Cost time: {}'.format(time.time() - ts))

召回阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 召回阶段
if itemcf:
train_past_clicks, train_last_click, test_last_click = itemcf_recall(itemcf_topk)
if hot:
hot_recall(hot_topk, train_past_clicks, test_last_click)

# 获取用户最后点击时间
train_past_clicks = train_past_clicks.groupby('user_id').agg({'click_timestamp': 'max'})
print(train_past_clicks.head())

# 获取训练集召回结果
train = get_train_recall(itemcf, hot, train_last_click)
train = train.sort_values('user_id').drop(columns=['click_timestamp']).reset_index(drop=True)
print(train.head())

第8行:train_past_clicks = train_past_clicks.groupby('user_id').agg({'click_timestamp': 'max'})

  • .agg({'click_timestamp': 'max'})
    • 作用聚合。这是紧接在 groupby之后的操作,用于对每个分组(小组DataFrame)应用一个或多个聚合函数,并将每个分组的结果合并成一个新的DataFrame。
    • 参数解析agg接受一个字典,这个字典定义了 “对哪一列执行什么操作”
      • {'click_timestamp': 'max'}
      • 键(Key)'click_timestamp',表示我们要操作的列名。
      • 值(Value)'max',表示要应用的聚合函数,这里是取最大值。对于时间戳列,最大值就是最近的时间。

特征工程

1
2
3
4
5
6
7
8
9
10
11
# 特征工程
# 合并用户特征
train = train.drop(columns=['click_environment', 'click_deviceGroup', 'click_os',
'click_country', 'click_region', 'click_referrer_type']) \
.merge(train_last_click.drop(columns=['article_id', 'click_timestamp']))
# 合并文章特征
train = train.merge(articles, on='article_id', how='left')
# 合并最后点击时间
train = train.merge(train_past_clicks, on='user_id', how='left')
# 创建时间差特征
train['delta_time'] = train['created_at_ts'] - train['click_timestamp']

模型训练

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# 准备训练数据
X = train.copy()
print(X.head())
y = train['label']

# 数据分割
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=66)
X_eval, X_off, y_eval, y_off = train_test_split(X_test, y_test, test_size=0.5, random_state=66)

# 获取用户组信息(用于LGBMRanker)
g_train = X_train.groupby(['user_id'], as_index=False).count()['label'].values
g_eval = X_eval.groupby(['user_id'], as_index=False).count()['label'].values

# 特征列定义
lgb_cols = ['click_environment', 'click_deviceGroup', 'click_os', 'click_country',
'click_region', 'click_referrer_type', 'category_id', 'created_at_ts',
'words_count', 'click_timestamp', 'delta_time']

# 初始化LightGBM排序模型
lgb_ranker = lgb.LGBMRanker(
boosting_type='gbdt', # 梯度提升决策树
num_leaves=31, # 叶子节点数
reg_alpha=0.0, # L1正则化系数
reg_lambda=1, # L2正则化系数
max_depth=-1, # 不限制深度
n_estimators=1000, # 树的数量
subsample=0.7, # 样本采样率
colsample_bytree=0.7, # 特征采样率
subsample_freq=1, # 采样频率
learning_rate=0.01, # 学习率
min_child_weight=50, # 叶子节点最小样本权重
random_state=66, # 随机种子
n_jobs=-1 # 使用所有CPU核心
)

# 训练模型
lgb_ranker.fit(
X_train[lgb_cols], y_train,
group=g_train, # 用户组信息(用于pairwise排序)
eval_set=[(X_eval[lgb_cols], y_eval)], # 验证集
eval_group=[g_eval] # 验证集用户组
)

# 输出特征重要性
def print_feature_importance(columns, scores):
"""打印特征重要性排序"""
print('--------------------------------')
result = list(zip(columns, scores))
result.sort(key=lambda v: v[1], reverse=True) # 按重要性降序排序
for col, score in result:
print('{}: {}'.format(col, score))
print('--------------------------------')

print_feature_importance(lgb_cols, lgb_ranker.feature_importances_)

第6行:X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=66)

  • 使用 sklearn.model_selection.train_test_split
  • 80% 训练,20% 测试
  • random_state=66:保证结果可复现
  • 注意:这是随机打乱划分不考虑用户分组(后续需重建 group 信息)

⚠️ 在推荐系统中,更严谨的做法是按用户划分(避免同一用户的数据同时出现在 train/test),但这里采用简单随机划分,适用于某些场景(如样本独立性假设成立)

第11行:g_train = X_train.groupby(['user_id'], as_index=False).count()['label'].values

  1. X_train.groupby(['user_id'], as_index=False)
    • user_id 分组
    • as_index=False:表示分组后不将 user_id 设为索引,而是保留在列中(返回普通 DataFrame)。
  2. .count()
    • 对每列统计非空值数量(因为 x_train 包含 user_id, article_id, 特征列, label 等)
  3. ['label']
    • label 列的计数值(其实任意列都一样,因为每行都有值)
  4. .values
    • 转为 NumPy 数组,形如:[5, 3, 8, 2, ...],表示:
      • 用户1 有 5 个样本
      • 用户2 有 3 个样本

✅ 这个数组 g_train 就是 LightGBM 中 group 参数所需的格式

第20~34行:初始化LightGBM排序模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
lgb_ranker = lgb.LGBMRanker(
boosting_type='gbdt', # 梯度提升决策树
num_leaves=31, # 叶子节点数
reg_alpha=0.0, # L1正则化系数
reg_lambda=1, # L2正则化系数
max_depth=-1, # 不限制深度
n_estimators=1000, # 树的数量
subsample=0.7, # 样本采样率
colsample_bytree=0.7, # 特征采样率
subsample_freq=1, # 采样频率
learning_rate=0.01, # 学习率
min_child_weight=50, # 叶子节点最小样本权重
random_state=66, # 随机种子
n_jobs=-1 # 使用所有CPU核心
)
  • 小学习率(0.01) + 大树数量(1000)
    → 配合验证集早停(early stopping),找到最优迭代次数。
  • min_child_weight=50
    → 在排序任务中,防止模型对稀疏用户/物品过拟合(因每个 group 样本数可能很少)。
  • 正则化(L2=1) + 采样(subsample/colsample=0.7)
    → 典型的防过拟合组合,适合高维稀疏特征。

✅ 这是一套稳健、防过拟合、适合工业级排序任务的参数配置。

第37~42行:训练模型

1
2
3
4
5
6
lgb_ranker.fit(
X_train[lgb_cols], y_train,
group=g_train, # 用户组信息(用于pairwise排序)
eval_set=[(X_eval[lgb_cols], y_eval)], # 验证集
eval_group=[g_eval] # 验证集用户组
)
  1. 核心机制:group 参数

    • 作用:告诉模型哪些样本属于同一个 query(在推荐中 = 同一个 user)

    • 格式g_train 是一个数组,表示每个用户的样本数量,例如:

      1
      g_train = [5, 3, 8, ...]  # 用户1有5个候选,用户2有3个...
  2. 内部行为

    • LightGBM 会按 group 切分数据:[样本0~4] → 用户1, [5~7] → 用户2, …
    • 在每个用户内部计算 pairwise loss(如 LambdaRank),优化 NDCG 等排序指标

    ❗ 如果没有 group,模型会退化为普通分类/回归,完全失去排序能力

评估预测

离线评估

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 离线评估
X_off['pred_score'] = lgb_ranker.predict(X_off[lgb_cols], num_iteration=lgb_ranker.best_iteration_)

# 清理特征列
X_off = X_off.drop(columns=['category_id', 'created_at_ts', 'words_count', 'click_environment',
'click_deviceGroup', 'click_os', 'click_country', 'click_region',
'click_referrer_type', 'click_timestamp', 'delta_time'])

# 排序和排名
recall_df = X_off.copy()
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

# 生成提交格式
del recall_df['pred_score'], recall_df['label']
submit = recall_df[recall_df['rank'] <= 5].set_index(['user_id', 'rank']).unstack(-1).reset_index()
max_article = int(recall_df['rank'].value_counts().index.max())
submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

# 重命名列
submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
3: 'article_3', 4: 'article_4', 5: 'article_5'})
submit = submit.fillna(-1) # 填充缺失值

# 计算MRR指标
sums = 0
for user_id in tqdm(submit['user_id'].unique()):
user = submit.loc[submit['user_id'] == user_id]
art_id = train_last_click.loc[train_last_click['user_id'] == user_id, 'article_id'].values[0]
for i in range(1, max_article):
if user['article_{}'.format(i)].values[0] == art_id:
sums += 1 / i # MRR计算:1/排名
print('MRR:{}'.format(sums / len(submit['user_id'].unique())))

第2行:X_off['pred_score'] = lgb_ranker.predict(X_off[lgb_cols], num_iteration=lgb_ranker.best_iteration_)

  1. 作用:

    • 使用训练好的 lgb_ranker 模型对 X_off 中的样本进行预测。
    • 将预测得分存入新列 'pred_score'
  2. 关键点:

    • X_off[lgb_cols]:只传入模型训练时使用的特征列。
    • num_iteration=lgb_ranker.best_iteration_:使用验证集效果最好时的迭代轮数(需在 fit() 时启用早停),防止过拟合。

    ✅ 这是标准做法,确保使用最优模型状态。

第12行:

recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

  • groupby(...).rank(ascending=False, method='first')
    • ascending=False:分数越高,排名越靠前(rank=1 是最高分)
    • method='first':当分数相同时,按 DataFrame 中出现顺序定 rank(避免随机性)

✅ 这是 Learning-to-Rank 评估的核心步骤。

第15行:del recall_df['pred_score'], recall_df['label']

  1. 作用:删除 pred_scorelabel 列,因为生成提交格式时不需要它们。
  2. 说明:
    • del df['col'] 是直接从 DataFrame 中删除列的快捷方式。
    • 此时 recall_df 应只包含:user_id, article_id, rank

第16行:筛选 Top-5 并转为宽表

1
submit = recall_df[recall_df['rank'] <= 5].set_index(['user_id', 'rank']).unstack(-1).reset_index()
  1. 作用:

    • 保留每个用户的 Top-5 推荐;
    • 将“长表”(每行一个推荐)转为“宽表”(每行一个用户,5 列为文章 ID)。
  2. 函数说明:

    • recall_df[recall_df['rank'] <= 5]
      → 筛选 Top-5。

    • .set_index(['user_id', 'rank'])
      → 设置双层索引:(user_id, rank)

    • .unstack(-1)
      → 将最内层索引 rank 展开为列,形成 MultiIndex 列:

      1
      2
      3
      4
                   article_id
      rank 1 2 3 ...
      user_id
      101 205 301 401 ...
    • .reset_index()
      → 将 user_id 从索引变回普通列。

第17行:max_article = int(recall_df['rank'].value_counts().index.max())

第18行:处理 MultiIndex 列名

submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

  1. 作用:

    • 去掉 MultiIndex 的第一层(通常是 'article_id'),只保留数字 rank。
    • 将列名转为:['', 1, 2, 3, 4, 5](其中 ''user_id 列)
  2. 函数说明:

    • submit.columns.droplevel(0):去掉 MultiIndex 的第 0 层。
    • 列表推导式尝试将数字列名转为 int
  3. 列表推导式:[int(col) if isinstance(col, int) else col for col in ...]

    • 遍历 droplevel(0) 后的每个列名 col
    • 如果 col 已经是 int 类型,就转成 int(其实没变);
    • 否则保留原样(比如字符串 'user_id''')。

    ⚠️ 注意:这个 int(col) 转换对字符串数字无效
    例如:如果 col = '1'(字符串),isinstance('1', int)False,所以不会转成整数 1。

    所以这行代码并不能保证把字符串数字转为整数,它的实际作用很有限。

第26~33行:计算 MRR 指标

1
2
3
4
5
6
7
8
sums = 0
for user_id in tqdm(submit['user_id'].unique()):
user = submit.loc[submit['user_id'] == user_id]
art_id = train_last_click.loc[train_last_click['user_id'] == user_id, 'article_id'].values[0]
for i in range(1, max_article):
if user['article_{}'.format(i)].values[0] == art_id:
sums += 1 / i # MRR计算:1/排名
print('MRR@{}:{}'.format(max_article,sums / len(submit['user_id'].unique())))
  1. MRR 定义:
    • 若真实点击出现在第 k 位,贡献 k1
    • MRR = 所有用户贡献的平均值
  2. 问题分析:
    • 效率极低
      • submit.loc[submit['user_id'] == user_id] 是 O(n) 查询,总复杂度 O(n²)
      • 大数据下非常慢
    • 无异常处理
      • user_id 不在 train_last_click 中,.values[0] 报错

在线测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 在线预测(生成最终提交文件)
if not offline:
test_recall = get_test_recall(itemcf, hot)
test_recall = test_recall.merge(test_last_click.drop(columns=['article_id']))
test_recall = test_recall.merge(articles, on='article_id', how='left')
test_recall['delta_time'] = test_recall['created_at_ts'] - test_recall['click_timestamp']

# 预测得分
test_recall['pred_score'] = lgb_ranker.predict(test_recall[lgb_cols], num_iteration=lgb_ranker.best_iteration_)
result = test_recall.sort_values(by=['user_id', 'pred_score'], ascending=(True, False))

# 清理特征列
result = result.drop(columns=['category_id', 'created_at_ts', 'words_count', 'click_environment',
'click_deviceGroup', 'click_os', 'click_country', 'click_region',
'click_referrer_type', 'click_timestamp', 'delta_time'])

def submit_f(recall_df, topk=10, model_name=None):
"""
生成最终提交文件

格式要求:
user_id, article_1, article_2, article_3, article_4, article_5
"""
recall_df = recall_df.sort_values(by=['user_id', 'pred_score'])
recall_df['rank'] = recall_df.groupby(['user_id'])['pred_score'].rank(ascending=False, method='first')

# 验证每个用户都有足够的推荐结果
tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())
assert tmp.min() >= topk

del recall_df['pred_score']
# 转换为宽格式
submit = recall_df[recall_df['rank'] <= topk].set_index(['user_id', 'rank']).unstack(-1).reset_index()

submit.columns = [int(col) if isinstance(col, int) else col for col in submit.columns.droplevel(0)]

# 重命名列
submit = submit.rename(columns={'': 'user_id', 1: 'article_1', 2: 'article_2',
3: 'article_3', 4: 'article_4', 5: 'article_5'})

# 保存结果
save_name = save_path + model_name + '_' + datetime.today().strftime('%m-%d-%H-%M') + '.csv'
submit.to_csv(save_name, index=False, header=True)

# 生成提交文件
submit_f(result, topk=5, model_name='lgb_ranker')
print('Submit Finished! Cost time: {}'.format(time.time() - ts))

第28行:tmp = recall_df.groupby('user_id').apply(lambda x: x['rank'].max())

  1. 功能:按用户分组,计算每个用户的最大排名

    • recall_dfuser_id 分组;
    • 对每个用户,找出其 rank 列的最大值;
    • 返回一个 Series,索引是 user_id,值是该用户最大的 rank
  2. 示例:

    • 假设某用户有 7 个候选文章,其 rank[1, 2, 3, 4, 5, 6, 7],则 x['rank'].max() 返回 7
      若另一用户只有 3 个候选,则返回 3

      💡 注意:这里的 rank 是从 1 开始连续编号的(由 .rank(method='first')cumcount()+1 生成),所以 最大 rank = 候选文章数量

      因此,tmp[user_id] == 该用户的候选文章总数

第29行:assert tmp.min() >= topk 断言所有用户的候选数 ≥ topk

  1. 功能:

    • 检查所有用户中最小的候选数量是否 ≥ topk(如 5);

    • 如果有任何一个用户的候选数 < topkassert 会抛出 AssertionError,程序中断。

  2. 目的:

    • 确保后续执行 recall_df[recall_df['rank'] <= topk] 时,每个用户都能取出完整的 topk 个推荐
    • 避免因某些用户候选不足,导致提交格式中出现过多 -1,或评估指标偏差。

主程序执行

1
2
3
# 执行完整的训练和预测流程
offline = True # True表示离线评估,False表示生成最终提交
train_and_predict(itemcf=True, itemcf_topk=10, hot=True, hot_topk=10, offline=offline)

存在问题以及可改进点

存在问题

  1. ItemCF中的时间衰减因子并非与时间有关而是与用户的活跃度有关。
  2. hot召回中为了获取分离后的训练集和测试集,会重新调用一次Item_CF,导致花费大量时间
    1. 解决方法:把分离数据集的函数从Item_CF中提出来

改进点

  1. 可以考虑加召回通道
  2. 有的用户匹配可以换成groupby
    1. 例如:user = train_past_clicks.loc[train_past_clicks[‘user_id’] == user_id] # 可以换成groupby 执行更快(取自hot召回)