# NOTE: the import order below is kept exactly as in the original file because
# `from config import *` sits in the middle of it and later imports may
# intentionally override names exported by `config`.
from datetime import datetime
from Crypto.PublicKey import RSA
import base64
from Crypto.Cipher import PKCS1_v1_5
import requests
import time
import json
import uuid
import datetime  # rebinds `datetime` to the module; the code below relies on that
from flask import jsonify
from config import *
from dateutil.relativedelta import relativedelta
import threading
import urllib3
from Crypto.Cipher import AES
from base64 import b64encode, b64decode

# All outgoing requests in this module use verify=False; silence the warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Held for the whole duration of add_payment() so write-backs run one at a time.
lock = threading.Lock()


def format_uct(origin_date_str):
    """Shift an ISO-like timestamp string forward by 8 hours.

    Everything from the first '.' onwards (fractional seconds and whatever
    follows) is discarded, the remainder is parsed as
    ``%Y-%m-%dT%H:%M:%S``, 8 hours are added, and the result is returned
    formatted as ``%Y-%m-%d %H:%M:%S``.

    NOTE(review): the input is presumably UTC and the output UTC+8 local
    time -- confirm against the data source.
    """
    origin_date_str = origin_date_str.split('.')[0]
    utc_date = datetime.datetime.strptime(origin_date_str, "%Y-%m-%dT%H:%M:%S")
    local_date = utc_date + datetime.timedelta(hours=8)
    return local_date.strftime('%Y-%m-%d %H:%M:%S')


def _completed_payments(id_card):
    """Return every pay-detail record of *id_card* whose order_status is "已完成"."""
    url = f'{base_path}/api/v5/app/entry/data/list'
    filter_data = {
        "app_id": pay_app,
        "entry_id": pay_details_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": "order_status", "method": "eq", "value": ["已完成"]},
                {"field": "id_card", "method": "eq", "value": [id_card]},
            ],
        },
    }
    return turn_page(url, filter_data)


def queryUser(id):
    """Query the completed top-up details for an id-card number.

    Each record's ``pay_time`` is rewritten in place through format_uct().

    Returns:
        ``{"balance": 0.0, "pay_detail": [...]}`` -- the balance is a fixed
        placeholder; no balance lookup is performed here.
    """
    pay_detail = _completed_payments(id)
    for i in pay_detail:
        i['pay_time'] = format_uct(i['pay_time'])
    balance = float("%.2f" % 0)
    return {"balance": balance, "pay_detail": pay_detail}


def filter_jdy_ht(id):
    """Look up the driver's record in the Jiandaoyun contract form.

    Returns:
        ``[]`` when no contract record matches, otherwise
        ``[base_company, sh, name, phone_num]`` taken from the first match.
    """
    url = f'{base_path}/api/v5/app/entry/data/list'
    filter_data = {
        "app_id": pay_app,
        "entry_id": ht_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": "id_card", "method": "eq", "value": [id]},
            ],
        },
    }
    print("开始查询合同信息",filter_data)
    ht_detail = turn_page(url, filter_data)
    if not ht_detail:
        return []
    ht_detail = ht_detail[0]
    # Organisation
    base_company = ht_detail['base_company']
    # Merchant number (hard-coded)
    sh = '0001000F7152040'
    # Name
    name = ht_detail['name']
    # NOTE(review): the original comment labelled this "license plate number",
    # but the field actually read is `phone_num`.
    phone_num = ht_detail['phone_num']
    return [base_company, sh, name, phone_num]


def querydpUser(id):
    """Query the completed top-up details for an id-card number.

    Same query as queryUser(), but rewrites each record's ``time`` field
    through format_uct() and reports the balance as the integer ``0``.

    NOTE(review): queryUser() reads ``pay_time`` from the same entry while
    this function reads ``time`` -- confirm which field name is correct.
    """
    pay_detail = _completed_payments(id)
    for i in pay_detail:
        i['time'] = format_uct(i['time'])
    # The personal-balance lookup that used to live here is disabled; the
    # balance is reported as a constant 0.
    return {"balance": 0, "pay_detail": pay_detail}


def get_last_8_hour():
    """Return the local time 8 hours ago as ``%Y-%m-%d %H:%M:%S``."""
    return time.strftime('%Y-%m-%d %H:%M:%S',
                         time.localtime(time.time() - 8 * 60 * 60))


