Skip to content

Latest commit

 

History

History
196 lines (161 loc) · 10 KB

3.2.4.1.md

File metadata and controls

196 lines (161 loc) · 10 KB

规则类召回

在实际的推荐场景中,并不是所有的召回算法都是通过模型来做的,还有很多召回会根据具体的业务场景,通过指定相关的规则来进行召回,例如我们常见的"热门页",这个一般可以通过热门召回来实现。还有比如我们发现业务中不同地域的人的消费内容也有不同的喜好,那么我们也可以做一些地域类的召回。

  • 对于热门召回来说,最简单的做法就是统计所有物料的热度分数,这个热度分数可以考虑多种因素通过一个加权的分数来表示,最后根据热度分数倒排。除此之外还可能会考虑一些业务上的其他规则,例如当把物料库中的所有物料的热度分数算完之后,发现热门物料的数量比较多,这个时候一般可以根据用户的一些兴趣标签,进一步缩小每个用户的热门列表,使得热门页的内容也是带有一定的个性化的。

  • 与热门召回类似,对于地域类召回的话,可以先通过地域将物料进行分类。对于不同地域中的物料,如果物料量仍然比较多的话,可能对于地域类的物料可以单独考虑使用模型做召回,如果不是很多,也可以通过简单的规则做召回,例如可以对每个地域中的物料通过上述热门召回的思路来实现。

总之:规则类的召回一般都跟业务强相关,不同的筛选策略一般是通过对业务数据进行分析或者业务经验确定的。

下面分别给出了新闻推荐系统中使用的两个数据集的规则类召回代码(后续会把这一块的内容整合到一个数据集上):

  • 热度召回使用的数据集是通过爬虫获取的数据集
  • 地域召回使用的是科大讯飞新闻推荐竞赛的数据集

热度召回

import sys
sys.path.append('../../')
from conf.dao_config import cate_dict
from dao.mongo_server import MongoServer
from dao.redis_server import RedisServer
from datetime import datetime


# 这里需要从物料库中获取物料的信息,然后更新物料当天最新的热度信息
# 最终将计算好的物料热度,对物料进行排序

class HotRecall(object):
    def __init__(self):
        self.feature_protrail_collection = MongoServer().get_feature_protrail_collection()
        self.reclist_redis = RedisServer().get_reclist_redis()
        self.cate_dict = cate_dict

    def update_hot_value(self):
        """更新物料库中所有新闻的热度值
        """
        # 遍历物料池里面的所有文章
        for item in self.feature_protrail_collection.find():
            news_id = item['news_id']
            news_cate = item['cate']
            news_ctime = item['ctime']
            news_likes_num = item['likes']
            news_collections_num = item['collections']
            news_read_num = item['read_num']
            news_hot_value = item['hot_value']

            # 时间转换与计算时间差   前提要保证当前时间大于新闻创建时间,目前没有捕捉异常
            news_ctime_standard = datetime.strptime(news_ctime, "%Y-%m-%d %H:%M")
            cur_time_standard = datetime.now()
            time_day_diff = (cur_time_standard - news_ctime_standard).days
            time_hour_diff = (cur_time_standard - news_ctime_standard).seconds / 3600

            # 只要最近3天的内容
            if time_day_diff > 3:
                continue

            # 72 表示的是3天,
            news_hot_value = (news_likes_num * 0.6 + news_collections_num * 0.3 + news_read_num * 0.1) * 10 / (1 + time_hour_diff / 72) 

            # 更新物料池的文章hot_value
            item['hot_value'] = news_hot_value
            self.feature_protrail_collection.update({'news_id':news_id}, item)

    def group_cate_for_news_list_to_redis(self, ):
        """将每个用户的推荐列表按照类别分开,方便后续打散策略的实现
        对于热门页来说,只需要提前将所有的类别新闻都分组聚合就行,后面单独取就可以
        注意:运行当前脚本的时候需要需要先更新新闻的热度值
        """
        # 1. 按照类别先将所有的新闻都分开存储
        for cate_id, cate_name in self.cate_dict.items():
            redis_key = "hot_list_news_cate:{}".format(cate_id)
            for item in self.feature_protrail_collection.find({"cate": cate_name}):
                self.reclist_redis.zadd(redis_key, {'{}_{}'.format(item['cate'], item['news_id']): item['hot_value']})
        

if __name__ == "__main__":
    hot_recall = HotRecall()
    # 更新物料的热度值
    hot_recall.update_hot_value()
    # 将新闻的热度模板添加到redis中
    hot_recall.group_cate_for_news_list_to_redis()

地域召回

import os 
import sys 
sys.path.append("../../")
from conf.proj_path import log_data_path, user_info_path, doc_info_path
import pandas as pd


