data.py

1
2
3
4
5
6
7
8
9
10
11
import argparse
import os
import random
from random import sample

import pandas as pd
from tqdm import tqdm

from utils import Logger # 从自定义工具模块导入日志工具

random.seed(2020) # 设置随机种子保证可重复性
1
2
3
4
5
6
7
8
9
# 命令行参数解析
parser = argparse.ArgumentParser(description='数据处理')
parser.add_argument('--mode', default='valid') # 模式参数,默认'valid'(线下验证)
parser.add_argument('--logfile', default='test.log') # 日志文件名参数

args = parser.parse_args() # 解析命令行参数

mode = args.mode
logfile = args.logfile
1
2
3
4
5
# 创建日志目录(如果不存在)
os.makedirs('../user_data/log', exist_ok=True)
# 初始化日志记录器
log = Logger(f'../user_data/log/{logfile}').logger
log.info(f'数据处理,mode: {mode}') # 记录开始信息

离线模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def data_offline(df_train_click, df_test_click):
"""
线下模式:从训练集中划分验证集
参数:
df_train_click: 训练点击数据
df_test_click: 测试点击数据
"""
# 获取所有训练用户ID
train_users = df_train_click['user_id'].values.tolist()
# 从训练用户中随机采样5万个用户作为验证集用户
val_users = sample(train_users, 50000)
log.debug(f'val_users num: {len(set(val_users))}') # 记录实际唯一的验证用户数

# 初始化列表存储数据
click_list = [] # 存储所有点击数据
valid_query_list = [] # 存储验证查询(最后一条行为)

# 按用户分组处理
groups = df_train_click.groupby(['user_id'])
for user_id, g in tqdm(groups): # tqdm显示进度条
if user_id in val_users: # 如果是验证集用户
# 取用户最后一条行为作为验证查询
valid_query = g.tail(1)
valid_query_list.append(
valid_query[['user_id', 'click_article_id']])

# 剩余行为作为训练数据
train_click = g.head(g.shape[0] - 1)
click_list.append(train_click)
else: # 如果是训练集用户
click_list.append(g) # 所有行为都作为训练数据

# 合并数据
df_train_click = pd.concat(click_list, sort=False) # 训练点击数据
df_valid_query = pd.concat(valid_query_list, sort=False) # 验证查询数据

# 准备测试查询(测试集用户)
test_users = df_test_click['user_id'].unique() # 去重的测试用户
test_query_list = []

for user in tqdm(test_users): # 为每个测试用户创建查询
test_query_list.append([user, -1]) # -1表示待预测的点击文章ID

df_test_query = pd.DataFrame(test_query_list,
columns=['user_id', 'click_article_id'])

# 合并验证查询和测试查询
df_query = pd.concat([df_valid_query, df_test_query], # 验证集query在前,测试集query在后 纵向拼接
sort=False).reset_index(drop=True)
# reset_index: 重新生成从 0 开始的连续整数索引,并丢弃原来的旧索引(drop=True 表示不保留旧索引作为新列)。
# 合并训练点击和测试点击
df_click = pd.concat([df_train_click, df_test_click],
sort=False).reset_index(drop=True)
# 按用户和时间排序
df_click = df_click.sort_values(['user_id', # 首先按 user_id 从小到大排;
'click_timestamp']).reset_index(drop=True)
# 在 user_id 相同的情况下,再按 click_timestamp 从小到大排。

# 记录数据形状和头部信息
log.debug(
f'df_query shape: {df_query.shape}, df_click shape: {df_click.shape}')
log.debug(f'{df_query.head()}') # head方法:查看数据框(DataFrame)的前 5 行内容。
log.debug(f'{df_click.head()}')

# 保存文件
os.makedirs('../user_data/data/offline', exist_ok=True) # 创建保存目录

# 保存为pickle格式
# 将 df_click/df_query 以二进制格式(pickle)保存到指定路径的文件 click.pkl/query.pkl 中,以便后续快速加载使用。
df_click.to_pickle('../user_data/data/offline/click.pkl')
df_query.to_pickle('../user_data/data/offline/query.pkl')

