import datetime

from mysql_db import MysqlDB


class ReportPush(object):
    """Prepare the data for scheduled report pushes.

    The class holds the SQL statements as class attributes and exposes
    methods that run them through ``self.db`` (a ``MysqlDB`` instance) and
    assemble the per-customer report sections.

    NOTE(review): ``MysqlDB.select`` is assumed to run a parameterized query
    (``%s`` placeholders, list values expanded for ``IN %s``) and to return a
    sequence of row tuples -- confirm against the ``mysql_db`` module.
    """

    # ------------------------------------------------------------------
    # Section 1: overview metrics
    # ------------------------------------------------------------------

    # 1: total page views = brand (group) part + project part.
    # Project PV, summed over the projects the customer may see.
    # Params: start date, end date, project id list.
    sql_1_1 = """
        SELECT SUM(pv) AS pv1
        FROM a_idfa_behavior_sum
        WHERE `report_d` >= %s and report_d < %s
        AND house_id IN %s
    """

    # Brand (group) PV, summed over the projects the customer may see.
    # Params: start date, end date, project id list.
    sql_1_2 = """
        SELECT SUM(pv) AS pav2
        FROM a_behavior_brand_mini_day
        WHERE report_d >= %s and report_d < %s
        AND house_id IN %s
    """

    # 2: total number of visitors.
    # Params: (start date, end date, project id list) twice, once per UNION arm.
    sql_1_3 = """
        SELECT count( DISTINCT IFNULL(mobile, user_id) ) as people
        FROM (
            SELECT A.user_id, B.mobile
            FROM a_idfa_behavior_sum A
            LEFT JOIN d_user B ON A.user_id = B.user_id
            WHERE A.report_d >= %s and A.report_d < %s
            AND A.house_id IN %s
            UNION
            SELECT A.brand_user_id AS user_id, B.mobile
            FROM a_behavior_brand_mini_day A
            LEFT JOIN `a_brand_app_customer` B ON A.brand_user_id = B.brand_customer_id
            WHERE A.report_d >= %s and A.report_d < %s
            AND A.house_id IN %s
        ) t1
    """

    # 3: newly acquired customers.
    # Params: (start date, end date, project id list) twice, once per UNION arm.
    sql_1_4 = """
        SELECT count( DISTINCT IFNULL(mobile, user_id) )
        FROM (
            SELECT user_id, mobile, created
            FROM `d_user`
            WHERE created >= %s AND created < %s
            AND house_id IN %s
            UNION
            SELECT brand_customer_id AS user_id, mobile, rlat_created
            FROM `a_brand_app_customer_house_rlat`
            WHERE rlat_created >= %s AND rlat_created < %s
            AND rlat_house_id IN %s
        ) t1
    """

    # 4: newly acquired phone numbers.
    # Params: start, end, project ids, project ids, start, end.
    sql_1_5 = """
        SELECT COUNT(DISTINCT mobile)
        FROM (
            SELECT user_id, mobile, wx_phone_time AS created
            FROM `d_user`
            WHERE wx_phone_time >= %s AND wx_phone_time < %s
            AND house_id IN %s
            UNION
            SELECT brand_customer_id, mobile, houdian_time AS created
            FROM (
                SELECT *,
                    CASE WHEN rlat_created > shouquan_time THEN rlat_created
                    ELSE shouquan_time END AS houdian_time
                FROM a_brand_app_customer_house_rlat
                WHERE mobile IS NOT NULL
                AND rlat_house_id IN %s
                AND cust_house_flag = '1'
            ) t1
            WHERE houdian_time >= %s AND houdian_time < %s
        ) t1
    """

    # 5: brand mini-program total page views (for Hongkong Land the label
    #    must read "CNC mini-program total page views"). Brand part only.
    # 6: brand mini-program total visitors (for Hongkong Land: "CNC
    #    mini-program total visitors"). Overall brand mini-program UV.
    # NOTE(review): this statement is not used by any method in this file and
    # uses ``report_d > %s`` where every other query uses ``>=`` -- confirm
    # whether the start day is meant to be excluded.
    sql_1_6 = """
        SELECT COUNT(DISTINCT brand_user_id) AS UV4
        FROM a_behavior_brand_mini_day
        WHERE report_d > %s AND report_d < %s
        AND house_id IN %s
    """

    # 7: brand mini-program new customers (Hongkong Land: "CNC mini-program
    #    new customers"); brand-level acquisition within permitted projects.
    # 8: brand mini-program new phone numbers (Hongkong Land: "CNC
    #    mini-program new phone numbers"); brand-level, permitted projects.
    # 9: single-project mini-program total page views (project part).
    # 10: single-project mini-program total visitors.
    # 11: single-project mini-program new customers.
    # 12: single-project mini-program new phone numbers.

    # ------------------------------------------------------------------
    # Section 2: rankings
    # ------------------------------------------------------------------

    # Default/001_Damai/Project ranking/Mini-program ranking TOP_N.
    # Params: (start date, end date) four times, once per sub-query.
    # The in-SQL ``#`` comment must stay on its own line end: MySQL treats
    # everything after it up to the newline as a comment.
    sql_2_1 = """
        SELECT a.house_id, a.house_name, SUM(a.pv), SUM(a.uv), SUM(a.new_cust_num), SUM(a.wx_num)
        from (SELECT a.*, b.house_name, c.interested_num, d.wx_num, e.new_cust_num
            FROM (
                SELECT house_id,
                    count( DISTINCT ifnull(user_id, idfa) ) uv,
                    sum(session_times) session_times,
                    sum(sum_session_time) sum_session_time,
                    sum(pv) pv,
                    sum(page_num) page_num
                FROM a_idfa_behavior_sum
                WHERE report_d >= %s AND report_d < %s
                GROUP BY house_id
            ) a
            JOIN d_house b ON a.house_id = b.house_id
            LEFT JOIN (
                SELECT house_id, count(*) interested_num
                FROM f_interested_custlist
                WHERE report_d >= %s AND report_d < %s
                GROUP BY house_id
            ) c ON a.house_id = c.house_id
            LEFT JOIN (
                SELECT house_id, count(*) wx_num
                FROM f_customer_dynamic
                WHERE dynamic = 1 AND report_d >= %s AND report_d <= %s
                GROUP BY house_id
            ) d ON a.house_id = d.house_id
            LEFT JOIN (
                SELECT house_id, count(*) new_cust_num
                FROM d_user
                WHERE created >= %s AND created < %s # 时间需要加一天!!!!
                GROUP BY house_id
            ) e ON a.house_id = e.house_id
        ) a
        GROUP BY a.house_id, a.house_name
        order by a.pv desc
    """

    # Default/006_Damai (brand)/Brand project ranking v1.3/Brand ranking.
    # Params: start date, end date.
    sql_2_2 = """
        select a.brand_id, a.house_id, a.house_name, SUM(a.pv), SUM(a.uv), SUM(a.new_cust), SUM(a.shouquan_cust)
        from (SELECT c.pv,c.uv,a.brand_id,a.house_id, a.house_name,a.brand_name,
                ifnull(b.house_layout_num,0) house_layout_num,
                ifnull(d.launch_time,'--') launch_time,
                c.new_cust, c.shouquan_cust, c.revisit_cust
            FROM (
                SELECT brand_id, ifnull(house_id, '0') house_id,
                    sum(pv) pv,
                    count(DISTINCT brand_user_id) uv,
                    count(DISTINCT case when is_new_user = 1 then brand_user_id end) new_cust,
                    count(DISTINCT case when is_shouquan_user = 1 then brand_user_id end) shouquan_cust,
                    count(DISTINCT case when is_new_user = 0 then brand_user_id end) revisit_cust
                from a_behavior_brand_mini_day
                where report_d>= %s and report_d < %s
                GROUP BY brand_id, ifnull(house_id, '0')
            ) c
            LEFT JOIN (
                SELECT house_id, count(1) house_layout_num
                FROM d_content_layout
                WHERE `status` = '1' and house_id <> 1
                group by house_id
                union all
                SELECT bb.brand_id house_id, count(1) house_layout_num
                FROM d_content_layout aa
                join d_house bb on aa.house_id = bb.house_id
                WHERE aa.`status` = '1' and bb.status = '1' and aa.house_id <> 1
                group by bb.brand_id
            ) b ON c.house_id = b.house_id
            JOIN d_house a ON a.house_id = c.house_id and a.brand_id = c.brand_id
            left join d_house_attr d on c.house_id = d.house_id and c.brand_id = d.brand_id
        ) a
        group by a.brand_id, a.house_id, a.house_name
        order by a.pv desc
    """

    # ------------------------------------------------------------------
    # Section 3: customer source channels
    # ------------------------------------------------------------------

    # Default/001_Damai/Scene - customer source channel/Channel details.
    # Params: start date, end date.
    sql_3_1 = """
        SELECT house_id, house_name, label_wx, COUNT(a.id) as counts
        FROM d_user_attr a
        LEFT JOIN d_scene b ON a.scene = b. CODE
        WHERE a.source IN (1, 2, 3, 4, 10)
        AND a.report_d >= %s AND a.report_d < %s
        GROUP BY house_id, house_name, label_wx
    """

    # Default/006_Damai (brand)/Scene (brand) - customer source channel v1.1/
    # Channel details.
    # Params: (start date, end date) twice, once per UNION ALL arm.
    sql_3_2 = """
        select brand_id, x.brand_name, house_id, house_name, label_wx, COUNT(1)
        from (SELECT a.scene, a.brand_id, b.*, a.share_brand_customer_id,
                '2' adviser_agent,
                a.house_id house_id, c.house_name house_name, c.brand_name
            FROM (
                SELECT scene, brand_id, share_brand_customer_id, house_id
                FROM d_brand_app_customer
                WHERE created >= %s AND created < DATE_ADD( %s, INTERVAL 1 DAY )
                UNION ALL
                SELECT scene, brand_id, share_brand_customer_id, brand_id house_id
                FROM d_brand_app_customer
                WHERE created >= %s AND created < DATE_ADD( %s, INTERVAL 1 DAY )
            ) a
            LEFT JOIN d_scene b ON a.scene = b. CODE
            JOIN d_house c ON a.house_id = c.house_id AND a.brand_id = c.brand_id) x
        group by x.brand_id, x.brand_name, x.house_id, x.house_name, x.label_wx
    """

    # ------------------------------------------------------------------
    # Task / customer / log bookkeeping
    # ------------------------------------------------------------------

    # Customers to push to for a task key, with their permitted ids.
    # The three status tests are spelled out: MySQL evaluates a chained
    # ``a = b = c = 1`` left to right as ``((a = b) = c) = 1``, which also
    # accepts e.g. statuses (0, 0, 1).
    sql_4 = """
        select a.task_key, a.customer_id, b.customer_type, b.`name`, b.mail,
            GROUP_CONCAT(c.house_or_brand_id) as ids
        from report_task_info a
        left join report_push_customer_info b on b.id = a.customer_id
        left join report_customer_authority_info c on b.id = c.customer_id
        where a.task_key = %s
        and a.status = 1 and b.status = 1 and c.status = 1
        group by a.task_key, a.customer_id, b.customer_type, b.`name`, b.mail
    """

    # Projects that belong to a brand id.
    sql_5 = """
        select house_id, house_name from d_house where brand_id = %s
    """

    # Push log record.
    sql_6 = """insert into report_push_log(name, mail, report_name, push_time, send_status, status)
        values(%s, %s, %s, now(), %s, 1)
    """

    # Brand id and brand name for a project id.
    sql_7 = """
        select a.brand_id, a.brand_name from d_house a where a.house_id = %s;
    """

    # Share categories:
    #   1: long-press QR code      2: chat session
    #   3: official-account menu   4: official-account article
    #   5: mini-program history    6: scanned QR code
    #   7: search                  8: QR code picked from album
    #   9: other mini-program     10: other
    share_way = {
        "长按识别二维码": 1,
        "会话": 2,
        "公众号菜单": 3,
        '公众号文章': 4,
        '小程序历史列表': 5,
        '扫一扫二维码': 6,
        '搜索': 7,
        '相册选取二维码': 8,
        '其他小程序': 9,
        '': 10
    }

    # Look-back window in days for each task key accepted by get_time_range.
    _LOOKBACK_DAYS = {1: 1, 2: 7, 3: 7, 4: 2999}

    def __init__(self, db_name):
        """Open the database wrapper for ``db_name``."""
        self.db = MysqlDB(db_name)

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _num(value):
        """Return ``value``, or 0 when it is ``None`` (e.g. SQL ``SUM`` of nothing)."""
        return 0 if value is None else value

    @staticmethod
    def _split_ids(raw):
        """Split a comma separated id string (``GROUP_CONCAT`` output) into a list."""
        if raw is None:
            return []
        return [part.strip() for part in str(raw).split(',') if part.strip()]

    def _scalar(self, sql, params):
        """Run ``sql`` and return the first column of the first row (0 if absent)."""
        rows = self.db.select(sql, params)
        if not rows:
            return 0
        return self._num(rows[0][0])

    def _column_sums(self, rows, columns):
        """Sum the given column indexes over ``rows``; ``None`` cells count as 0."""
        totals = [0] * len(columns)
        for row in rows:
            for pos, column in enumerate(columns):
                totals[pos] += self._num(row[column])
        return totals

    @staticmethod
    def _ranked_visible_rows(rows, house_id_set, brand_id_set):
        """Prefix each permitted merged row with its position in ``rows``.

        A row ``[brand_id, house_id, ...]`` is kept when its house id or its
        brand id is permitted.
        """
        return [
            [rank] + list(row)
            for rank, row in enumerate(rows)
            if str(row[1]) in house_id_set or str(row[0]) in brand_id_set
        ]

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def report_data_query(self, task_key):
        """
        Prepare the data for a scheduled push task.

        :param task_key: task key; also selects the reporting window
            (see ``get_time_range``)
        :return: ``{}`` when the task has no customer, otherwise a dict with
            keys 1-6: 1 overview metrics, 2 merged project ranking,
            3 merged all-time ranking, 4 mini-program ranking,
            5 brand project ranking, 6 (empty, not implemented yet)
        """
        result = {}
        # Row layout: task_key, customer_id, customer_type, name, mail, ids.
        customers = self.db.select(self.sql_4, [task_key])
        if not customers:
            return result

        time_range = self.get_time_range(task_key)
        all_time_range = self.get_time_range(4)

        # Data for the reporting window.
        xcx_top_data = self.xcx_top(time_range)
        brand_top_data = self.brand_top(time_range)
        # All-time data.
        xcx_top_data_all = self.xcx_top(all_time_range)
        brand_top_data_all = self.brand_top(all_time_range)
        # TODO: sections 6-8 (source-channel analysis) are not implemented;
        # they would use customer_channel_details() and
        # brand_customer_channel_details().

        # NOTE(review): the original loop ended with an unconditional
        # ``break``, so only the first customer is ever processed. Kept to
        # preserve the return shape -- confirm whether per-customer results
        # are wanted.
        customer = customers[0]
        customer_type = customer[2]
        house_ids = []
        brand_id_list = []
        if customer_type == 1:
            # Project customer: the ids are project ids.
            house_ids = self._split_ids(customer[5])
        elif customer_type == 2:
            # Brand customer: the ids are brand ids; expand to their projects.
            brand_id_list = self._split_ids(customer[5])
            for brand_id in brand_id_list:
                rows = self.get_house_ids_by_brand_id(brand_id) or []
                # sql_5 rows are (house_id, house_name); keep the id only.
                house_ids.extend(row[0] for row in rows)

        # GROUP_CONCAT yields strings while query rows may carry numbers, so
        # permission checks compare the string forms.
        house_id_set = {str(house_id) for house_id in house_ids}
        brand_id_set = {str(brand_id) for brand_id in brand_id_list}

        # 1: overview, 12 metrics.
        result_data_1 = list(
            self.data_overview(time_range, house_ids, xcx_top_data, brand_top_data))

        # 4: single-project mini-program ranking.
        result_data_4 = [
            [rank, row[1], row[2], row[3], row[4], row[5]]
            for rank, row in enumerate(xcx_top_data)
            if str(row[0]) in house_id_set
        ]

        # 5: brand project ranking. brand_id_set is empty for project
        # customers, so the brand test only ever applies to brand customers.
        # NOTE(review): column 5 (new customers) is emitted twice, exactly as
        # in the original; it looks like a typo but consumers may rely on
        # the 7-column width -- confirm before changing.
        result_data_5 = [
            [rank, row[2], row[3], row[4], row[5], row[5], row[6]]
            for rank, row in enumerate(brand_top_data)
            if str(row[1]) in house_id_set or str(row[0]) in brand_id_set
        ]

        # 2: merged (project + brand) ranking for the reporting window.
        merged = self.house_with_brand(xcx_top_data, brand_top_data)
        result_data_2 = self._ranked_visible_rows(merged, house_id_set, brand_id_set)

        # 3: merged all-time totals; brand ids are included for brand customers.
        merged_all = self.house_with_brand(
            xcx_top_data_all, brand_top_data_all, brand_id_list)
        result_data_3 = self._ranked_visible_rows(merged_all, house_id_set, brand_id_set)

        result[1] = result_data_1
        result[2] = result_data_2
        result[3] = result_data_3
        result[4] = result_data_4
        result[5] = result_data_5
        result[6] = []
        return result

    def data_overview(self, time_range, house_ids, xcx_top_data, brand_top_data):
        """
        Compute the 12 overview metrics.

        :param time_range: ``[start, end]`` date strings
        :param house_ids: permitted project ids
        :param xcx_top_data: rows from ``xcx_top``
        :param brand_top_data: rows from ``brand_top``
        :return: list of 12 numbers: total PV, total visitors, new customers,
            new phone numbers, then PV / visitors / new customers / authorized
            phones for the brand rows (5-8) and for the mini-program rows (9-12)
        """
        house_ids = list(house_ids or [])
        if not house_ids:
            # ``IN ()`` is not valid SQL; nothing permitted means all zeros.
            return [0] * 12

        start, end = time_range[0], time_range[1]
        span = [start, end, house_ids]

        # 1: total page views (project part + brand part).
        result = [self._scalar(self.sql_1_1, span) + self._scalar(self.sql_1_2, span)]
        # 2: total visitors (both UNION arms take start, end, ids).
        result.append(self._scalar(self.sql_1_3, span + span))
        # 3: new customers (both UNION arms take start, end, ids).
        result.append(self._scalar(self.sql_1_4, span + span))
        # 4: new phone numbers.
        result.append(self._scalar(
            self.sql_1_5, [start, end, house_ids, house_ids, start, end]))

        house_id_set = {str(house_id) for house_id in house_ids}
        # xcx rows carry the house id in column 0, brand rows in column 1.
        xcx_rows = [row for row in xcx_top_data if str(row[0]) in house_id_set]
        brand_rows = [row for row in brand_top_data if str(row[1]) in house_id_set]

        # 5-8: brand mini-program PV, visitors, new customers, authorized phones.
        result.extend(self._column_sums(brand_rows, (3, 4, 5, 6)))
        # 9-12: project mini-program PV, visitors, new customers, authorized phones.
        result.extend(self._column_sums(xcx_rows, (2, 3, 4, 5)))
        return result

    def house_with_brand(self, xcx_top_data, brand_top_data, brands=None):
        """
        Merge the project ranking with the brand ranking, per project.

        :param xcx_top_data: rows ``[house_id, house_name, pv, uv, new, phones]``
        :param brand_top_data: rows
            ``[brand_id, house_id, house_name, pv, uv, new, phones]``
        :param brands: optional brand ids to include as lookup keys
        :return: rows ``[brand_id, house_id, house_name, pv, uv, new, phones]``
            sorted by PV, highest first. A project present in both inputs has
            its four counters added; one present only in ``xcx_top_data``
            gets the placeholder brand id 1.
        """
        num = self._num

        xcx_by_house = {}
        for row in xcx_top_data:
            xcx_by_house.setdefault(str(row[0]), row)

        brand_by_house = {}
        brand_by_brand = {}
        for row in brand_top_data:
            brand_by_house.setdefault(str(row[1]), row)
            brand_by_brand.setdefault(str(row[0]), row)

        keys = [str(brand_id) for brand_id in (brands or [])]
        keys.extend(xcx_by_house)
        keys.extend(brand_by_house)

        result = []
        for key in dict.fromkeys(keys):  # de-duplicate, keep first occurrence
            xcx_row = xcx_by_house.get(key)
            # Prefer the brand row of this very house; fall back to the first
            # row whose brand id equals the key (the original matched on
            # either column).
            brand_row = brand_by_house.get(key)
            if brand_row is None:
                brand_row = brand_by_brand.get(key)

            if xcx_row is not None and brand_row is not None:
                result.append([
                    brand_row[0],
                    xcx_row[0],
                    xcx_row[1],
                    num(xcx_row[2]) + num(brand_row[3]),
                    num(xcx_row[3]) + num(brand_row[4]),
                    num(xcx_row[4]) + num(brand_row[5]),
                    num(xcx_row[5]) + num(brand_row[6]),
                ])
            elif xcx_row is not None:
                result.append([1] + list(xcx_row))
            elif brand_row is not None:
                result.append(list(brand_row))

        # Rank by PV (column 3), like xcx_top and brand_top do.
        result.sort(key=lambda row: num(row[3]), reverse=True)
        return result

    def xcx_top(self, time_range):
        """
        Default/001_Damai/Project ranking/Mini-program ranking TOP_N.

        :param time_range: ``[start, end]`` date strings
        :return: list of rows sorted by PV, highest first:
            0 house_id, 1 house_name, 2 SUM(pv), 3 SUM(uv),
            4 SUM(new_cust_num), 5 SUM(wx_num)
        """
        # sql_2_1 contains four sub-queries, each taking (start, end).
        params = list(time_range) * 4
        rows = self.db.select(self.sql_2_1, params) or []
        result = [list(row) for row in rows]
        result.sort(key=lambda row: self._num(row[2]), reverse=True)
        return result

    def brand_top(self, time_range):
        """
        Default/006_Damai (brand)/Brand project ranking v1.3/Brand ranking.

        :param time_range: ``[start, end]`` date strings
        :return: list of rows sorted by PV, highest first:
            0 brand_id, 1 house_id, 2 house_name, 3 SUM(pv), 4 SUM(uv),
            5 SUM(new_cust), 6 SUM(shouquan_cust)
        """
        # sql_2_2 has exactly two placeholders: start and end.
        params = list(time_range)
        rows = self.db.select(self.sql_2_2, params) or []
        result = [list(row) for row in rows]
        result.sort(key=lambda row: self._num(row[3]), reverse=True)
        return result

    def customer_channel_details(self, time_range):
        """
        Default/001_Damai/Scene - customer source channel/Channel details.

        :param time_range: ``[start, end]`` date strings
        :return: rows of house_id, house_name, label_wx (share category),
            counts
        """
        # sql_3_1 has exactly two placeholders: start and end.
        return self.db.select(self.sql_3_1, list(time_range))

    def brand_customer_channel_details(self, time_range):
        """
        Default/006_Damai (brand)/Scene (brand) - customer source channel v1.1/
        Channel details.

        :param time_range: ``[start, end]`` date strings
        :return: rows of brand_id, brand_name, house_id, house_name,
            label_wx (share category), count
        """
        # Both UNION ALL arms of sql_3_2 take (start, end).
        params = [time_range[0], time_range[1]] * 2
        return self.db.select(self.sql_3_2, params)

    def push_log_recording(self, push_message):
        """
        Record a report push in the push log.

        :param push_message: values for ``sql_6``, passed to ``db.add_some``
        :return: None
        """
        self.db.add_some(self.sql_6, push_message)

    def get_house_ids_by_brand_id(self, brand_id):
        """Return the ``(house_id, house_name)`` rows of the given brand."""
        return self.db.select(self.sql_5, [brand_id])

    def get_brand_info_by_house_id(self, house_id):
        """
        Look up the brand of a project.

        :param house_id: project id
        :return: the brand id when exactly one row matches, else ``None``
        """
        brand_info = self.db.select(self.sql_7, [house_id])
        if len(brand_info) == 1:
            return brand_info[0][0]
        return None

    def get_time_range(self, task_key):
        """
        Return the ``[start, end]`` dates for a task key as ``yyyy-mm-dd``.

        ``end`` is today and ``start`` lies a fixed number of days before it:
        1 day for key 1, 7 days for keys 2 and 3, 2999 days ("unlimited")
        for key 4.

        :param task_key: 1, 2, 3 or 4
        :return: ``[start, end]`` list of date strings
        :raises ValueError: for any other task key
        """
        days = self._LOOKBACK_DAYS.get(task_key)
        if days is None:
            raise ValueError('unsupported task_key: %r' % (task_key,))
        now_time = datetime.datetime.now()
        pre_time = now_time - datetime.timedelta(days=days)
        return [pre_time.strftime('%Y-%m-%d'), now_time.strftime('%Y-%m-%d')]


if __name__ == '__main__':
    rp = ReportPush('linshi')
    print(rp.get_time_range(3))
    sql = "select COUNT(1) from t_house_image where id in %s and status = %s"
    print(rp.db.select(sql, [[46, 47, 48], -1]))
    list_data = [2, 1, 4]
    for x in list_data:
        print(x)