class RegionRecall(object):
    def __init__(self, log_data_path, user_info_path, doc_info_path):
        super().__init__()
        self.log_data_path = log_data_path
        self.user_info_path = user_info_path
        self.doc_info_path = doc_info_path
        self.read_and_process_data()

    def read_and_process_data(self):
        """读取并处理数据
        """
        log_columns = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums', 'expo_position', 'click', 'duration']
        user_columns = ['user_id', 'device', 'os', 'province', 'city', 'age','gender']
        doc_columns = ['article_id', 'title', 'ctime', 'img_num', 'cate','sub_cate', 'key_words']
        self.train_log_df = pd.read_csv(self.log_data_path, usecols=log_columns, sep='\t')
        self.doc_info_df = pd.read_csv(self.doc_info_path, names=doc_columns, sep='\t')
        self.user_info_df = pd.read_csv(self.user_info_path, usecols=user_columns, sep='\t')
        # 删除重复数据
        self.train_log_df = self.train_log_df.drop_duplicates(keep='last')
        self.doc_info_df = self.doc_info_df.drop_duplicates(keep='last')
        self.user_info_df = self.user_info_df.drop_duplicates(keep='last')
        # 转换成天的日期,可以用来筛选数据
        self.train_log_df['expo_time_day'] = pd.to_datetime(self.train_log_df['expo_time'], unit='ms').dt.strftime('%Y_%m_%d')
        print("read_and_process_data success...")

    def get_article_stat_data(self, train_df, doc_info_df):
        """统计所有文章的点击和曝光次数,以及点击率(点击次数 / 曝光次数)
        """
        # 日志数据去重, 保留最后一条记录
        train_df = train_df.drop_duplicates(keep='last')
        # 统计曝光
        expo_num_s = train_df.groupby('article_id')['user_id'].count()
        expo_num_df = pd.DataFrame({'article_id': expo_num_s.index, 'expo_num': expo_num_s.values})
        # 统计点击
        click_num_s = train_df.groupby('article_id')['click'].sum()
        click_num_df = pd.DataFrame({'article_id': click_num_s.index, 'click_num': click_num_s.values})
        article_df = pd.merge(expo_num_df, click_num_df, how='left', on='article_id')
        # 拼接文章信息
        article_df = pd.merge(article_df, doc_info_df[['article_id', 'ctime', 'cate']], how='left', on='article_id')
        article_df['click_rate'] = article_df['click_num'] / article_df['expo_num']
        return article_df

    def get_province_articles(self, df, topK, cur_time):
        """筛选和过滤规则, 里面的超参都可以根据业务数据的具体分布进行修改
        """
        df['ctime_date'] = pd.to_datetime(df['ctime'], unit='ms')
        # 时间差
        df['delta_time'] = pd.to_datetime([cur_time] * df.shape[0]) - df['ctime_date']  
        # 保留最近三天的新闻,保证新闻的时效性
        df = df[df['delta_time'].dt.days >= 3].reset_index(drop=True)
        # expo_num 过滤
        df = df[df['expo_num'] >= 1000].reset_index(drop=True)
        # 点击率
        df = df[df['click_rate'] >= 0.1].reset_index(drop=True)
        # 按照点击率倒排
        df = df.sort_values('click_rate', ascending=False).reset_index(drop=True)
        # 数据格式:article_id:cate:click_rate
        df['article_id_and_click_rate'] = df.apply(lambda x: str(\
            x['article_id']) + ':' + str(x['cate']) + ':' + str(round(x['click_rate'], 5)), axis=1)
        article_list = df['article_id_and_click_rate'].values[:topK]
        return article_list

    def province_recall(self, N, cur_time):
        article_df = self.get_article_stat_data(self.train_log_df, self.doc_info_df)
        region_articles_df = pd.merge(self.train_log_df, self.user_info_df, how='left', on='user_id')
        region_articles_df = region_articles_df[['user_id', 'article_id', 'province', 'city']]
        region_articles_df = pd.merge(region_articles_df, article_df, how='left', on='article_id')
        # 去除时间为空, 以及一些异常数据
        region_articles_df['ctime'] = region_articles_df['ctime'].astype(str)
        region_articles_df = region_articles_df[region_articles_df['ctime'].str.isnumeric()]
        # 分组
        province_df_dict = {}
        for name, df in region_articles_df.groupby('province'):
            if df.shape[0] < 5000:
                continue
            # 分完组之后可以取出重复数据
            df = df[['article_id', 'expo_num', 'click_num', 'click_rate', 'ctime', 'cate']].\
                drop_duplicates(subset='article_id').reset_index(drop=True)
            province_df_dict[name] = df
        # 给每个省份筛选一部分优质物料
        province_results_dict = {}
        for province, df in province_df_dict.items():
            province_results_dict[province] = self.get_province_articles(df, N, cur_time)
        print("province_recall success...")
        return province_results_dict


if __name__ == "__main__":
    root_path = '/data1/ryluo/5w_data/'
    region_recall = RegionRecall(log_data_path, user_info_path, doc_info_path)
    province_results_dict = region_recall.province_recall(N=300, cur_time='2021-07-03')
    # 这里的召回内容还没有落盘
    print(province_results_dict)
    # TODO 召回结果落盘逻辑