新闻个性化推荐系统源码之构建离线用户画像( 二 )


new_data.registerTempTable("temptable")self.spark.sql('''insert overwrite table user_article_basic select user_id, max(action_time) as action_time,article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked,max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptablegroup by user_id, article_id''')表 user_article_basic 结果如下所示

新闻个性化推荐系统源码之构建离线用户画像

文章插图
 
user_article_basic
计算用户画像我们选择将用户画像存储在 Hbase 中,因为 Hbase 支持原子性操作和快速读取,并且 Hive 也可以通过创建外部表关联到 Hbase,进行离线分析,如果要删除 Hive 外部表的话,对 Hbase 也没有影响 。首先,在 Hbase 中创建用户画像表
create 'user_profile', 'basic','partial','env'在 Hive 中创建 Hbase 外部表,注意字段类型设置为 map
create external table user_profile_hbase(user_idSTRING comment "userID",informationMAP<STRING, DOUBLE> comment "user basic information",article_partial MAP<STRING, DOUBLE> comment "article partial",envMAP<STRING, INT> comment "user env")COMMENT "user profile table"STORED BY 'org.Apache.hadoop.hive.hbase.HBaseStorageHandler'WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")TBLPROPERTIES ("hbase.table.name" = "user_profile");创建外部表之后,还需要导入一些依赖包
cp -r /root/bigdata/hbase/lib/hbase-*.jar /root/bigdata/spark/jars/cp -r /root/bigdata/hive/lib/h*.jar /root/bigdata/spark/jars/接下来,读取处理好的用户行为数据,由于日志中的 channel_id 有可能是来自于推荐频道(0),而不是文章真实的频道,所以这里要将 channel_id 列删除
spark.sql("use profile")user_article_basic = spark.sql("select * from user_article_basic").drop('channel_id')通过文章 ID,将用户行为数据与文章画像数据进行连接,从而得到文章频道 ID 和文章主题词
spark.sql('use article')article_topic = spark.sql("select article_id, channel_id, topics from article_profile")user_article_topic = user_article_basic.join(article_topic, how='left', on=['article_id'])user_article_topic 结果如下图所示,其中 topics 列即为文章主题词列表,如 ['补码', '字符串', '李白', ...]
新闻个性化推荐系统源码之构建离线用户画像

文章插图
 
user_article_topic
接下来,我们需要计算每一个主题词对于用户的权重,所以需要将 topics 列中的每个主题词都拆分为单独的一条记录 。可以利用 Spark 的 explode() 方法,达到类似“爆炸”的效果
import pyspark.sql.functions as Fuser_article_topic = user_topic.withColumn('topic', F.explode('topics')).drop('topics')user_article_topic 如下图所示
新闻个性化推荐系统源码之构建离线用户画像

文章插图
 
user_article_topic
我们通过用户对哪些文章发生了行为以及该文章有哪些主题词,计算出了用户对哪些主题词发生了行为 。这样,我们就可以根据用户对主题词的行为来计算主题词对用户的权重,并且将这些主题词作为用户的标签 。那么,用户标签权重的计算公式为:用户标签权重 =(用户行为分值之和)x 时间衰减 。其中,时间衰减公式为:时间衰减系数 = 1 / (log(t) + 1),其中 t 为发生行为的时间距离当前时间的大小
不同的用户行为对应不同的权重,如下所示
用户行为分值阅读时间(<1000)1阅读时间(>=1000)2收藏2分享3点击5
计算用户标签及权重,并存储到 Hbase 中 user_profile 表的 partial 列族中 。注意,这里我们将频道 ID 和标签一起作为 partial 列族的键存储,这样我们就方便查询不同频道的标签及权重了
def compute_user_label_weights(partitions):""" 计算用户标签权重"""action_weight = {"read_min": 1,"read_middle": 2,"collect": 2,"share": 3,"click": 5}from datetime import datetimeimport numpy as np# 循环处理每个用户对应的每个主题词for row in partitions:# 计算时间衰减系数t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')alpha = 1 / (np.log(t.days + 1) + 1)if row.read_time== '':read_t = 0else:read_t = int(row.read_time)# 计算阅读时间的行为分数read_score = action_weight['read_middle'] if read_t > 1000 else action_weight['read_min']# 计算各种行为的权重和并乘以时间衰减系数weights = alpha * (row.shared * action_weight['share'] + row.clicked * action_weight['click'] +row.collected * action_weight['collect'] + read_score)# 更新到user_profilehbase表with pool.connection() as conn:table = conn.table('user_profile')table.put('user:{}'.format(row.user_id).encode(),{'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(weights).encode()})conn.close()user_topic.foreachPartition(compute_user_label_weights)


推荐阅读