from excel_util import ExcelUtil from mysql_db import MysqlDB from itertools import groupby class TongCe: """ 同策测试数据清洗 """ # 统计筒体结果 sql_1 = ''' SELECT a.sub_question_id, b.sub_question_content, a.score, b.sub_option_content, count(1) FROM f_t_daren_score_2 a LEFT JOIN d_shangju_tiku_02 b ON a. STATUS = b. STATUS = 1 WHERE a.testcase_id in %s and a.testcase_id = b.testcase_id AND a.sub_question_id = b.sub_question_id AND ( a.score = b.score OR a.score = b.sub_option_id ) GROUP BY b.sub_question_content, a.score, b.sub_option_content ''' # 选项信息 sql_2 = ''' SELECT b.id as question_id, b. NAME as question_title, a.id as sub_question_id, a. NAME as sub_question_title, d.id as option_id, d.content as option_title, c.id as sub_option_id, c.content as sub_option_title FROM bq_sub_question a LEFT JOIN bq_question b ON a.father_id = b.id LEFT JOIN bq_sub_option c ON a.id = c.sub_question_id LEFT JOIN bq_option d ON c.father_id = d.id WHERE FIND_IN_SET( a.id, ( SELECT GROUP_CONCAT(question_ids) FROM bq_testcase WHERE house_ids = %s GROUP BY house_ids ) ) AND a. STATUS = b. STATUS = c. STATUS = 1 ORDER BY a.id ''' # 表 sql_3 = ''' INSERT INTO mvp_page_display_match ( house_id, question_id, question_title, sub_question_id, sub_question_title, option_id, option_content, sub_option_id, sub_option_content, data_item_tab, data_item_title, data_item_name, STATUS, creator, created ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, 1, 'binren', now() ) ''' sql_4 = ''' SELECT id, sub_question_id, sub_option_id FROM mvp_page_display_match WHERE STATUS = 1 ''' sql_5 = ''' SELECT id FROM bq_testcase WHERE STATUS = 1 AND FIND_IN_SET( ( SELECT id FROM bq_house WHERE STATUS = 1 AND NAME = %s ), house_ids ) ''' sql_6 = ''' insert INTO mvp_page_display_data ( match_id, value, STATUS, creator, created ) VALUES (%s, %s, 1, 'binren', now()) ''' def __init__(self): self.shangju_db = MysqlDB('shangju') self.marketing_db = MysqlDB('bi_report') self.linshi_db = MysqlDB('linshi', db_type=1) self.options_info = ExcelUtil('工作表6', 'tongce.xlsx').read_options_info() def get_question_info_from_db(self): result = self.shangju_db.select(self.sql_2, [67]) insert_data = [] for rt in result: rt = list(rt) option_configuration = self.options_info.get('67' + str(rt[6])) if option_configuration and len(option_configuration) == 3: rt.insert(0, 67) rt.extend(option_configuration) insert_data.append(rt) return insert_data def get_option_match_info(self): result = self.shangju_db.select(self.sql_4) return result def get_testcase_ids_by_house_name(self, house_name): testcase_ids = self.shangju_db.select(self.sql_5, [house_name]) return testcase_ids def scores(self): testcase_ids = self.get_testcase_ids_by_house_name('同策 领地') db_data = self.marketing_db.select(self.sql_1, [testcase_ids]) answer = [] for data in db_data: answer.append([data[0], data[2], data[4]]) answer.sort(key=lambda obj: obj[0]) sub_option_score = [] for sub_question_id, others in groupby(answer, key=lambda obj: obj[0]): others_data = [] for ot in others: others_data.append([x for x in ot]) sub_question_count = sum([x[2] for x in others_data]) for td in others_data: sub_option_id = td[1] sub_option_count = td[2] rate = int(sub_option_count) / sub_question_count sub_option_score.append([sub_question_id, sub_option_id, rate]) return sub_option_score def tongce(self): """ tongce测试数据清洗 :return: """ match_data = self.get_question_info_from_db() self.linshi_db.add_some(self.sql_3, match_data) scores = self.scores() match_data_info = self.get_option_match_info() dispaly_data = [] for score in scores: sub_question_id = score[0] sub_option_id = score[1] value = score[2] for mi in match_data_info: if str(mi[1]) == str(sub_question_id) and str(mi[2]) == str(sub_option_id): dispaly_data.append([mi[0], value]) if len(dispaly_data) > 0: self.linshi_db.add_some(self.sql_6, [dispaly_data]) return {'插入数据条数': len(dispaly_data)} if __name__ == '__main__': pass