Browse Source

提交批量打标的脚本

Signed-off-by: BinrenZhang <zhangbr@elab-plus.com>
BinrenZhang 4 years ago
parent
commit
2c34356d25

+ 6 - 0
test/Crawler.py

@@ -0,0 +1,6 @@
+from pprint import pprint
+from wechatsogou import WechatSogouAPI, WechatSogouConst
+ws_api = WechatSogouAPI()
+gzh_articles = ws_api.get_gzh_info('央视新闻',unlock_callback='unlock_callback')
+for i in gzh_articles:
+    pprint(i)

+ 58 - 0
test/__init__.py

@@ -0,0 +1,58 @@
+import requests
+import time
+import random
+
+
+# 获取网页原数据
+def get_json(url):
+    headers = {
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'}
+    params = {
+        'page_size': '10',
+        'next_offset': str(num),
+        'tag': '今日热门',
+        'platform': 'pc',
+    }
+    try:
+        html = requests.get(url, headers=headers, params=params)
+        return html.json()
+    except:
+        print("请求错误")
+        pass
+
+
+# 下载视频
+def downloader(url, path):
+    start = time.time()  # 开始时间
+    size = 0
+    headers = {
+        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36'}
+    response = requests.get(url, headers, stream=True)  # stream属性必须带上
+    chunk_size = 1024  # 每次下载的数据大小
+    content_size = int(response.headers['content-length'])  # 总大小
+    if response.status_code == 200:
+        print('[文件大小]:%0.2fMB' % (content_size / chunk_size / 1024))  # 换算单位
+        with open(path, 'wb')as f:
+            for data in response.iter_content(chunk_size=chunk_size):
+                f.write(data)
+                size += len(data)
+
+
+if __name__ == '__main__':
+    for i in range(10):
+        url = 'http://101.133.210.230:5555/elab-marketing-user//activityProduction/like'
+        num = i * 10 + 1
+        html = get_json(url)
+        infos = html['data']['items']
+        for info in infos:
+            title = info['item']['description']  # 小视频的标题
+            video_url = info['item']['video_playurl']  # 视频地址
+            print(title, video_url)
+            # 为了防止视频没有video_url
+            try:
+                downloader(video_url, path="%s.mp4" % title)
+                print("成功下载一个")
+            except BaseException:
+                print("下载失败")
+                pass
+        time.sleep(int(format(random.randint(2, 8))))  # 设置随机等待时间

+ 27 - 0
test/baidu.py

@@ -0,0 +1,27 @@
+# encoding:utf-8
+import requests
+
+
+def assess_token():
+    # client_id 为官网获取的AK, client_secret 为官网获取的SK
+    host = 'https://aip.baidubce.com/oauth/2.0/token?grant_type=client_credentials&client_id=orZUHE5R2gTiPzzY1X4cRaqi&client_secret=48ObBtvIoYNWdFQ8RTZGR6DsQq9VLBpb'
+    response = requests.get(host)
+    if response:
+        return response.json()
+
+
+def similarity(assess_token):
+    url = 'https://aip.baidubce.com/rpc/2.0/nlp/v2/simnet?charset=UTF-8&access_token=' + assess_token
+    body = {
+        "text_1": "张斌仁",
+        "text_2": "张斌",
+        "model": "CNN"
+    }
+    response = requests.post(url, json=body)
+    if response:
+        print(response.json())
+
+
+if __name__ == '__main__':
+    # print(assess_token())
+    similarity('24.2b77398439546b3a983e9b93b84dbe14.2592000.1610182288.282335-23131081')

+ 16 - 0
test/chart_test.py

@@ -0,0 +1,16 @@
+from random import randrange
+from pyecharts import options as opts
+from pyecharts.charts import Bar
+
+
+class Chart(object):
+
+    def bar_base(self) -> []:
+        c = (
+            Bar()
+                .add_xaxis(["衬衫", "羊毛衫", "雪纺衫", "裤子", "高跟鞋", "袜子"])
+                .add_yaxis("商家A", [randrange(0, 101) for _ in range(6)])
+                .set_global_opts(title_opts=opts.TitleOpts(title="Bar-基本示例", subtitle="我是副标题"))
+        )
+
+        return c

+ 1 - 0
test/crawler_test.py

@@ -0,0 +1 @@
+import request

+ 1 - 0
test/dictionary_data.py

@@ -0,0 +1 @@
+import openpyxl as ox

+ 28 - 0
test/eureka_test.py

@@ -0,0 +1,28 @@
+from flask import Flask
+import py_eureka_client.eureka_client as eureka_client
+
+app = Flask(__name__)
+
+
+def setEureka():
+    server_host = "localhost"
+    server_port = 5000
+    eureka_client.init(eureka_server="http://localhost:5100/eureka/",
+                       app_name="flask_server",
+                       # 当前组件的主机名,可选参数,如果不填写会自动计算一个,如果服务和 eureka 服务器部署在同一台机器,请必须填写,否则会计算出 127.0.0.1
+                       instance_host=server_host,
+                       instance_port=server_port,
+                       # 调用其他服务时的高可用策略,可选,默认为随机
+                       ha_strategy=eureka_client.HA_STRATEGY_RANDOM)
+
+
+setEureka()
+
+
+@app.route('/')
+def hello_world():
+    return 'Hello World!'
+
+
+if __name__ == '__main__':
+    app.run(debug=True, threaded=True, port=5000, host="0.0.0.0")

File diff suppressed because it is too large
+ 4102 - 0
test/html.html


+ 36 - 0
test/http_test.py

@@ -0,0 +1,36 @@
+import requests
+from mysql_db import MysqlDB
+headers_1 = {'content-type': "application/json", 'Authorization': 'APP appid = 4abf1a,token = 9480295ab2e2eddb8',
+             'dsNo': 'source1'}
+headers_2 = {'content-type': "application/json",
+             'Authorization': 'APP appid = 4abf1a,token = 9480295ab2e2eddb8',
+             'dsNo': 'source2',
+             'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, '
+                                             'like Gecko) Chrome/86.0.4240.111 Safari/537.36',
+             'Host': '101.132.43.32:5308',
+             'POST': '/news/queryNewsByPage HTTP/1.1',
+             'Connection': 'keep-alive',
+             'Accept': 'application/json;charset=UTF-8'
+             }
+
+
+if __name__ == '__main__':
+    import time
+    mysql = MysqlDB('marketing_db')
+    url = "https://api-uat3.elaber.cn/elab-marketing-content/materialLabel/markingForMaterial"
+    house = mysql.select('select a.id from house a LEFT JOIN brand_house_rlat b on a.id = b.house_id where a.status = 1 and b.status = 1 and b.brand_id = 1')
+    count = 0
+    for x in house:
+        print(x[0])
+        params = {
+            "deleteStatus": 2,
+            "materialId": x[0],
+            "materialType": 1,
+            "user": "batch"
+        }
+        response = requests.post(url, json=params, headers=headers_1)
+        print(response)
+        time.sleep(1)
+        count += 1
+        if count == 1:
+            break

+ 153 - 0
test/linked_list.py

@@ -0,0 +1,153 @@
+class Node(object):
+    def __init__(self, value):
+        self.value = value
+        self.next = None
+
+
+class LinkedListOneway(object):
+    def __init__(self, node=None):
+        self.__head = node
+
+    def get_head(self):
+        return self.__head
+
+    def __len__(self):
+        cur = self.__head
+        count = 0
+        while cur:
+            count += 1
+            cur = cur.next
+        return count
+
+    def is_empty(self):
+        return self.__head == None
+
+    def add(self, value):
+        """
+        头插法
+        先让新节点的next指向头节点
+        再将头节点替换为新节点
+        顺序不可错,要先保证原链表的链不断,否则头节点后面的链会丢失
+        """
+        node = Node(value)
+        node.next = self.__head
+        self.__head = node
+
+    def append(self, value):
+        """
+            尾插法
+        :param value:
+        :return:
+        """
+        node = Node(value)
+        cur = self.__head
+        if self.is_empty():
+            self.__head = node
+        else:
+            while cur.next:
+                cur = cur.next
+            cur.next = node
+
+    def insert(self, pos, value):
+        # 插入指定位置
+        if pos <= 0:
+            self.add(value)
+        elif pos > len(self) - 1:
+            self.append(value)
+        else:
+            node = Node(value)
+            prior = self.__head
+            count = 0
+            # 在插入位置的前一个节点停下
+            while count < (pos - 1):
+                prior = prior.next
+                count += 1
+            # 先将插入节点与节点后的节点连接,防止链表断掉,先链接后面的,再链接前面的
+            node.next = prior.next
+            prior.next = node
+
+    def remove_value(self, value):
+        """
+        删除值
+        :param value:
+        :return:
+        """
+        curr = self.__head
+        if curr is None:
+            raise Exception("链表为空,无法删除值")
+        i = 0
+        while curr:
+            if curr.value == value:
+                break
+            curr = curr.next
+            i += 1
+        prior = self.__head
+        j = 0
+        k = i
+        while prior:
+            if j == k - 1:
+                print(j)
+                break
+            prior = prior.next
+            j += 1
+        if i == 0:
+            self.__head = self.__head.next
+        elif i > self.__len__() - 1:
+            raise Exception("未查询到此数据")
+        elif 0 < i < self.__len__() - 1:
+            nextnext = prior.next
+            prior.next = nextnext.next
+        else:
+            pass
+        print("删除的值为:%s 位置为:%s" % (value, i))
+
+    def remove_index(self, index):
+        cur = self.__head
+        if cur is None:
+            raise Exception('链表为空,无法删除')
+        if index > self.__len__():
+            raise Exception('下标越界')
+        stu = self.__head
+        for i in range(index - 1):
+            stu = stu.nex
+        temp = stu.next
+        stu.next = temp.next
+
+    def search(self, value):
+        cur = self.__head
+        if cur is None:
+            raise Exception('链表为空')
+        while cur is not None:
+            if cur.value == value:
+                return True
+            else:
+                cur = cur.next
+        return False
+
+    def reverse(self):
+        cur = self.__head
+        pre = None
+        while cur:
+            prenext = cur.next
+            cur.next = pre
+            pre = cur
+            cur = prenext
+        self.__head = pre
+
+    def show(self):
+        cur = self.__head
+        while cur:
+            print(cur.value)
+            cur = cur.next
+
+
+if __name__ == '__main__':
+    head = LinkedListOneway()
+    head.append(1)
+    head.append(2)
+    head.append(3)
+    head.insert(2, 5)
+    head.remove_value(3)
+    # head.reverse()
+    head.show()
+

+ 31 - 0
test/mongodb_test.py

@@ -0,0 +1,31 @@
+from pymongo import MongoClient
+
+
+class MongoDB(object):
+    conn_addr1 = 'dds-uf6da0fedc9881d41.mongodb.rds.aliyuncs.com:3717'
+    conn_addr2 = 'dds-uf6da0fedc9881d42.mongodb.rds.aliyuncs.com:3717'
+    replicat_set = 'mgset-12835903'
+    username = 'brand'
+    password = 'brand'
+
+    def __init__(self, col_name, db_name=None):
+
+        self.client = MongoClient([self.conn_addr1, self.conn_addr2], replicaSet=self.replicat_set)
+        self.client.diaoyanbao.authenticate(self.username, self.password)
+        if db_name:
+            self.mydb = self.client[db_name]
+        else:
+            self.mydb = self.client['diaoyanbao']
+        self.mycol = self.mydb[col_name]
+
+    def find(self, query):
+        data = self.mycol.find(query)
+        return [x for x in data]
+
+
+if __name__ == '__main__':
+    mongodb = MongoDB('answers')
+    query = {'testcaseId': 84}
+    datas = mongodb.find(query)
+    print(datas[0])
+

+ 208 - 0
test/stock_test.py

@@ -0,0 +1,208 @@
+import smtplib
+from email.mime.text import MIMEText
+from email.header import Header
+
+"""
+    回溯算法(试探法)
+    在搜索尝试过程中寻找问题的解,当发现已不满足求解条件时,就“回溯”返回,尝试别的路径。回溯法是一种选优搜索法,按选优条件向前搜索,以达到目标。但当探索到某一步时,发现原先选择并不优或达不到目标,就退回一步重新选择,这种走不通就退回再走的技术为回溯法,而满足回溯条件的某个状态的点称为“回溯点”。
+
+    回溯算法解决问题的
+    针对所给问题,定义问题的解空间,它至少包含问题的一个(最优)解。
+    确定易于搜索的解空间结构,使得能用回溯法方便地搜索整个解空间 。
+    以深度优先的方式搜索解空间,并且在搜索过程中用剪枝函数避免无效搜索。
+"""
+
+
+class StockTest(object):
+    """
+        栈练习
+    """
+
+    def queen(self, A, cur=0):
+        """
+            八皇后问题
+        :param A:
+        :param cur:
+        :return:
+        """
+        if cur == len(A):
+            print(A)
+            # return 0
+        else:
+            for col in range(len(A)):
+                A[cur], flag = col, True
+                # 判断当前与之前的所有是否规则冲突
+                for row in range(cur):
+                    if A[row] == col or abs(col - A[row]) == cur - row:
+                        flag = False
+                        break
+                if flag:
+                    self.queen(A, cur + 1)
+
+    def movingCount(self, threshold, rows, cols):
+        "产生 0 矩阵 "
+        board = [[0 for i in range(cols)] for j in range(rows)]
+        global acc
+        acc = 0
+        "下标之和,若大于threshold则TRUE,否则Folse"
+
+        def block(r, c):
+            s = sum(map(int, str(r) + str(c)))
+            return s > threshold
+
+        def traverse(r, c):
+            global acc
+            if not (0 <= r < rows and 0 <= c < cols):  # 超出角标范围挑出
+                return
+            if board[r][c] != 0:  # 不等于0 跳出
+                return
+            if board[r][c] == -1 or block(r, c):
+                board[r][c] = -1  # 超出门限的点记录-1
+                return
+
+            board[r][c] = 1  # 符合规定的点记录1,并计数加一
+            acc += 1
+            traverse(r + 1, c)
+            traverse(r - 1, c)
+            traverse(r, c + 1)
+            traverse(r, c - 1)
+
+        traverse(0, 0)
+        return acc
+
+    def bubble_sort(self, numbers):
+        """
+            时间复杂度:O(n^2)
+        :param numbers:
+        :return:
+        """
+        n = len(numbers)
+        for i in range(n):
+            for j in range(0, n - i - 1):
+                if numbers[j] > numbers[j + 1]:
+                    numbers[j], numbers[j + 1] = numbers[j + 1], numbers[j]
+
+        return numbers
+
+    def choose_sort(self, numbers):
+        """
+            选择排序,时间复杂度O(n^2)
+        :param numbers:
+        :return:
+        """
+        n = len(numbers)
+        for i in range(n):
+            min_index = i
+            min_value = numbers[i]
+            for j in range(i + 1, n):
+                if min_value > numbers[j]:
+                    min_value = numbers[j]
+                    min_index = j
+            if min_index != i:
+                numbers[min_index] = numbers[i]
+                numbers[i] = min_value
+
+        return numbers
+
+    def insert_sort(self, numbers):
+        """
+            插入排序,时间复杂度O(n^2)
+        :return:
+        """
+        n = len(numbers)
+        for i in range(1, n):
+            insert_index = i - 1
+            insert_value = numbers[i]
+            while insert_index >= 0 and insert_value < numbers[insert_index]:
+                numbers[insert_index + 1] = numbers[insert_index]
+                insert_index -= 1
+            if insert_index + 1 != i:
+                numbers[insert_index + 1] = insert_value
+        return numbers
+
+    def shell_sort(self, numbers):
+        """
+            希尔排序,时间复杂度O(n^2)
+        :param numbers:
+        :return:
+        """
+        n = len(numbers)
+        gap = int(n / 2)
+        while gap > 0:
+            for i in range(gap, n):
+                temp = numbers[i]
+                j = i
+                while j >= gap and numbers[j - gap] > temp:
+                    numbers[j] = numbers[j - gap]
+                    j -= gap
+                numbers[i] = temp
+            gap = int(gap / 2)
+        return numbers
+
+    def quick_sort(self, numbers):
+        """
+            快速排序,时间复杂度O(n^2)
+        :param numbers:
+        :return:
+        """
+        if len(numbers) <= 1:
+            return numbers
+        pivot = numbers[int(len(numbers) / 2)]
+        left = [x for x in numbers if x < pivot]
+        middle = [pivot]
+        right = [x for x in numbers if x > pivot]
+        return self.quick_sort(left) + middle + self.quick_sort(right)
+
+    def merge_sort(self, numbers):
+        """
+            归并排序
+        :param numbers:
+        :return:
+        """
+
+        pass
+
+    def radix_sort(self, numbers):
+        """
+            基数排序,时间复杂度O(n*k)
+        :param number:
+        :return:
+        """
+        # 记录当前正在排拿一位,最低位为1
+        i = 0
+        max_num = max(numbers)
+        # 记录最大值的位数
+        j = len(str(max_num))
+        while i < j:
+            # 初始化桶数组
+            bucket_list = [[] for _ in range(10)]
+            for x in numbers:
+                # 找到位置放入桶数组
+                bucket_list[int(x / (10 ** i)) % 10].append(x)
+
+            numbers.clear()
+            # 放回原序列
+            for x in bucket_list:
+                numbers.extend([a for a in x])
+            i += 1
+        return numbers
+
+    def send_mail(self):
+        sender = '1285211525@qq.com'
+        receivers = '1285211525@qq.com'
+        message = MIMEText('Python 邮件发送测试...', 'plain', 'utf-8')
+        message['From'] = Header('binren')
+        message['To'] = Header('zhangbr')
+        subject = 'test mail'
+        message['Subject'] = Header(subject, 'utf-8')
+        try:
+            stmp_obj = smtplib.SMTP('localhost')
+            stmp_obj.sendmail(sender, receivers, message.as_string())
+            print('send successful')
+        except Exception as e:
+            print(e)
+
+if __name__ == '__main__':
+    st = StockTest()
+    nums = [1, 2, 5, 3, 1, 101, 10, 1111, 2]
+    print(st.send_mail())

File diff suppressed because it is too large
+ 40 - 0
test/text.txt


+ 109 - 0
test/threading_test.py

@@ -0,0 +1,109 @@
+# -*-coding:utf-8-*-
+
+import requests
+import random
+import json
+import time
+import threading
+
+
+class MyThread(threading.Thread):
+    def __init__(self, func, args, name=''):
+        threading.Thread.__init__(self)
+        self.name = name
+        self.func = func
+        self.args = args
+
+    def run(self):
+        self.result = self.func(self.args)
+
+    def get_result(self):
+        try:
+            return self.result
+        except Exception:
+            return None
+
+
+url = 'http://106.14.187.241:5555/elab-marketing-user//zhidi/activity/concurrent/turntableActivity'
+headers_2 = {'content-type': "application/json", 'Authorization': 'APP appid = 4abf1a,token = 9480295ab2e2eddb8',
+             'dsNo': 'source2'}
+
+
+def loop(nloop):
+    response = requests.post(url, json=nloop, headers=headers_2)
+    return response.text
+
+
+# 15
+def main():
+    result_list = []
+    threads = []
+    people = 200
+    times = range(people)
+    for i in times:
+        params = {
+            "actionType": 1,
+            "activityId": 8939,
+            "brandId": 1,
+            "customerId": 223222 + random.randint(5000, 9000),
+            "houseId": 10110,
+            "mobile": "15622363" + str(random.randint(1000, 9999)),
+            "shareActivityId": 0,
+            "uuid": ""
+        }
+        t = MyThread(loop, params, ' 第' + str(i + range(i + 1)[0]) + '线程')
+        threads.append(t)
+    for i in range(people):  # start threads 此处并不会执行线程,而是将任务分发到每个线程,同步线程。等同步完成后再开始执行start方法
+        threads[i].start()
+    for i in range(people):  # jion()方法等待线程完成
+        threads[i].join()
+        result = threads[i].get_result()
+        result_list.append(json.loads(result))
+    return result_list
+
+
+def analysis_result(result_list):
+    # 抽奖结果统计
+    dict_data = {}
+    # 中奖
+    prize_count = 0
+    # 谢谢参与
+    think_prize = 0
+    # 抽奖失败
+    failed_prize = 0
+    for x in result_list:
+        success = x['success']
+        if success is True:
+            # 抽奖成功,统计具体的抽奖情况
+            single = x['single']
+            name = single['name']
+            if name in dict_data.keys():
+                count = dict_data[name]
+                dict_data[name] = count + 1
+            else:
+                dict_data[name] = 1
+            thinks = single['thinks']
+            if str(thinks) == str(1):
+                think_prize += 1
+            else:
+                prize_count += 1
+
+        else:
+            # 抽奖失败统计
+            failed_prize += 1
+    all = prize_count + think_prize
+    for key in dict_data.keys():
+        count = dict_data[key]
+        if all > 0:
+            print(key + ':   ' + str(count), ",占比: " + str(count/all))
+        else:
+            print(key + ':   ' + str(count))
+    print('------------,参与抽奖人数:' + str(all))
+    print("中奖:" + str(prize_count), ",中奖率:" + str(prize_count / all))
+    print("谢谢参与; " + str(think_prize), ",谢谢参与率: " + str(think_prize / all))
+    # print("抽奖失败:" + str(failed_prize), ",抽奖失败率; " + str(failed_prize / len(result_list)))
+
+
+if __name__ == '__main__':
+    result = main()
+    analysis_result(result)

+ 13 - 0
test/weixin_token_test.py

@@ -0,0 +1,13 @@
+from apscheduler.triggers.cron import CronTrigger
+from flask_apscheduler import APScheduler
+
+keys = {
+            'args': '',
+            'trigger': 'cron',
+            'day_of_week': 'mon',
+            'hour': 10,
+            'minute': 50
+        }
+
+APScheduler().add_job('', id=1, func="", **keys)
+print(CronTrigger.from_crontab("0 */1 * * *"))

BIN
test/测ss试.xls


BIN
test/测ss试.xlsx