在线模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def data_online(df_train_click, df_test_click):
"""
线上模式:使用全量训练数据,测试集作为查询
"""
# 准备测试查询
test_users = df_test_click['user_id'].unique()
test_query_list = []

for user in tqdm(test_users):
test_query_list.append([user, -1]) # 为测试用户创建查询

df_test_query = pd.DataFrame(test_query_list,
columns=['user_id', 'click_article_id'])

# 线上模式只用测试查询
df_query = df_test_query
# 合并所有点击数据
df_click = pd.concat([df_train_click, df_test_click],
sort=False).reset_index(drop=True)
# 按用户和时间排序
df_click = df_click.sort_values(['user_id',
'click_timestamp']).reset_index(drop=True)

# 记录日志
log.debug(
f'df_query shape: {df_query.shape}, df_click shape: {df_click.shape}')
log.debug(f'{df_query.head()}')
log.debug(f'{df_click.head()}')

# 保存文件
os.makedirs('../data/online', exist_ok=True) # 注意:这里路径与offline不同

df_click.to_pickle('../user_data/data/online/click.pkl')
df_query.to_pickle('../user_data/data/online/query.pkl')

主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
if __name__ == '__main__':
df_train_click = pd.read_csv('../tcdata/train_click_log.csv')
# df_test_click = pd.read_csv('../tcdata/testB_click_log_Test_B.csv')
df_test_click = pd.read_csv('../tcdata/testA_click_log.csv')

log.debug(
f'df_train_click shape: {df_train_click.shape}, df_test_click shape: {df_test_click.shape}'
)

if mode == 'valid':
data_offline(df_train_click, df_test_click)
else:
data_online(df_train_click, df_test_click)

recall_w2v.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
# 导入标准库
import argparse # 命令行参数解析
import math # 数学函数(未显式使用,可能为后续扩展预留)
import os # 操作系统接口(路径、文件等)
import pickle # 序列化/反序列化 Python 对象
import random # 随机数生成
import signal # 处理系统信号(如 Ctrl+C)
import warnings # 控制警告信息

# 导入第三方库
from collections import defaultdict # 默认字典,避免 KeyError
from random import shuffle # 打乱列表顺序

import multitasking # 多进程/多线程任务调度
import numpy as np # 数值计算
import pandas as pd # 数据处理
from annoy import AnnoyIndex # 近似最近邻索引(高效向量检索)
from gensim.models import Word2Vec # Word2Vec 词向量模型
from tqdm import tqdm # 进度条显示

# 导入自定义模块
from utils import Logger, evaluate # 日志记录器和评估函数

# 忽略所有警告(如 pandas FutureWarning)
warnings.filterwarnings('ignore')

# 配置多任务:使用 CPU 核心数作为最大线程数,引擎设为进程(非线程)
max_threads = multitasking.config['CPU_CORES']
multitasking.set_max_threads(max_threads)
multitasking.set_engine('process') # 使用多进程,避免 GIL 限制

# 注册信号处理:Ctrl+C 时终止所有子进程
signal.signal(signal.SIGINT, multitasking.killall)

# 设置随机种子,保证可复现性
seed = 2020
random.seed(seed)
# 注意:未设置 np.random.seed 或 torch.manual_seed,若需完全复现需补充

# ================== 命令行参数配置 ==================
parser = argparse.ArgumentParser(description='w2v 召回')
parser.add_argument('--mode', default='valid', help="运行模式: 'valid'(离线验证)或 'online'(线上推理)")
parser.add_argument('--logfile', default='test.log', help="日志文件名")

args = parser.parse_args()
mode = args.mode
logfile = args.logfile

# ================== 初始化日志 ==================
os.makedirs('../user_data/log', exist_ok=True) # 创建日志目录
log = Logger(f'../user_data/log/{logfile}').logger # 使用自定义 Logger
log.info(f'w2v 召回,mode: {mode}')

