123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- from flask import Flask, request
- import json
- from apscheduler_elab import Config
- from flask_apscheduler import APScheduler
- import decimal
- from report_push import ReportPush
- from jianye_report import JianYeReport
- app = Flask(__name__)
- app.config.from_object(Config())
- @app.route('/report_test', methods=['GET', 'POST'])
- def report_test():
- global result
- try:
- task_id = request.args.get('id', default=0, type=int)
- report_push = ReportPush('bi_report')
- result = report_push.report_push(task_id)
- except Exception as e:
- print(str(e))
- result['error'] = str(e)
- finally:
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/report_jianye', methods=['GET', 'POST'])
- def report_jianye():
- report_jianye = JianYeReport()
- task_id = request.args.get('id', default=0, type=int)
- result = report_jianye.send_mail_to_customer(task_id)
- report_jianye.db.close()
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/debug_test', methods=['GET', 'POST'])
- def debug_func():
- result = {}
- rj = JianYeReport()
- try:
- # data = rj.brand_data()
- house_ids = rj.get_house_id_by_brand_id('13')
- # result['houseids'] = house_ids
- # result['data1'] = data
- # content = rj.get_brand_content(MailContentText.text_1, data)
- # result['content'] = 'success'
- table_2 = rj.house_data(house_ids)
- result['data2'] = table_2
- except Exception as e:
- result['error'] = str(e)
- finally:
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- @app.route('/send_mail_to_customer', methods=['GET', 'POST'])
- def send_mail_to_customer():
- result = {}
- rj = JianYeReport()
- try:
- customer_id = request.args.get('id', default=0, type=int)
- mail = request.args.get('mail', default=None, type=str)
- data = rj.send_mail_for_customer_id(customer_id, mail)
- result['data'] = data
- pass
- except Exception as e:
- result['error'] = str(e)
- return json.dumps(result, ensure_ascii=False, cls=DecimalEncoder)
- class DecimalEncoder(json.JSONEncoder):
- def default(self, o):
- if isinstance(o, decimal.Decimal):
- return float(o)
- super(DecimalEncoder, self).default(o)
- if __name__ == '__main__':
- scheduler = APScheduler()
- scheduler.init_app(app)
- scheduler.start()
- app.run(
- host='0.0.0.0',
- port=5001
- )
|