瀏覽代碼

历史物料批量打标

Signed-off-by: BinrenZhang <zhangbr@elab-plus.com>
BinrenZhang 4 年之前
父節點
當前提交
556aaf83ee
共有 16 個文件被更改,包括 11 次插入4744 次删除
  1. 0 6
      test/Crawler.py
  2. 0 27
      test/baidu.py
  3. 0 16
      test/chart_test.py
  4. 0 1
      test/crawler_test.py
  5. 0 1
      test/dictionary_data.py
  6. 0 28
      test/eureka_test.py
  7. 0 4102
      test/html.html
  8. 0 153
      test/linked_list.py
  9. 11 9
      test/material_label.py
  10. 0 31
      test/mongodb_test.py
  11. 0 208
      test/stock_test.py
  12. 0 40
      test/text.txt
  13. 0 109
      test/threading_test.py
  14. 0 13
      test/weixin_token_test.py
  15. 二進制
      test/测ss试.xls
  16. 二進制
      test/测ss试.xlsx

+ 0 - 6
test/Crawler.py

@@ -1,6 +0,0 @@
-from pprint import pprint
-from wechatsogou import WechatSogouAPI, WechatSogouConst
-ws_api = WechatSogouAPI()
-gzh_articles = ws_api.get_gzh_info('央视新闻',unlock_callback='unlock_callback')
-for i in gzh_articles:
-    pprint(i)

+ 0 - 27
test/baidu.py

@@ -1,27 +0,0 @@
-# encoding:utf-8
-import requests
-
-
-def assess_token():
-    # client_id 为官网获取的AK, client_secret 为官网获取的SK
-    host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=orZUHE5R2gTiPzzY1X4cRaqi&client_secret=48ObBtvIoYNWdFQ8RTZGR6DsQq9VLBpb'
-    response = requests.get(host)
-    if response:
-        return response.json()
-
-
-def similarity(assess_token):
-    url = 'https://aip.baidubce.com/rpc/2.0/nlp/v2/simnet?charset=UTF-8&access_token=' + assess_token
-    body = {
-        "text_1": "张斌仁",
-        "text_2": "张斌",
-        "model": "CNN"
-    }
-    response = requests.post(url, json=body)
-    if response:
-        print(response.json())
-
-
-if __name__ == '__main__':
-    # print(assess_token())
-    similarity('24.2b77398439546b3a983e9b93b84dbe14.2592000.1610182288.282335-23131081')

+ 0 - 16
test/chart_test.py

@@ -1,16 +0,0 @@
-from random import randrange
-from pyecharts import options as opts
-from pyecharts.charts import Bar
-
-
-class Chart(object):
-
-    def bar_base(self) -> []:
-        c = (
-            Bar()
-                .add_xaxis(["衬衫", "羊毛衫", "雪纺衫", "裤子", "高跟鞋", "袜子"])
-                .add_yaxis("商家A", [randrange(0, 101) for _ in range(6)])
-                .set_global_opts(title_opts=opts.TitleOpts(title="Bar-基本示例", subtitle="我是副标题"))
-        )
-
-        return c

+ 0 - 1
test/crawler_test.py

@@ -1 +0,0 @@
-import request

+ 0 - 1
test/dictionary_data.py

@@ -1 +0,0 @@
-import openpyxl as ox

+ 0 - 28
test/eureka_test.py

@@ -1,28 +0,0 @@
-from flask import Flask
-import py_eureka_client.eureka_client as eureka_client
-
-app = Flask(__name__)
-
-
-def setEureka():
-    server_host = "localhost"
-    server_port = 5000
-    eureka_client.init(eureka_server="http://localhost:5100/eureka/",
-                       app_name="flask_server",
-                       # 当前组件的主机名,可选参数,如果不填写会自动计算一个,如果服务和 eureka 服务器部署在同一台机器,请必须填写,否则会计算出 127.0.0.1
-                       instance_host=server_host,
-                       instance_port=server_port,
-                       # 调用其他服务时的高可用策略,可选,默认为随机
-                       ha_strategy=eureka_client.HA_STRATEGY_RANDOM)
-
-
-setEureka()
-
-
-@app.route('/')
-def hello_world():
-    return 'Hello World!'
-
-
-if __name__ == '__main__':
-    app.run(debug=True, threaded=True, port=5000, host="0.0.0.0")

文件差異過大導致無法顯示
+ 0 - 4102
test/html.html


+ 0 - 153
test/linked_list.py

