Recommendation-System-EGES

Introduction

This post reviews EGES (Enhanced Graph Embedding with Side Information), the model Alibaba proposed at KDD 2018. EGES takes the embeddings learned by DeepWalk and enriches them with each item's side information. Before introducing EGES, we first review how Item2Vec and DeepWalk learn item embeddings and where they fall short, and then cover the idea behind EGES and how it is trained.

Item2Vec

Item2Vec can be seen as the extension of Word2Vec to recommender systems. Word2Vec assumes a window/sentence of length T containing the words {w1, w2, ..., wT} and models the co-occurrence probability between the i-th word (the center word) and the j-th word (a context word). Item2Vec drops the notion of a window: instead it takes the items {w1, w2, ..., wT} browsed in one user's history and replaces the words with items. Its objective function is

$$
\mathbf{L} = \frac{1}{K} \sum_{i=1}^{K} \sum_{j \neq i}^{K} \log p(w_j \mid w_i)
$$

Note: the Item2Vec objective sums the log-probability over every pair of items, whereas Word2Vec only scores the pairs formed by a given center word and its context words. In that sense it resembles the pairwise feature-crossing operation in FM.
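
To make this concrete, here is a minimal Item2Vec sketch (my own illustration, assuming gensim is available; the item ids are toy data, not from the post). Setting a window larger than any history approximates Item2Vec's windowless, set-based objective with skip-gram:

from gensim.models import Word2Vec

# toy per-user browsing histories (item ids as strings)
user_histories = [
    ["i1", "i2", "i3", "i4"],
    ["i2", "i5", "i3"],
    ["i1", "i4", "i5"],
]
# sg=1 selects skip-gram; an oversized window lets every pair in a history co-occur
model = Word2Vec(sentences=user_histories, vector_size=16, window=100,
                 sg=1, negative=5, min_count=1, epochs=50)
print(model.wv.most_similar("i3", topn=2))  # nearest items in the embedding space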

Characteristics of Item2Vec

  • Broadly speaking, any method that produces a feature vector from an item's features can be regarded as Item2Vec.
  • Advantages:
    • Item2Vec is a very general idea: any sequential data can be used to learn embedding vectors for items.
  • Disadvantages:
    • Like Word2Vec, Item2Vec uses only sequential data, such as item purchase sequences, and ignores other information: the relationships between item and item, user and user, and user and item (relationship data), as well as side information such as item tags.
    • Graph embedding offers a good way around this restriction to sequence information: besides making full use of item side information, it can also learn the relationships among items and users. DeepWalk is one such graph-embedding method.

EGES

As noted above, Item2Vec can only learn item embedding vectors from sequential data; it cannot exploit other information such as item-item relationships or item side information (categorical or numerical tags). Graph embedding addresses this problem effectively, and many graph-embedding methods have since appeared. EGES (Enhanced Graph Embedding with Side Information) is one of them.

Motivation

EGES (Enhanced Graph Embedding with Side Information) was proposed by Alibaba at KDD 2018. It is a DeepWalk-based graph-embedding method combined with item side information: it first obtains a basic graph-embedding model from DeepWalk, then incrementally adds side information to the model. The following therefore explains DeepWalk first and then the EGES model.

DeepWalk / Basic Graph Embedding (BGE)

DeepWalk was proposed by Bryan Perozzi et al. at KDD 2014. Link: https://arxiv.org/pdf/1403.6652.pdf

When applied to Alibaba's recommender system, its basic pipeline is as follows:

  1. Collect users' historical browsing data.

  2. Build a weighted directed probability graph over items from the browsing data. Each node in the graph is an item, edges are directed, and each edge carries a transition probability, computed as below: $P(v_j \mid v_i)$ is the probability of jumping from node $v_i$ to node $v_j$, $M_{ij}$ is the weight of the edge from $v_i$ to $v_j$, $N_{+}(v_i)$ is the set of out-neighbors (nodes on outgoing edges) of $v_i$, and $\epsilon$ is the set of all edges in the graph.
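
     $$
     P(v_j \mid v_i) =
     \begin{cases}
     \dfrac{M_{ij}}{\sum_{j' \in N_{+}(v_i)} M_{ij'}}, & v_j \in N_{+}(v_i) \\
     0, & e_{ij} \notin \epsilon
     \end{cases}
     $$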

  3. Run random walks on the probability graph to sample many item sequences.

  4. Feed these sequences into Word2Vec to train the embeddings. The objective/loss function is the same as Word2Vec's (shown below): minimize the negative log-probability of the context nodes given the center node.
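
     $$
     \min_{\Phi} \; -\log \Pr\big(\{v_{i-w}, \dots, v_{i+w}\} \setminus v_i \mid \Phi(v_i)\big)
     $$

     where $\Phi(v_i)$ is the embedding of the center node $v_i$ and $w$ is the window size of the skip-gram model.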

Graph Embedding with Side Information (GES)

Although the embeddings learned by DeepWalk capture behavioral-sequence relationships that collaborative filtering cannot, DeepWalk still has the following problems:

  • It cannot solve the cold-start problem: for a newly arrived item, the item graph contains no information about it.
  • Other side information of an item, such as tag information, is not fully exploited.

The improvement here is to embed the side information as well, producing both an item embedding and side-information embeddings. Let $W_v^0$ be the item embedding of item v learned as before, and $W_v^s$ the embedding of its s-th type of side information. With n types of side information, item v has n+1 embeddings in total, which are fused by average pooling into the final embedding of the item, $H_v$:
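
$$
H_v = \frac{1}{n+1} \sum_{s=0}^{n} W_v^s
$$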

Enhanced Graph Embedding with Side Information (EGES)

A new problem then arises: although GES can relieve cold start through side information, each user's preference for an item has its own bias. One user may like a phone because it is an iPhone, another because of its long battery life, and these preferences should be reflected in the embeddings, e.g. the weight on the embedding of the "iPhone" tag or the "long battery life" tag should be increased. Each embedding therefore gets its own weight, and the item embedding is adjusted to the following form:
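
$$
H_v = \frac{\sum_{j=0}^{n} e^{a_v^j} W_v^j}{\sum_{j=0}^{n} e^{a_v^j}}
$$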

where:

  • $a_v^j$ is the weight of the j-th embedding of item v; to guarantee that each weight is positive, the exponential $e^{a_v^j}$ is used rather than $a_v^j$ itself.
  • $H_v$ is the weighted sum (weighted average) of all embeddings of item v; see the sketch after this list.
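
A minimal NumPy sketch of this weighted pooling step, for illustration only (the array shapes and the helper name eges_pool are my assumptions, not from the paper):

import numpy as np

def eges_pool(W, a):
    """Fuse the n+1 embeddings of one item by exp-weighted average pooling.
    W: (n+1, d) array; row 0 is the item embedding, rows 1..n the side-info embeddings.
    a: (n+1,) array of learned scalar weights a_v^j.
    """
    w = np.exp(a)  # e^{a_v^j} guarantees positive weights
    return (w[:, None] * W).sum(axis=0) / w.sum()

# toy usage: 3 embeddings (item + 2 kinds of side information) of dimension 4
H_v = eges_pool(np.random.randn(3, 4), np.zeros(3))  # zero weights reduce to plain averaging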

After the embedding formula is updated, the EGES embedding is multiplied by an output matrix and passed through a softmax to predict probabilities, just as in Word2Vec.

The loss is binary cross entropy, where $Z_u$ is the embedding of the context node u and $H_v$ is the embedding of the center node. As in Word2Vec, the negative samples for the context nodes are drawn by negative sampling in the original paper:
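
$$
\mathcal{L}(v, u, y) = -\big[\, y \log \sigma(H_v^{\top} Z_u) + (1-y)\log\big(1-\sigma(H_v^{\top} Z_u)\big) \,\big]
$$

where $y \in \{0, 1\}$ indicates whether u is a positive (true context) or negative (sampled) node.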

Experiment

The paper evaluates in the conventional way: offline AUC and online A/B CTR testing. We will not go into the details here.

Opinions

Some thoughts on the algorithms above:

  • When DeepWalk builds the probability graph for resampling, the transition probabilities should approximate the actual directed relationships between pairs of items in the data; generally, the more data, the closer the approximation to the true relationships.
  • The graph in DeepWalk can only model relationships among items it has already seen, so newly arrived items always face the cold-start problem. EGES tackles this by introducing side-information embeddings to fill in the representation of a new item that has no embedding yet.
  • Personally I find the novelty of EGES moderate: it mainly injects side-information embeddings into the original DeepWalk model, and the remaining steps are essentially the same as Word2Vec. Still, it does solve cold start and improves CTR to some extent, which makes it a practical model.

Source Code

Based on the open-source SparrowRecSys recommender code on GitHub, I reimplemented the DeepWalk graph-embedding computation myself. My own recommender-system repo on GitHub: https://github.com/wenkangwei/RecSys

%%writefile embedding.py
# imports required by the functions below
import os
import random
from collections import defaultdict

import numpy as np
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, FloatType, StructField, StructType
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.ml.linalg import Vectors
from pyspark.mllib.feature import Word2Vec

def getItemSeqs(spark, samplesRating):
    """
    extract item sequences for each user from dataframe
    1. for each user, collect the corresponding visited movies and timestamps into lists
    2. use a UDF to sort the movie sequence of each user by timestamp
    3. join the movie list into one string per user
    """
    def sortF(movie_list, timestamp_list):
        """
        sort by time and return the corresponding movie sequence
        eg:
        input:  movie_list: [1, 2, 3]
                timestamp_list: [1112486027, 1212546032, 1012486033]
        return: [3, 1, 2]
        """
        pairs = []
        # pair each movie id with its timestamp
        for m, t in zip(movie_list, timestamp_list):
            pairs.append((m, t))
        # sort by timestamp
        pairs = sorted(pairs, key=lambda x: x[1])
        return [x[0] for x in pairs]

    sortUDF = udf(sortF, ArrayType(StringType()))

    # keep positive ratings only, then build the per-user movie sequence
    userSequence = samplesRating.where(F.col("rating") > 3) \
        .groupBy("userId") \
        .agg(sortUDF(F.collect_list("movieId"), F.collect_list("timestamp")).alias("movieIds")) \
        .withColumn("movieIdStr", F.array_join(F.col("movieIds"), " "))
    seq = userSequence.select("movieIdStr").rdd.map(lambda x: x[0].split(" "))
    return seq



def embeddingLSH(spark, movieEmbMap):
    """
    Locality-sensitive hashing using BucketedRandomProjectionLSH
    """
    movieEmbSeq = []
    for key, embedding_list in movieEmbMap.items():
        embedding_list = [np.float64(embedding) for embedding in embedding_list]
        movieEmbSeq.append((key, Vectors.dense(embedding_list)))
    movieEmbDF = spark.createDataFrame(movieEmbSeq).toDF("movieId", "emb")
    bucketProjectionLSH = BucketedRandomProjectionLSH(inputCol="emb", outputCol="bucketId",
                                                      bucketLength=0.1, numHashTables=3)
    bucketModel = bucketProjectionLSH.fit(movieEmbDF)
    embBucketResult = bucketModel.transform(movieEmbDF)
    print("movieId, emb, bucketId schema:")
    embBucketResult.printSchema()
    print("movieId, emb, bucketId data result:")
    embBucketResult.show(10, truncate=False)
    print("Approximately searching for 5 nearest neighbors of the sample embedding:")
    sampleEmb = Vectors.dense(0.795, 0.583, 1.120, 0.850, 0.174, -0.839, -0.0633, 0.249, 0.673, -0.237)
    bucketModel.approxNearestNeighbors(movieEmbDF, sampleEmb, 5).show(truncate=False)



def getTransitionMatrix(item_seq):
    """
    build graph and transition matrix based on input item sequences
    input:  item sequences in RDD format
    output: transition matrix and item distribution in dictionary format
    """
    def generate_pair(x):
        """
        use a sliding window with size of 2 to generate item pairs
        input:  list of items
        output: list of pairs
        example:
            input:  [86, 90, 11, 100]
            output: [(86, 90), (90, 11), (11, 100)]
        """
        res = []
        for i in range(1, len(x)):
            res.append((x[i - 1], x[i]))
        return res

    # convert item sequences to a list of adjacent pairs
    pair_seq = item_seq.flatMap(lambda x: generate_pair(x))
    # count each distinct pair: key = pair, value = count
    pair_count_dict = pair_seq.countByValue()
    tot_count = pair_seq.count()
    trans_matrix = defaultdict(dict)
    item_count = defaultdict(int)
    item_dist = defaultdict(float)

    # accumulate edge counts; only out-degrees are considered
    for item, cnt in pair_count_dict.items():
        item1, item2 = item[0], item[1]
        item_count[item1] += cnt
        trans_matrix[item1][item2] = cnt

    for item, cnt in pair_count_dict.items():
        item1, item2 = item[0], item[1]
        # transition probability of the edge item1 -> item2
        trans_matrix[item1][item2] /= item_count[item1]
        # distribution of each source node (item)
        item_dist[item1] = item_count[item1] / tot_count

    return trans_matrix, item_dist

def oneRandomWalk(trans_mat, item_dist, sample_length):
    """
    generate one random-walk sequence based on the transition matrix
    input:
        - trans_mat: transition matrix
        - item_dist: distribution of items (source nodes)
        - sample_length: number of nodes in a walk = number of edges + 1
    """
    rand_val = random.random()
    # randomly pick the first item by inverse-CDF sampling over the item distribution
    # (picking with numpy's choice() given the probabilities would work as well)
    cdf_prob = 0
    first_item = ''
    for item, prob in item_dist.items():
        cdf_prob += prob
        if cdf_prob >= rand_val:
            first_item = item
            break
    item_list = [first_item]
    cur_item = first_item

    while len(item_list) < sample_length:
        if (cur_item not in item_dist) or (cur_item not in trans_mat):
            break
        cdf_prob = 0
        rand_val = random.random()
        dist = trans_mat[cur_item]
        # inverse-CDF sampling over the out-edges of the current node
        for item, prob in dist.items():
            cdf_prob += prob
            if cdf_prob >= rand_val:
                cur_item = item
                break
        item_list.append(cur_item)

    return item_list



def generateItemSeqs(trans_mat, item_dist, num_seq=20000, sample_length=10):
    """
    use random walks to generate multiple item sequences
    """
    samples = []
    for i in range(num_seq):
        samples.append(oneRandomWalk(trans_mat, item_dist, sample_length))
    return samples


def trainItem2Vec(spark, item_seqs, emb_length, output_path, save_to_redis=False, redis_keyprefix=None):
    """
    use Word2Vec to train item embeddings
    input:
        - item_seqs: an RDD of item sequences, rather than a dataframe
    Note:
        - Word2Vec from pyspark.mllib takes an RDD as input
        - Word2Vec from pyspark.ml takes a DataFrame as input
    """
    # train Word2Vec on the random-walk sequences
    w2v = Word2Vec().setVectorSize(emb_length).setWindowSize(5).setNumIterations(10)
    model = w2v.fit(item_seqs)
    # sanity check: nearest neighbors of item "157" in the embedding space
    synonyms = model.findSynonyms("157", 20)
    for synonym, cos_similarity in synonyms:
        print(synonym, cos_similarity)

    # save the embeddings, one "movieId:vector" line per item
    if not os.path.exists(os.path.dirname(output_path)):
        os.makedirs(os.path.dirname(output_path))
    with open(output_path, "w") as fp:
        for movie_id in model.getVectors():
            # convert vector to string type and store it
            vector = " ".join([str(emb) for emb in model.getVectors()[movie_id]])
            pair = movie_id + ":" + vector + "\n"
            fp.write(pair)
    return model


def getDeepWalk(spark, item_seq, sample_length=10, num_walk=20000, output_file='../../data/modeldata/embedding.csv',
                save_to_redis=False, redis_key_prefix=None):
    """
    use DeepWalk to generate graph embeddings
    input:
        - item_seq: RDD of per-user item sequences
    """
    # construct the probability graph
    trans_mat, item_dist = getTransitionMatrix(item_seq)

    # generate sequence samples by random walk
    samples = generateItemSeqs(trans_mat, item_dist, num_seq=num_walk, sample_length=sample_length)
    # convert the list of samples to a Spark RDD
    samples_rdd = spark.sparkContext.parallelize(samples)
    # train Item2Vec on the sampled sequences
    graphEmbModel = trainItem2Vec(spark, samples_rdd, emb_length=10, output_path=output_file,
                                  save_to_redis=False, redis_keyprefix=None)
    return graphEmbModel

def getUserEmb(spark, samples_rating, item_emb_model, output_file):
    """
    generate user embeddings based on item embeddings:
    use map-reduce to sum up the embeddings of the items rated by each user
    input:
        - spark: spark session
        - samples_rating: dataframe with rating, movieId, userId columns
        - item_emb_model: Word2Vec/Item2Vec model trained by DeepWalk
        - output_file: file name for the user embeddings
    """
    emb_dict = item_emb_model.getVectors()
    item_emb_ls = []
    for item, emb in emb_dict.items():
        item_emb_ls.append((item, list(emb)))
    fields = [StructField('movieId', StringType(), False),
              StructField('emb', ArrayType(FloatType()), False)]
    item_emb_schema = StructType(fields)
    item_emb_df = spark.createDataFrame(item_emb_ls, item_emb_schema)

    # apply map-reduce to sum up the item embeddings of each user to obtain the user embedding
    # Note: an inner join is needed to avoid empty item embeddings during the reduce step
    user_emb = samples_rating.join(item_emb_df, on="movieId", how="inner")
    print()
    print("User Embedding")
    user_emb.show(5)
    user_emb.printSchema()
    user_emb = user_emb.select("userId", "emb").rdd \
        .map(lambda row: (row[0], row[1])) \
        .reduceByKey(lambda emb1, emb2: [float(emb1[i]) + float(emb2[i]) for i in range(len(emb1))]) \
        .collect()
    print(user_emb[:5])
    # save the user embeddings, one "userId:vector" line per user
    with open(output_file, "w") as fp:
        for userId, emb in user_emb:
            row = " ".join([str(e) for e in emb])
            row = str(userId) + ":" + row + "\n"
            fp.write(row)
    print("User Embedding Saved!")
    return


if __name__ == '__main__':
    conf = SparkConf().setAppName('ctrModel').setMaster('local')
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    # change to your own file path
    file_path = '../../data/'
    rawSampleDataPath = file_path + "ratings.csv"
    embLength = 10
    print("Process ItemSequence...")
    samplesRating = spark.read.csv(rawSampleDataPath, header=True)
    item_seqs = getItemSeqs(spark, samplesRating)

    graphEmb = getDeepWalk(spark, item_seqs, sample_length=10, num_walk=20000,
                           output_file=file_path + "modeldata/itemGraphEmb.csv",
                           save_to_redis=False, redis_key_prefix=None)
    getUserEmb(spark, samples_rating=samplesRating, item_emb_model=graphEmb,
               output_file=file_path + "modeldata/userEmb.csv")

    print("Done!")

Reference

[1] Alibaba-EGES: https://arxiv.org/pdf/1803.02349v2.pdf

[2] DeepWalk: https://arxiv.org/pdf/1403.6652.pdf

[3] SparrowRecSys: https://time.geekbang.org/column/article/296932

[4] GraphEmbedding reference implementations on GitHub: https://github.com/shenweichen/GraphEmbedding

[5] DeepWalk (original paper): http://www.perozzi.net/publications/14_kdd_deepwalk.pdf