def turn_page(url, data):
    """Fetch all pages of a list query and return the concatenated rows.

    Pagination is cursor based: the ``_id`` of the last row of a full page is
    sent as ``data_id`` for the next request. A page shorter than the
    requested ``limit`` ends the loop.

    The caller's *data* dict is not modified; a shallow copy carries the
    cursor. When *data* has no ``limit`` the copy requests 100 rows per page
    (the size this function previously assumed).

    A malformed response (missing ``data`` / ``_id``) is logged and the same
    page is requested again after a short pause; rows are only appended once
    a page has been fully validated, so a retry never duplicates rows.
    """
    payload = dict(data)
    page_size = payload.setdefault("limit", 100)
    info = []
    while True:
        res = None  # keeps the except handler safe if the request itself fails
        try:
            res = req_tool_jdy(url, payload)
            rows = res['data']
            next_id = rows[-1]["_id"] if len(rows) >= page_size else None
        except Exception as e:
            print("turn_page",e, res)
            time.sleep(2)
            continue
        info += rows
        if next_id is None:
            return info
        payload["data_id"] = next_id


def get_user_info(id, type=None):
    """Fetch one user record.

    Args:
        id: value to match.
        type: when truthy, match on the ``id_card`` field; otherwise match on
            the ``user`` field.

    Returns:
        The first matching record, or ``''`` when nothing matches.
    """
    url = f'{base_path}/api/v5/app/entry/data/list'
    field = "id_card" if type else "user"
    filter_data = {
        "app_id": fu_app,
        "entry_id": fu_id,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": field, "method": "eq", "value": [id]},
            ],
        },
    }
    res = req_tool_jdy(url, filter_data)
    if res['data']:
        return res['data'][0]
    return ''


def query_pay_details(id):
    """Query a person's outstanding ("已发起") payment items grouped by month.

    Returns:
        A dict keyed by each record's ``month1`` value, where every entry is
        ``{"data": [records...], "_th_amount": <sum of pay_amount>}``, plus a
        top-level ``"amount"`` key holding the grand total rounded to two
        decimals.
    """
    url = f'{base_path}/api/v5/app/entry/data/list'
    filter_data = {
        "app_id": pay_app,
        "entry_id": uncollected_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": "id_card", "method": "eq", "value": [id]},
                # NOTE(review): other queries in this file pass `value` as a
                # list; this one passes a bare string -- confirm the API
                # accepts both.
                {"field": "order_status", "method": "eq", "value": "已发起"},
            ],
        },
    }
    uncollected_data = turn_page(url, filter_data)

    # Group the records by month.
    undata = {}
    for un in uncollected_data:
        undata.setdefault(un['month1'], {"data": []})['data'].append(un)

    # Per-month subtotal and overall total.
    amount = 0
    for group in undata.values():
        _th_amount = 0
        for j in group['data']:
            amount += j['pay_amount']
            _th_amount += j['pay_amount']
        group['_th_amount'] = _th_amount

    undata['amount'] = float("%.2f" % amount)
    return undata


def req_tool_jdy(url, data):
    """POST *data* as JSON with the bearer ``apikey`` and return the parsed body.

    The call is retried indefinitely: a response containing a ``code`` key is
    logged and retried after 2 seconds, and a request/JSON exception is
    logged and retried after the same pause (previously it retried
    immediately, spinning the CPU while the remote side was unreachable).
    """
    headers = {"Authorization": f"Bearer {apikey}"}
    while True:
        try:
            res = requests.post(url, headers=headers, json=data,
                                timeout=10, verify=False).json()
        except Exception as e:
            print(e)
            time.sleep(2)
            continue
        if "code" in res:
            print("req_tool_jdy", res)
            time.sleep(2)
            continue
        return res


def req_tool_vx(url, data=None, flag=None):
    """Call *url* and return the parsed JSON body, retrying until it succeeds.

    Args:
        url: full request URL.
        data: JSON payload for the POST variant.
        flag: when truthy a GET is issued, otherwise a POST with *data*.

    A failed attempt is logged and retried after 2 seconds (previously it
    retried immediately in a tight loop).
    """
    while True:
        try:
            if not flag:
                return requests.post(url, json=data, timeout=10,
                                     verify=False).json()
            return requests.get(url, timeout=10, verify=False).json()
        except Exception as e:
            print(e)
            time.sleep(2)