@@ -1,153 +0,0 @@
-class Node(object):
-    def __init__(self, value):
-        self.value = value
-        self.next = None
-
-
-class LinkedListOneway(object):
-    def __init__(self, node=None):
-        self.__head = node
-
-    def get_head(self):
-        return self.__head
-
-    def __len__(self):
-        cur = self.__head
-        count = 0
-        while cur:
-            count += 1
-            cur = cur.next
-        return count
-
-    def is_empty(self):
-        return self.__head == None
-
-    def add(self, value):
-        """
-        头插法
-        先让新节点的next指向头节点
-        再将头节点替换为新节点
-        顺序不可错,要先保证原链表的链不断,否则头节点后面的链会丢失
-        """
-        node = Node(value)
-        node.next = self.__head
-        self.__head = node
-
-    def append(self, value):
-        """
-            尾插法
-        :param value:
-        :return:
-        """
-        node = Node(value)
-        cur = self.__head
-        if self.is_empty():
-            self.__head = node
-        else:
-            while cur.next:
-                cur = cur.next
-            cur.next = node
-
-    def insert(self, pos, value):
-        # 插入指定位置
-        if pos <= 0:
-            self.add(value)
-        elif pos > len(self) - 1:
-            self.append(value)
-        else:
-            node = Node(value)
-            prior = self.__head
-            count = 0
-            # 在插入位置的前一个节点停下
-            while count < (pos - 1):
-                prior = prior.next
-                count += 1
-            # 先将插入节点与节点后的节点连接,防止链表断掉,先链接后面的,再链接前面的
-            node.next = prior.next
-            prior.next = node
-
-    def remove_value(self, value):
-        """
-        删除值
-        :param value:
-        :return:
-        """
-        curr = self.__head
-        if curr is None:
-            raise Exception("链表为空,无法删除值")
-        i = 0
-        while curr:
-            if curr.value == value:
-                break
-            curr = curr.next
-            i += 1
-        prior = self.__head
-        j = 0
-        k = i
-        while prior:
-            if j == k - 1:
-                print(j)
-                break
-            prior = prior.next
-            j += 1
-        if i == 0:
-            self.__head = self.__head.next
-        elif i > self.__len__() - 1:
-            raise Exception("未查询到此数据")
-        elif 0 < i < self.__len__() - 1:
-            nextnext = prior.next
-            prior.next = nextnext.next
-        else:
-            pass
-        print("删除的值为:%s 位置为:%s" % (value, i))
-
-    def remove_index(self, index):
-        cur = self.__head
-        if cur is None:
-            raise Exception('链表为空,无法删除')
-        if index > self.__len__():
-            raise Exception('下标越界')
-        stu = self.__head
-        for i in range(index - 1):
-            stu = stu.nex
-        temp = stu.next
-        stu.next = temp.next
-
-    def search(self, value):
-        cur = self.__head
-        if cur is None:
-            raise Exception('链表为空')
-        while cur is not None:
-            if cur.value == value:
-                return True
-            else:
-                cur = cur.next
-        return False
-
-    def reverse(self):
-        cur = self.__head
-        pre = None
-        while cur:
-            prenext = cur.next
-            cur.next = pre
-            pre = cur
-            cur = prenext
-        self.__head = pre
-
-    def show(self):
-        cur = self.__head
-        while cur:
-            print(cur.value)
-            cur = cur.next
-
-
-if __name__ == '__main__':
-    head = LinkedListOneway()
-    head.append(1)
-    head.append(2)
-    head.append(3)
-    head.insert(2, 5)
-    head.remove_value(3)
-    # head.reverse()
-    head.show()
-

+ 11 - 9
test/material_label.py

@@ -41,21 +41,23 @@ if __name__ == '__main__':
     import time
     api_path_online = 'http://101.132.79.13:5308/materialLabel/markingForMaterial'
     api_path_uat3 = '"https://api-uat3.elaber.cn/elab-marketing-content/materialLabel/markingForMaterial"'
