import pandas as pd import openpyxl as ox from itertools import groupby import os import tablib class ExcelUtil: # 当前项目路径 dir_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + r'/elab_mvp/resources' """ 解析excel文件 """ def __init__(self, sheet_name=None, file_name=None): if file_name: self.path = os.path.join(self.dir_path, file_name) else: self.path = os.path.join(self.dir_path, 'mvp.xlsx') if sheet_name: self.sheet_name = sheet_name else: self.sheet_name = '测试数据' def read_excel_by_pd(self): df = pd.read_excel(self.path) data = df.head() print('获取到的数据{}'.format(data)) def read_excel_by_ox(self): work_book = ox.load_workbook(self.path, data_only=True) work_sheet = work_book.get_sheet_by_name(self.sheet_name) # print('max_row:{}, max_col:{}'.format(work_sheet.max_row, work_sheet.max_column)) return work_sheet def read_excel_by_ox_name(self, sheet_name): work_book = ox.load_workbook(self.path, data_only=True) work_sheet = work_book.get_sheet_by_name(sheet_name) # print('max_row:{}, max_col:{}'.format(work_sheet.max_row, work_sheet.max_column)) return work_sheet def init_crowd_info(self): """ 整理不同人群包含的父选序号 :return: """ rows = [row for row in self.read_excel_by_ox().rows] crowd_a = [] crowd_b = [] crowd_c = [] crowd_d = [] crowd_e = [] crowd_f = [] for row in rows[2:]: option = row[4].value a = row[6].value if a is not None and a == 1 and option not in crowd_a: crowd_a.append(option) b = row[7].value if b is not None and b == 1 and option not in crowd_b: crowd_b.append(option) c = row[8].value if c is not None and c == 1 and option not in crowd_d: crowd_c.append(option) d = row[9].value if d is not None and d == 1 and option not in crowd_d: crowd_d.append(option) e = row[10].value if e is not None and e == 1 and option not in crowd_e: crowd_e.append(option) f = row[11].value if f is not None and f == 1 and option not in crowd_f: crowd_f.append(option) return {'A': crowd_a, 'B': crowd_b, 'C': crowd_c, 'D': crowd_d, 'E': crowd_e, 'F': crowd_f} def init_out_way(self): result = {} work_sheet = self.read_excel_by_ox_name('用户画像-出行方式') rows = work_sheet.rows for row in rows: key = row[3].value + row[4].value + '市' + row[6].value + row[7].value result[key] = float(row[9].value) return result def init_mvp_data(self): """ 获取每个标签包括的父题父选项编号 :return: """ no_need_module = ['空间需求图谱-单品偏好', '空间需求图谱-精装关注点', '空间需求图谱-空间特性偏好', '空间需求-材质', '空间需求-色调', '空间需求-色相'] rows = [row for row in self.read_excel_by_ox().rows][36:] tag_name = None tag_type = None datas = [] for row in rows: tag_type_1 = row[0].value tag = row[1].value values = row[3].value corr = row[4].value if tag_type_1: tag_type = tag_type_1 if tag: tag_name = tag if values is not None and values != '找不到': datas.append([tag_type, tag_name, values, corr]) result = {} datas.sort(key=lambda obj: obj[0]) for tag_type, sub_datas in groupby(datas, key=lambda obj: obj[0]): if tag_type not in no_need_module: sub_list = [x for x in sub_datas] sub_list.sort(key=lambda obj: obj[1]) sub_result = {} for name, items in groupby(sub_list, key=lambda obj: obj[1]): orders = [] for n in items: orders.append([n[2], n[3]]) sub_result[name] = orders result[tag_type] = sub_result return result def init_scores(self): work_sheet = self.read_excel_by_ox() rows = [row for row in work_sheet.rows] datas = [] for row in rows[1:]: if row[0].value is not None: datas.append([row[0].value, row[1].value, row[2].value, row[3].value, row[4].value]) return datas def init_module_info(self): work_sheet = self.read_excel_by_ox() max_column = work_sheet.max_column rows = [row for row in work_sheet.rows][3:] crowd_name = None datas = [] for row in rows: crowd = row[1].value if crowd is not None: crowd_name = crowd behavior = row[2].value score = row[4].value for index in range(6, max_column - 1, 2): module_name = row[index].value if module_name is not None: weight = row[index + 1].value datas.append([crowd_name, behavior, score, module_name, weight]) results = {} datas.sort(key=lambda obj: obj[0]) for name, items in groupby(datas, key=lambda obj: obj[0]): sub_results = {} sub_list = [] for it in items: sub_list.append([x for x in it]) sub_list.sort(key=lambda obj: obj[3]) for name_1, itmes_1 in groupby(sub_list, key=lambda obj: obj[3]): sub_data = [] for n in itmes_1: # print(' {}'.format(n[1])) sub_data.append([n[1], n[2], n[4]]) sub_results[name_1] = sub_data results[name] = sub_results return results def module_behavior_info(self): """ 构建模块和行为的关联信息 :return: """ work_sheet = self.read_excel_by_ox_name('行为-模块映射表') max_column = work_sheet.max_column rows = [row for row in work_sheet.rows][1:] infos = [] for row in rows: behavior_name = row[1].value for i in range(2, max_column - 1): module_name = row[i].value if module_name: if i == 2: weight = 1 else: weight = 0.5 infos.append([row[i].value, behavior_name, weight]) infos.sort(key=lambda obj: obj[0]) result = {} for key, data in groupby(infos, key=lambda obj: obj[0]): behavior_data = [] for dt in data: dt_list = [x for x in dt] if len(behavior_data) <= 14: behavior_data.append([dt_list[1], dt_list[2]]) result[key] = behavior_data return result def read_options_info(self): """ 获取选项的配置信息 :return: """ work_sheet = self.read_excel_by_ox() rows = [row for row in work_sheet.rows][1:] info = {} for row in rows: key = str(int(row[1].value)) + str(int(row[9].value)) # tag, title, name info[key] = [row[15].value, row[13].value, row[14].value, row[6].value] return info def create_excle(self, file_name, header, data): data_set = tablib.Dataset(data, header=header) save_path = os.path.join(self.dir_path, file_name) with open(save_path, 'wb', encoding='utf8') as f: f.write(data_set.xlsx) def wenjuanxin_84(self): work_sheet = self.read_excel_by_ox() question_work_sheet = self.read_excel_by_ox_name('Sheet2') rows = [row for row in work_sheet.rows][1:] question_rows = [row for row in question_work_sheet.rows][1:] question_data = [] for qr in question_rows: question_data.append([str(qr[0].value), str(qr[1].value), str(qr[2].value)]) question_dict = {} question_data.sort(key=lambda obj: obj[0]) for key, data in groupby(question_data, key=lambda obj: obj[0]): data_list = [] for dt in data: data_list.append(dt[1:]) question_dict[str(key)] = data_list # print(json.dumps(question_dict, indent=4, ensure_ascii=False)) def get_sub_option_id(sub_question_id, sub_option_content): sub_option_contents = question_dict.get(str(sub_question_id)) if sub_option_contents: for sc in sub_option_contents: if sc[1] == sub_option_content: return sc[0] else: # print(sub_question_id, sub_option_content) pass # uuid,score(sub_option_id),created,sub_question_id insert_data = [] for row in rows: uuid = row[0].value + '1000' date = row[1].value question_1 = str(row[6].value).split('.')[1] id_1 = get_sub_option_id(20, question_1) if id_1: insert_data.append([uuid, id_1, date, 20]) question_2 = str(row[7].value).split('.')[1] id_2 = get_sub_option_id(29, question_2) if id_2: insert_data.append([uuid, id_2, date, 29]) question_3 = str(row[8].value).split('┋') for q3 in question_3: content = str(q3.split('.')[1]) id_3 = get_sub_option_id(370, content) if id_3: insert_data.append([uuid, id_3, date, 370]) question_4 = str(row[9].value).split('┋') for q4 in question_4: content = q4.split('.')[1] id_4 = get_sub_option_id(371, content) if id_4: insert_data.append([uuid, id_4, date, 371]) question_5 = str(row[10].value).split('┋') for q5 in question_5: content = q5.split('、')[1] id_5 = get_sub_option_id(372, content) if id_5: insert_data.append([uuid, id_5, date, 372]) question_6 = str(row[11].value).split('┋') for q6 in question_6: if q6.find('E') == 0: content = q6.replace('E', '') else: content = q6.split('.')[1] id_6 = get_sub_option_id(379, content) if id_6: insert_data.append([uuid, id_6, date, 379]) question_7 = str(row[12].value).split('┋') for q7 in question_7: content = q7.split('.')[1] id_7 = get_sub_option_id(373, content) if id_7: insert_data.append([uuid, id_7, date, 373]) question_8 = str(row[13]).split('┋') for q8 in question_8: content = q8.split('.')[1] id_8 = get_sub_option_id(374, content) if id_8: insert_data.append([uuid, id_8, date, 374]) question_9 = str(row[14].value).split('┋') for q9 in question_9: content = q9.split('.')[1] id_9 = get_sub_option_id(375, content) if id_9: insert_data.append([uuid, id_9, date, 375]) question_10 = str(row[15].value).split('.') id_10 = get_sub_option_id(376, question_10) if id_10: insert_data.append([uuid, id_10, date, 376]) question_11 = str(row[16].value).split('、') id_11 = get_sub_option_id(379, question_11) if id_11: insert_data.append([uuid, id_11, date, 379]) question_12 = str(row[17].value).split('┋') for q12 in question_12: content = q12.split('.') id_12 = get_sub_option_id(380, content) if id_12: insert_data.append([uuid, id_12, date, 380]) question_13 = str(row[18].value).split('┋') for q13 in question_13: content = q13.split('、') id_13 = get_sub_option_id(381, content) if id_13: insert_data.append([uuid, id_13, date, 381]) question_14 = str(row[19].value).split('、') id_14 = get_sub_option_id(395, question_14) if id_14: insert_data.append([uuid, id_14, date, 395]) city = str(row[20].value).split('-')[1] id_city = get_sub_option_id(377, city) if id_city: insert_data.append([uuid, id_city, date, 377]) return insert_data def get_table_type_info(self): work_sheet = self.read_excel_by_ox() rows = [row for row in work_sheet.rows][1:] result = [] for row in rows: question_name = row[3].value info = row[8].value infos = str(info).split('+') if len(infos) == 2: table_type = infos[1] table_size = infos[0].split('p')[0] result.append([table_type, table_size, question_name]) pass else: pass return result sql_jianye_1 = """insert into report_push_customer_info(name,house_or_region,customer_type,mail,status,creator,created) values(%s,%s,%s,%s,1,'binren', now())""" sql_jianye_2 = """ insert into report_customer_authority_info (customer_id, house_or_brand_id, status, creator, created) values (%s, %s, 1, 'binren', now()) """ sql_jianye_3 = """ insert into report_task_info(customer_id, task_key, status, creator, created) values (%s, %s, 1, 'binren', now()) """ def jinaye_report(self): mail_list_sheet = self.read_excel_by_ox_name('邮箱列表') mail_rows =[row for row in mail_list_sheet.rows][1:] customers = [] for row in mail_rows: mail = row[0].value name = row[1].value customer_type = row[4].value house_or_region = row[2].value house_or_brand_id = row[6].value # if house_or_brand_id: customers.append([name, house_or_region, customer_type, mail, house_or_brand_id]) houses = {} # region_list = self.read_excel_by_ox_name('区域映射关系') # region_rows = [row for row in region_list.rows][1:] # region_names = [] # for row in region_rows: # name = row[2].value # if name not in region_names: # region_names.append(name) # for name in region_names: # ids = [] # for row in region_rows: # if row[2].value == name: # ids.append(row[0].value) # houses[name] = ids return customers if __name__ == '__main__': from mysql_db import MysqlDB eu = ExcelUtil('邮箱列表', r'D:\elab\elab_mvp\resources\建业邮件推送数据收件信息汇总(1).xlsx') mysql = MysqlDB('linshi', 1) customers = eu.jinaye_report() result1 = mysql.select("select * from report_push_customer_info where created like '2020-04-17%' and customer_type != 3") # mysql.add_some(eu.sql_jianye_1, customers) insert = [] for r in result1: for c in customers: if r[1] == c[0] and r[3] == c[2]: insert.append([r[0], c[4]]) print(insert)