# ================== Word2Vec 训练函数 ==================
def word2vec(df_, f1, f2, model_path):
"""
基于用户-物品交互序列训练 Word2Vec 模型,返回物品向量映射。

参数:
df_: DataFrame,包含用户和物品交互记录
f1: 用户列名(如 'user_id')
f2: 物品列名(如 'click_article_id')
model_path: 模型保存路径

返回:
article_vec_map: dict,{article_id: embedding_vector}
"""
df = df_.copy()
# 按用户聚合,生成 [item1, item2, ...] 序列
tmp = df.groupby(f1, as_index=False)[f2].agg({f'{f1}_{f2}_list': list})
sentences = tmp[f'{f1}_{f2}_list'].values.tolist()
del tmp[f'{f1}_{f2}_list'] # 释放内存

words = []
# 将所有 item 转为字符串(Word2Vec 要求输入为 str)
for i in range(len(sentences)):
x = [str(x) for x in sentences[i]]
sentences[i] = x
words += x

# 如果模型已存在,直接加载;否则训练新模型
if os.path.exists(f'{model_path}/w2v.m'):
model = Word2Vec.load(f'{model_path}/w2v.m')
else:
model = Word2Vec(
sentences=sentences,
size=256, # 向量维度
window=3, # 上下文窗口大小
min_count=1, # 最小词频(保留所有)
sg=1, # 1=Skip-gram, 0=CBOW
hs=0, # 0=负采样, 1=Hierarchical Softmax
seed=seed,
negative=5, # 负采样数量
workers=10, # 并行训练线程数
iter=1 # 训练轮数(通常需更多,此处为快速实验)
)
model.save(f'{model_path}/w2v.m')

# 构建 article_id -> vector 映射(转回 int key)
article_vec_map = {}
for word in set(words):
if word in model.wv: # 推荐使用 model.wv 而非 model
article_vec_map[int(word)] = model.wv[word]
return article_vec_map

# ================== 多进程召回任务 ==================
@multitasking.task
def recall(df_query, article_vec_map, article_index, user_item_dict, worker_id):
"""
对一批用户进行召回,结果保存为临时 pkl 文件。

参数:
df_query: DataFrame,包含 ['user_id', 'click_article_id'],-1 表示无标签
article_vec_map: 物品向量映射
article_index: Annoy 索引
user_item_dict: 用户历史点击字典 {user_id: [item1, item2, ...]}
worker_id: 任务 ID,用于命名临时文件
"""
data_list = []

# 遍历每个用户(带进度条)
for user_id, item_id in tqdm(df_query.values, desc=f'Worker {worker_id}'):
rank = defaultdict(int) # 存储候选物品的累计相似分

# 获取用户历史点击(只取最近 1 个)
interacted_items = user_item_dict[user_id][-1:]

# 基于每个历史物品,召回相似物品
for item in interacted_items:
article_vec = article_vec_map[item]
# 使用 Annoy 检索 Top-100 最近邻
item_ids, distances = article_index.get_nns_by_vector(
article_vec, 100, include_distances=True
)
# 将欧氏距离转换为相似度(Annoy angular 距离 ∈ [0,2])
sim_scores = [2 - distance for distance in distances]

# 累加相似分(过滤已交互物品)
for relate_item, wij in zip(item_ids, sim_scores):
if relate_item not in interacted_items:
rank[relate_item] += wij

# 取 Top-50 候选
sim_items = sorted(rank.items(), key=lambda d: d[1], reverse=True)[:50]
item_ids = [item[0] for item in sim_items]
item_sim_scores = [item[1] for item in sim_items]

# 构造结果 DataFrame
df_temp = pd.DataFrame({
'article_id': item_ids,
'sim_score': item_sim_scores,
'user_id': user_id
})

# 打标签:若为验证集,标记是否命中真实点击
if item_id == -1:
df_temp['label'] = np.nan # 线上无标签
else:
df_temp['label'] = 0
df_temp.loc[df_temp['article_id'] == item_id, 'label'] = 1

# 列顺序与类型标准化
df_temp = df_temp[['user_id', 'article_id', 'sim_score', 'label']]
df_temp['user_id'] = df_temp['user_id'].astype('int')
df_temp['article_id'] = df_temp['article_id'].astype('int')

data_list.append(df_temp)

# 合并当前 worker 的所有结果
df_data = pd.concat(data_list, sort=False)

# 保存临时文件
os.makedirs('../user_data/tmp/w2v', exist_ok=True)
df_data.to_pickle(f'../user_data/tmp/w2v/{worker_id}.pkl')

