|
- from flask import Flask, request
- from mvp import Mvp
- import json
- from test_info import TestInfo
- from tongce import TongCe
- from apscheduler_elab import Config
- from flask_apscheduler import APScheduler
- from email_util import EmailUtil
- import decimal
- from report_push import ReportPush
- from jianye_report import JianYeReport
- from mail_content_text import MailContentText
- app = Flask(__name__)
- app.config.from_object(Config())
- @app.route('/score', methods=['GET', 'POST'])
- def score():
- """
- 父选项对应的标准化值
- :return:
- """
- city = request.args.get('city', default=None, type=str)
- age = request.args.get('age', default=None, type=str)
- crowd = request.args.get('crowd', default=None, type=str)
- print(city, age, crowd)
- mvp = Mvp()
- scores = mvp.query_behavioral_info(city, age, crowd)
- mvp.close()
- return json.dumps(scores, ensure_ascii=False)
- @app.route('/scores', methods=['GET', 'POST'])
- def scores():
- mvp = Mvp()
- data = mvp.scores()
- return json.dumps(data, ensure_ascii=False)
- @app.route('/infos', methods=["GET", 'POST'])
- def get_city_age_crowd():
- """
- 测试数据中城市 年龄 人群分类信息
- :return:
- """
- mvp = Mvp()
- infos = {'城市': mvp.citys, '年龄段': mvp.age, '人群分类': mvp.crowd}
- mvp.close()
- return json.dumps(infos, ensure_ascii=False)
- @app.route('/crowd_people', methods=['GET', 'POST'])
- def crowd_people():
- """
- 人群分类人数统计
- :return:
- """
- mvp = Mvp()
- people_count = mvp.get_crowd_people()
- mvp.close()
- return json.dumps(people_count, ensure_ascii=False)
- @app.route('/set_behavior_tag', methods=['GET', 'POST'])
- def set_behavior_tag():
- """
- 模块标准化值
- :return:
- """
- mvp = Mvp()
- mvp.close()
- return json.dumps(mvp.module_scores, ensure_ascii=False)
- @app.route('/insert_into', methods=['GET', 'POST'])
- def insert_info():
- mvp = Mvp()
- mvp.insert()
- query_data = mvp.query_data()
- mvp.close()
- return json.dumps(query_data, ensure_ascii=False)
- @app.route('/shanghai_85', methods=['GET', 'POST'])
- def shanghai_85():
- mvp = Mvp()
- data = mvp.shanghai_85_module_score_insert()
- mvp.close()
- return json.dumps(data, ensure_ascii=False)
- @app.route('/tag_tree', methods=['GET', 'POST'])
- def tag_tree():
- mvp = Mvp()
- tags = mvp.tag_data
- mvp.close()
- return json.dumps(tags, ensure_ascii=False)
- @app.route('/update_data', methods=['GET', 'POST'])
- def update_data():
- message = None
- global mvp
- try:
- mvp = Mvp()
- message = mvp.update_data()
- except Exception as e:
- message['error'] = str(e)
- return json.dumps(message, ensure_ascii=False)
- finally:
- mvp.close()
- return json.dumps(message, ensure_ascii=False)
- @app.route('/people', methods=['GET', 'POST'])
- def people():
- mvp = Mvp()
- mvp.close()
- return json.dumps(mvp.people_data(), ensure_ascii=False)
- @app.route('/update_gender')
- def update_gender_rate():
- mvp = Mvp()
- try:
- mvp.update_gender_rate(ids=1)
- mvp.close()
- except Exception as e:
- mvp.close()
- return str(e)
- return '人群性别比列更新完成...'
- @app.route('/test_api', methods=['GET', 'POST'])
- def test_api():
- return '成功'
- @app.route('/testcase_info', methods=['GET', 'POST'])
- def testcase_info():
- testcase_id = request.args.get('id', default=0, type=int)
- ti = TestInfo()
- result = ti.test_detail_info(testcase_id)
- return json.dumps(result, ensure_ascii=False)
- @app.route('/get_uuids', methods=['GET', 'POST'])
- def get_uuids():
- city = request.args.get('city', default=None, type=str)
- age = request.args.get('age', default=None, type=str)
- crowd = request.args.get('crowd', default=None, type=str)
- mvp = Mvp()
- uuids = mvp.people_filter(city, age, crowd)
- mvp.close()
- return json.dumps(uuids, ensure_ascii=False)
- @app.route('/tongce', methods=['GET', 'POST'])
- def tongce():
- response = {}
- try:
- tongce = TongCe()
- result = tongce.tongce()
- response['code'] = 0
- response['message'] = '成功'
- response['data'] = result
- except Exception as e:
- response['code'] = 1
- response['message'] = '失败:' + str(e)
- return json.dumps(response, ensure_ascii=False)
- return json.dumps(response, ensure_ascii=False)
- @app.route('/tongce_data', methods=['GET', 'POST'])
- def tongce_data():
- response = {}
- tongce = TongCe()
- try:
- result = tongce.lingdi_data_scores()
- response['code'] = 0
- response['message'] = '成功'
- response['data'] = result
- except Exception as e:
- response['code'] = 1
- response['message'] = '失败:' + str(e)
- return json.dumps(response, ensure_ascii=False)
- finally:
- tongce.close()
- return json.dumps(response, ensure_ascii=False)
- @app.route('/update_rule', methods=['GET', 'POST'])
- def tongce_update_rule():
- try:
- tongce = TongCe()
- tongce.table_type_insert()
- except Exception as e:
- return str(e)
- return '更新成功!! 1'
- @app.route('/send_mail', methods=['GET', 'POST'])
- def send_mail():
- mail = EmailUtil()
- mail.send_test()
- return '<h1>邮件发送成功</h1>'
- @app.route('/update_other_city', methods=['GET', 'POST'])
- def update_other_city():
- response = {}
- try:
- tongce = TongCe()
- result = tongce.other_city_clean()
- response['code'] = 0
- response['message'] = '成功'
- response['data'] = result
- except Exception as e:
- response['code'] = 1
- response['message'] = '失败:' + str(e)
- return json.dumps(response, ensure_ascii=False)
- return json.dumps(response, ensure_ascii=False)
- @app.route('/report_test', methods=['GET', 'POST'])
- def report_test():
- global result
- try:
- task_id = request.args.get('id', default=0, type=int)
- report_push = ReportPush('bi_report')
- result = report_push.report_push_test(task_id)
- except Exception as e:
- print(str(e))
- result['error'] = str(e)
- finally:
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/report_jianye', methods=['GET', 'POST'])
- def report_jianye():
- report_jianye = JianYeReport()
- task_id = request.args.get('id', default=0, type=int)
- result = report_jianye.send_mail_to_customer(task_id)
- report_jianye.db.close()
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/debug_test', methods=['GET', 'POST'])
- def debug_func():
- result = {}
- rj = JianYeReport()
- try:
- data = rj.brand_data()
- house_ids = rj.get_house_id_by_brand_id('13')
- result['houseids'] = house_ids
- result['data1'] = data
- content = rj.get_brand_content(MailContentText.text_1, data)
- result['content'] = 'success'
- table_2 = rj.house_data(house_ids)
- result['data2'] = table_2
- except Exception as e:
- result['error'] = str(e)
- finally:
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/send_mail_to_customer', methods=['GET', 'POT'])
- def send_mail_to_customer():
- result = {}
- rj = JianYeReport()
- try:
- customer_id = request.args.get('id', default=0, type=int)
- mail = request.args.get('mail', default=0, type=int)
- data = rj.send_mail_for_customer_id(customer_id, mail)
- result['data'] = data
- pass
- except Exception as e:
- result['error'] = str(e)
- finally:
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- class DecimalEncoder(json.JSONEncoder):
- def default(self, o):
- if isinstance(o, decimal.Decimal):
- return float(o)
- super(DecimalEncoder, self).default(o)
- if __name__ == '__main__':
- scheduler = APScheduler()
- scheduler.init_app(app)
- scheduler.start()
- app.run(
- host='0.0.0.0',
- port=5001
- )
|