-    # 配置集团id
-    brand_id = 1
-    # 配置数据库连接信息
-    mysql = MysqlDB('marketing_db', 'o8rd4c8a62ma9mfj9vvq-rw4rm.rwlb.rds.aliyuncs.com', 'dmmanager', 'RSc3TLtmuprZgo1a')
-    # 打标接口地址,
-    url = api_path_online
+    api_path_test = 'http://101.132.138.87:5555/elab-marketing-content/materialLabel/markingForMaterial'
+    api_path_test2 = 'http://106.14.187.241:5555/elab-marketing-content//materialLabel/markingForMaterial'
+    # 1. 配置集团id
+    brand_id = 46
+    # 2. 配置数据库连接信息
+    mysql = MysqlDB('marketing_db', '106.15.201.221', 'kaifa', 'elab@123')
+    # 3. 打标接口地址
+    url = api_path_test2
     # 需要打标物料的id
     sql_dict = {
         1: 'select a.id from house a LEFT JOIN brand_house_rlat b on a.id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = {}'.format(
             brand_id),
         2: 'select a.id from content_moment a LEFT JOIN brand_house_rlat b on a.house_id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = {}'.format(
             brand_id),
-        3: 'select a.id from t_content_album a LEFT JOIN brand_house_rlat b on a.house_id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = 1'.format(
+        3: 'select a.id from t_content_album a LEFT JOIN brand_house_rlat b on a.house_id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = {}'.format(
             brand_id),
-        4: 'select a.id from content_layout a LEFT JOIN brand_house_rlat b on a.house_id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = 1'.format(
+        4: 'select a.id from content_layout a LEFT JOIN brand_house_rlat b on a.house_id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = {}'.format(
             brand_id)
     }
     material_dict = {
@@ -74,7 +76,7 @@ if __name__ == '__main__':
                 "materialId": x[0],
                 # 物料类型,1:项目,2:视频,3:图集(图片),4:户型
                 "materialType": index,
-                "user": "batch20201230"
+                "user": ":wq"
             }
             response = requests.post(url, json=params, headers=headers_1)
             print(material_dict.get(index), str(x[0]), '打标结果:', response.text)

+ 0 - 31
test/mongodb_test.py

@@ -1,31 +0,0 @@
-from pymongo import MongoClient
-
-
-class MongoDB(object):
-    conn_addr1 = 'dds-uf6da0fedc9881d41.mongodb.rds.aliyuncs.com:3717'
-    conn_addr2 = 'dds-uf6da0fedc9881d42.mongodb.rds.aliyuncs.com:3717'
-    replicat_set = 'mgset-12835903'
-    username = 'brand'
-    password = 'brand'
-
-    def __init__(self, col_name, db_name=None):
-
-        self.client = MongoClient([self.conn_addr1, self.conn_addr2], replicaSet=self.replicat_set)
-        self.client.diaoyanbao.authenticate(self.username, self.password)
-        if db_name:
-            self.mydb = self.client[db_name]
-        else:
-            self.mydb = self.client['diaoyanbao']
-        self.mycol = self.mydb[col_name]
-
-    def find(self, query):
-        data = self.mycol.find(query)
-        return [x for x in data]
-
-
-if __name__ == '__main__':
-    mongodb = MongoDB('answers')
-    query = {'testcaseId': 84}
-    datas = mongodb.find(query)
-    print(datas[0])
-

+ 0 - 208
test/stock_test.py

@@ -1,208 +0,0 @@
-import smtplib
-from email.mime.text import MIMEText
-from email.header import Header
-
-"""
-    回溯算法(试探法)
-    在搜索尝试过程中寻找问题的解,当发现已不满足求解条件时,就“回溯”返回,尝试别的路径。回溯法是一种选优搜索法,按选优条件向前搜索,以达到目标。但当探索到某一步时,发现原先选择并不优或达不到目标,就退回一步重新选择,这种走不通就退回再走的技术为回溯法,而满足回溯条件的某个状态的点称为“回溯点”。
-
-    回溯算法解决问题的
-    针对所给问题,定义问题的解空间,它至少包含问题的一个(最优)解。
-    确定易于搜索的解空间结构,使得能用回溯法方便地搜索整个解空间 。
-    以深度优先的方式搜索解空间,并且在搜索过程中用剪枝函数避免无效搜索。
-"""
-
-
-class StockTest(object):
-    """
-        栈练习
-    """
-
-    def queen(self, A, cur=0):
-        """
-            八皇后问题
-        :param A:
-        :param cur:
-        :return:
-        """
-        if cur == len(A):
-            print(A)
-            # return 0
-        else:
-            for col in range(len(A)):
-                A[cur], flag = col, True
-                # 判断当前与之前的所有是否规则冲突
-                for row in range(cur):
-                    if A[row] == col or abs(col - A[row]) == cur - row:
-                        flag = False
-                        break
-                if flag:
-                    self.queen(A, cur + 1)
-
-    def movingCount(self, threshold, rows, cols):
-        "产生 0 矩阵 "
-        board = [[0 for i in range(cols)] for j in range(rows)]
-        global acc
-        acc = 0
-        "下标之和,若大于threshold则TRUE,否则Folse"
-
-        def block(r, c):
-            s = sum(map(int, str(r) + str(c)))
-            return s > threshold
-
-        def traverse(r, c):
-            global acc
-            if not (0 <= r < rows and 0 <= c < cols):  # 超出角标范围挑出
-                return
-            if board[r][c] != 0:  # 不等于0 跳出
-                return
-            if board[r][c] == -1 or block(r, c):
-                board[r][c] = -1  # 超出门限的点记录-1
-                return
-
-            board[r][c] = 1  # 符合规定的点记录1,并计数加一
-            acc += 1
-            traverse(r + 1, c)
-            traverse(r - 1, c)
-            traverse(r, c + 1)
-            traverse(r, c - 1)
-
-        traverse(0, 0)
-        return acc
-
-    def bubble_sort(self, numbers):
-        """
-            时间复杂度:O(n^2)
-        :param numbers:
-        :return:
-        """
-        n = len(numbers)
-        for i in range(n):
-            for j in range(0, n - i - 1):
-                if numbers[j] > numbers[j + 1]:
-                    numbers[j], numbers[j + 1] = numbers[j + 1], numbers[j]
-
-        return numbers
-
-    def choose_sort(self, numbers):
-        """
-            选择排序,时间复杂度O(n^2)
-        :param numbers:
-        :return:
-        """
-        n = len(numbers)
-        for i in range(n):
-            min_index = i
-            min_value = numbers[i]
-            for j in range(i + 1, n):
-                if min_value > numbers[j]:
-                    min_value = numbers[j]
-                    min_index = j
-            if min_index != i:
-                numbers[min_index] = numbers[i]
-                numbers[i] = min_value
-
-        return numbers
-
-    def insert_sort(self, numbers):
-        """
-            插入排序,时间复杂度O(n^2)
-        :return:
-        """
-        n = len(numbers)
-        for i in range(1, n):
-            insert_index = i - 1
-            insert_value = numbers[i]
-            while insert_index >= 0 and insert_value < numbers[insert_index]:
-                numbers[insert_index + 1] = numbers[insert_index]
-                insert_index -= 1
-            if insert_index + 1 != i:
-                numbers[insert_index + 1] = insert_value
-        return numbers
-
-    def shell_sort(self, numbers):
-        """
-            希尔排序,时间复杂度O(n^2)
-        :param numbers:
-        :return:
-        """
-        n = len(numbers)
-        gap = int(n / 2)
-        while gap > 0:
-            for i in range(gap, n):
-                temp = numbers[i]
-                j = i
-                while j >= gap and numbers[j - gap] > temp:
-                    numbers[j] = numbers[j - gap]
-                    j -= gap
-                numbers[i] = temp
-            gap = int(gap / 2)
-        return numbers
-
-    def quick_sort(self, numbers):
-        """
-            快速排序,时间复杂度O(n^2)
-        :param numbers:
-        :return:
-        """
-        if len(numbers) <= 1:
-            return numbers
-        pivot = numbers[int(len(numbers) / 2)]
-        left = [x for x in numbers if x < pivot]
-        middle = [pivot]
-        right = [x for x in numbers if x > pivot]
-        return self.quick_sort(left) + middle + self.quick_sort(right)
-
-    def merge_sort(self, numbers):
-        """
-            归并排序
-        :param numbers:
-        :return:
-        """
-
-        pass
-
-    def radix_sort(self, numbers):
-        """
-            基数排序,时间复杂度O(n*k)
-        :param number:
-        :return:
-        """
-        # 记录当前正在排拿一位,最低位为1
-        i = 0
-        max_num = max(numbers)
-        # 记录最大值的位数
-        j = len(str(max_num))
-        while i < j:
-            # 初始化桶数组
-            bucket_list = [[] for _ in range(10)]
-            for x in numbers:
-                # 找到位置放入桶数组
-                bucket_list[int(x / (10 ** i)) % 10].append(x)
-
-            numbers.clear()
-            # 放回原序列
-            for x in bucket_list:
-                numbers.extend([a for a in x])
-            i += 1
-        return numbers
-
-    def send_mail(self):
-        sender = '1285211525@qq.com'
-        receivers = '1285211525@qq.com'
-        message = MIMEText('Python 邮件发送测试...', 'plain', 'utf-8')
-        message['From'] = Header('binren')
-        message['To'] = Header('zhangbr')
-        subject = 'test mail'
-        message['Subject'] = Header(subject, 'utf-8')
-        try:
-            stmp_obj = smtplib.SMTP('localhost')
-            stmp_obj.sendmail(sender, receivers, message.as_string())
-            print('send successful')
-        except Exception as e:
-            print(e)
-
-if __name__ == '__main__':
-    st = StockTest()
-    nums = [1, 2, 5, 3, 1, 101, 10, 1111, 2]
-    print(st.send_mail())

文件差異過大導致無法顯示
+ 0 - 40
test/text.txt


+ 0 - 109
test/threading_test.py

@@ -1,109 +0,0 @@
-# -*-coding:utf-8-*-
-
-import requests
-import random
-import json
-import time
-import threading
-
-
-class MyThread(threading.Thread):
-    def __init__(self, func, args, name=''):
-        threading.Thread.__init__(self)
-        self.name = name
-        self.func = func
-        self.args = args
-
-    def run(self):
-        self.result = self.func(self.args)
-
-    def get_result(self):
-        try:
-            return self.result
-        except Exception:
-            return None
-
-
-url = 'http://106.14.187.241:5555/elab-marketing-user//zhidi/activity/concurrent/turntableActivity'
-headers_2 = {'content-type': "application/json", 'Authorization': 'APP appid = 4abf1a,token = 9480295ab2e2eddb8',
-             'dsNo': 'source2'}
-
-
-def loop(nloop):
-    response = requests.post(url, json=nloop, headers=headers_2)
-    return response.text
-
-
-# 15
-def main():
-    result_list = []
-    threads = []
-    people = 200
-    times = range(people)
-    for i in times:
-        params = {
-            "actionType": 1,
-            "activityId": 8939,
-            "brandId": 1,
-            "customerId": 223222 + random.randint(5000, 9000),
-            "houseId": 10110,
-            "mobile": "15622363" + str(random.randint(1000, 9999)),
-            "shareActivityId": 0,
-            "uuid": ""
-        }
-        t = MyThread(loop, params, ' 第' + str(i + range(i + 1)[0]) + '线程')
-        threads.append(t)
-    for i in range(people):  # start threads 此处并不会执行线程,而是将任务分发到每个线程,同步线程。等同步完成后再开始执行start方法
-        threads[i].start()
-    for i in range(people):  # jion()方法等待线程完成
-        threads[i].join()
-        result = threads[i].get_result()
-        result_list.append(json.loads(result))
-    return result_list
-
-
-def analysis_result(result_list):
-    # 抽奖结果统计
-    dict_data = {}
-    # 中奖
-    prize_count = 0
-    # 谢谢参与
-    think_prize = 0
-    # 抽奖失败
-    failed_prize = 0
-    for x in result_list:
-        success = x['success']
-        if success is True:
-            # 抽奖成功,统计具体的抽奖情况
-            single = x['single']
-            name = single['name']
-            if name in dict_data.keys():
-                count = dict_data[name]
-                dict_data[name] = count + 1
-            else:
-                dict_data[name] = 1
-            thinks = single['thinks']
-            if str(thinks) == str(1):
-                think_prize += 1
-            else:
-                prize_count += 1
-
-        else:
-            # 抽奖失败统计
-            failed_prize += 1
-    all = prize_count + think_prize
-    for key in dict_data.keys():
-        count = dict_data[key]
-        if all > 0:
-            print(key + ':   ' + str(count), ",占比: " + str(count/all))
-        else:
-            print(key + ':   ' + str(count))
-    print('------------,参与抽奖人数:' + str(all))
-    print("中奖:" + str(prize_count), ",中奖率:" + str(prize_count / all))
-    print("谢谢参与; " + str(think_prize), ",谢谢参与率: " + str(think_prize / all))
-    # print("抽奖失败:" + str(failed_prize), ",抽奖失败率; " + str(failed_prize / len(result_list)))
-
-
-if __name__ == '__main__':
-    result = main()
-    analysis_result(result)

+ 0 - 13
test/weixin_token_test.py

@@ -1,13 +0,0 @@
-from apscheduler.triggers.cron import CronTrigger
-from flask_apscheduler import APScheduler
-
-keys = {
-            'args': '',
-            'trigger': 'cron',
-            'day_of_week': 'mon',
-            'hour': 10,
-            'minute': 50
-        }
-
-APScheduler().add_job('', id=1, func="", **keys)
-print(CronTrigger.from_crontab("0 */1 * * *"))

二進制
test/测ss试.xls


二進制
test/测ss试.xlsx