def add_payment(data, atype=None):
    """Write a successful payment back to the remote form.

    Looks up the "已发起" records matching ``data['id']`` and
    ``data['base_company']``, takes the ``_id`` of the last one returned and
    issues an update marking it paid/completed.

    The module-level lock is held via ``with`` so it is always released, even
    when the lookup returns no rows (IndexError) or *data* lacks a key;
    previously such an exception left the lock held and blocked every
    subsequent call.

    Args:
        data: dict providing ``id``, ``base_company``, ``name``, ``pay_type``
            and ``transaction_id``.
        atype: only logged.
    """
    with lock:
        print("开始========", data, atype)
        now_time = get_last_8_hour()
        list_url = f'{base_path}/api/v5/app/entry/data/list'
        filter_data = {
            "app_id": pay_app,
            "entry_id": uncollected_id,
            "limit": 100,
            "filter": {
                "rel": "and",
                "cond": [
                    # NOTE(review): values here are bare scalars, while the
                    # read-only queries above wrap them in lists -- confirm.
                    {"field": "id_card", "method": "eq", "value": data['id']},
                    {"field": "base_company", "method": "eq", "value": data['base_company']},
                    {"field": "order_status", "method": "eq", "value": "已发起"},
                ],
            },
        }
        res = req_tool_jdy(list_url, filter_data)
        print("=========================res=========================",res)
        print(type(res))
        _id = res["data"][-1]["_id"]

        # NOTE(review): the record id comes from `uncollected_id` but the
        # update targets `pay_details_id` -- confirm this is intentional.
        url = f'{base_path}/api/v5/app/entry/data/update'
        info = {
            "app_id": pay_app,
            "entry_id": pay_details_id,
            "data_id": _id,
            "is_start_workflow": True,
            "is_start_trigger": True,
            "data": {
                "id_card": {"value": data['id']},
                "name": {"value": data['name']},
                "pay_type": {"value": data['pay_type']},
                "pay_status": {"value": "已支付"},
                "order_status": {"value": "已完成"},
                "pay_time": {"value": now_time},
                "transaction_id": {"value": data['transaction_id']},
            },
        }
        req_tool_jdy(url, info)
    print("结束========")


def rsa_long_decrypt(priv_key, msg, length=256):
    """Decrypt a base64 message made of concatenated PKCS#1 v1.5 RSA blocks.

    Args:
        priv_key: RSA private key object accepted by ``PKCS1_v1_5.new``.
        msg: base64-encoded ciphertext.
        length: ciphertext block size in bytes -- 128 for a 1024-bit
            certificate, 256 for a 2048-bit one.

    Returns:
        The plaintext decoded as GBK. On any failure the error is logged and
        ``(jsonify(error=...), 500)`` is returned instead, so callers must be
        prepared for a tuple.

    The decrypted blocks are joined as bytes and decoded once: decoding each
    block separately fails whenever a two-byte GBK character straddles a
    block boundary.
    """
    try:
        privobj = PKCS1_v1_5.new(priv_key)
        b64_msg = base64.b64decode(msg)
        pieces = []
        for i in range(0, len(b64_msg), length):
            # `None` is the sentinel handed back when a block fails to decrypt.
            decrypted_chunk = privobj.decrypt(b64_msg[i:i + length], None)
            if decrypted_chunk is None:
                raise ValueError("RSA block decryption failed")
            pieces.append(decrypted_chunk)
        plaintext = b"".join(pieces).decode('GBK')
        print("解密回调完成:", plaintext)
    except Exception as e:
        print(f"解密回调失败:{e}")
        return jsonify(error=str(e)), 500
    return plaintext


def decode_notify_data(res_json):
    """Decrypt the AES-GCM ``resource`` section of a notification payload.

    Reads ``ciphertext``, ``nonce`` and ``associated_data`` from
    ``res_json['resource']``, uses ``v3_SECRET`` as the key, and treats the
    last 16 bytes of the base64-decoded ciphertext as the authentication tag.

    Returns:
        The decrypted JSON document as a Python object, or ``None`` when
        anything goes wrong (the error is logged).
    """
    try:
        resource = res_json['resource']
        ciphertext = resource['ciphertext']
        nonce = resource['nonce']
        associated_data = resource['associated_data']
        cipher = AES.new(v3_SECRET.encode(), AES.MODE_GCM, nonce=nonce.encode())
        cipher.update(associated_data.encode())
        en_data = b64decode(ciphertext.encode('utf-8'))
        auth_tag = en_data[-16:]
        _en_data = en_data[:-16]
        plaintext = cipher.decrypt_and_verify(_en_data, auth_tag)
        decodejson = json.loads(plaintext.decode())
    except Exception as e:
        print(f"解密回调失败:{e}")
        return None
    return decodejson


def get_wx_token():
    """Request a WeChat Work access token using ``corpid`` and ``SECRET``.

    Raises:
        KeyError: when the response carries no ``access_token``.
    """
    url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={SECRET}'
    res = req_tool_vx(url, flag=1)
    return res['access_token']


def query_wx_fj_info(id):
    """Return the "附加信息" extended attribute of a WeChat Work user.

    Returns:
        The attribute's value, or ``""`` when the user has no attribute with
        that name.

    Raises:
        KeyError: when the response has no ``extattr``/``attrs`` section.
    """
    token = get_wx_token()
    url = f'https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={token}&userid={id}'
    res = req_tool_vx(url, flag=1)
    val = ""
    print("query_wx_fj_info", res,id)
    for i in res['extattr']['attrs']:
        if i['name'] == '附加信息':
            val = i['value']
    return val


def query_wx_userid(code):
    """Exchange an auth *code* for the WeChat Work ``userid``.

    Raises:
        KeyError: when the response carries no ``userid``.
    """
    token = get_wx_token()
    url = f'https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo?access_token={token}&code={code}'
    res = req_tool_vx(url, flag=1)
    return res['userid']