# ================== 主程序入口 ==================
if __name__ == '__main__':
# 根据 mode 加载不同数据路径
if mode == 'valid':
df_click = pd.read_pickle('../user_data/data/offline/click.pkl')
df_query = pd.read_pickle('../user_data/data/offline/query.pkl')
os.makedirs('../user_data/data/offline', exist_ok=True)
os.makedirs('../user_data/model/offline', exist_ok=True)
w2v_file = '../user_data/data/offline/article_w2v.pkl'
model_path = '../user_data/model/offline'
else:
df_click = pd.read_pickle('../user_data/data/online/click.pkl')
df_query = pd.read_pickle('../user_data/data/online/query.pkl')
os.makedirs('../user_data/data/online', exist_ok=True)
os.makedirs('../user_data/model/online', exist_ok=True)
w2v_file = '../user_data/data/online/article_w2v.pkl'
model_path = '../user_data/model/online'

log.debug(f'df_click shape: {df_click.shape}')
log.debug(f'{df_click.head()}')

# 训练或加载 Word2Vec 模型
article_vec_map = word2vec(df_click, 'user_id', 'click_article_id', model_path)
with open(w2v_file, 'wb') as f:
pickle.dump(article_vec_map, f)

# 构建 Annoy 向量索引(angular 距离,适用于余弦相似度)
article_index = AnnoyIndex(256, 'angular')
article_index.set_seed(2020) # 保证索引可复现

for article_id, emb in tqdm(article_vec_map.items(), desc='Building Annoy Index'):
article_index.add_item(article_id, emb)
article_index.build(100) # 构建 100 棵树,越多越准但越慢

# 构建用户历史点击字典
user_item_ = df_click.groupby('user_id')['click_article_id'].agg(list).reset_index()
user_item_dict = dict(zip(user_item_['user_id'], user_item_['click_article_id']))

# ================== 多进程召回 ==================
n_split = max_threads
all_users = df_query['user_id'].unique()
shuffle(all_users) # 打乱用户顺序,均衡负载
total = len(all_users)
n_len = total // n_split

# 清空旧的临时文件(注意:路径应为 ../user_data/tmp/w2v)
tmp_dir = '../user_data/tmp/w2v'
if os.path.exists(tmp_dir):
for file_name in os.listdir(tmp_dir):
os.remove(os.path.join(tmp_dir, file_name))

# 分发任务
for i in range(0, total, n_len):
part_users = all_users[i:i + n_len]
df_temp = df_query[df_query['user_id'].isin(part_users)]
recall(df_temp, article_vec_map, article_index, user_item_dict, i)

# 等待所有进程完成
multitasking.wait_for_tasks()
log.info('合并任务')

# 合并所有临时文件
df_data = pd.DataFrame()
for file_name in os.listdir(tmp_dir):
df_temp = pd.read_pickle(os.path.join(tmp_dir, file_name))
df_data = pd.concat([df_data, df_temp], ignore_index=True)

# 按用户和相似度排序(重要!确保后续取 Top-K 正确)
df_data = df_data.sort_values(['user_id', 'sim_score'], ascending=[True, False]).reset_index(drop=True)
log.debug(f'df_data.head: {df_data.head()}')

# ================== 评估与保存 ==================
if mode == 'valid':
log.info('计算召回指标')
# 仅统计有真实点击的用户数
total_users = df_query[df_query['click_article_id'] != -1].user_id.nunique()
# 调用自定义 evaluate 函数计算多组指标
metrics = evaluate(df_data[df_data['label'].notnull()], total_users)
hitrate_5, mrr_5, hitrate_10, mrr_10, hitrate_20, mrr_20, hitrate_40, mrr_40, hitrate_50, mrr_50 = metrics
log.debug(f'w2v: HR@5={hitrate_5:.4f}, MRR@5={mrr_5:.4f}, HR@10={hitrate_10:.4f}, MRR@10={mrr_10:.4f}, ...')

# 保存最终召回结果
output_path = '../user_data/data/offline/recall_w2v.pkl' if mode == 'valid' else '../user_data/data/online/recall_w2v.pkl'
df_data.to_pickle(output_path)
log.info(f'召回结果已保存至: {output_path}')