Files
jefferyzhao 9565e29ae0 first commit
2025-07-31 17:39:37 +08:00

395 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from Crypto.PublicKey import RSA
import base64
from Crypto.Cipher import PKCS1_v1_5
import requests, time, json, uuid, datetime
from flask import jsonify
from config import *
from dateutil.relativedelta import relativedelta
import threading
import urllib3
from Crypto.Cipher import AES
from base64 import b64encode, b64decode
# Silence urllib3's InsecureRequestWarning for the whole process.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
# Module-level lock object created once at import time.
lock = threading.Lock()
def format_uct(origin_date_str):
    """Convert an ISO-like UTC timestamp string to a UTC+8 display string.

    Args:
        origin_date_str: Timestamp such as ``"2025-07-31T09:39:37.000Z"``.
            Anything after the first ``.`` (fractional seconds and suffix)
            is discarded; a bare trailing ``Z`` is also accepted.

    Returns:
        The timestamp shifted forward by eight hours, formatted as
        ``"%Y-%m-%d %H:%M:%S"``.

    Raises:
        ValueError: If the remaining text is not ``%Y-%m-%dT%H:%M:%S``.
    """
    # Drop fractional seconds first; then strip a trailing "Z" that is still
    # present when the input carried no fractional part at all.
    trimmed = origin_date_str.split('.')[0].rstrip('Z')
    utc_date = datetime.datetime.strptime(trimmed, "%Y-%m-%dT%H:%M:%S")
    local_date = utc_date + datetime.timedelta(hours=8)
    return local_date.strftime('%Y-%m-%d %H:%M:%S')
def queryUser(id):
    """Return the completed top-up records for one ID-card number.

    Queries the ``pay_details_id`` form through ``turn_page`` with two
    conditions (``order_status`` equal to the completed-status value and
    ``id_card`` equal to *id*), then rewrites each record's ``pay_time``
    with ``format_uct``.

    Returns:
        ``{"balance": 0.0, "pay_detail": [...]}`` -- the balance is a fixed
        zero in this function.
    """
    conditions = [
        {"field": "order_status", "method": "eq", "value": ["已完成"]},
        {"field": "id_card", "method": "eq", "value": [id]},
    ]
    query = {
        "app_id": pay_app,
        "entry_id": pay_details_id,
        "limit": 100,
        "filter": {"rel": "and", "cond": conditions},
    }
    records = turn_page(f'{base_path}/api/v5/app/entry/data/list', query)
    for record in records:
        record['pay_time'] = format_uct(record['pay_time'])
    # Balance is not looked up here; it is reported as 0 rounded to 2 places.
    return {"balance": float("%.2f" % 0), "pay_detail": records}
def filter_jdy_ht(id):
    """Look up the driver's entry in the Jiandaoyun contract form.

    Filters the ``ht_id`` form by ``id_card == id`` and uses only the first
    matching record.

    Returns:
        ``[]`` when nothing matches, otherwise
        ``[base_company, merchant_no, name, phone_num]``.
    """
    endpoint = f'{base_path}/api/v5/app/entry/data/list'
    query = {
        "app_id": pay_app,
        "entry_id": ht_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [{"field": "id_card", "method": "eq", "value": [id]}],
        },
    }
    print("开始查询合同信息", query)
    contracts = turn_page(endpoint, query)
    if not contracts:
        return []

    first = contracts[0]
    return [
        first['base_company'],  # organisation
        '0001000F7152040',      # merchant number (hard-coded)
        first['name'],          # driver name
        # NOTE(review): the original comment called this the licence plate,
        # but the field read is ``phone_num`` -- confirm which is intended.
        first['phone_num'],
    ]
def querydpUser(id):
    """Return the completed top-up records for one ID-card number.

    Same query as ``queryUser`` (``pay_details_id`` form, completed status,
    matching ``id_card``), but the timestamp field converted with
    ``format_uct`` is ``time`` rather than ``pay_time``.

    Returns:
        ``{"balance": 0, "pay_detail": [...]}``.
    """
    query = {
        "app_id": pay_app,
        "entry_id": pay_details_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": "order_status", "method": "eq", "value": ["已完成"]},
                {"field": "id_card", "method": "eq", "value": [id]},
            ],
        },
    }
    records = turn_page(f'{base_path}/api/v5/app/entry/data/list', query)
    for record in records:
        record['time'] = format_uct(record['time'])

    # Disabled: personal balance lookup against the account form.
    # info = {
    #     "app_id": pay_app,
    #     "entry_id": accout_id,
    #     "filter": {"rel": "and",
    #                "cond": [
    #                    {"field": "id_card", "method": "eq", "value": [id]},
    #                    {"field": "date", "method": "empty"},
    #                ]}
    # }
    # res = req_tool_jdy(url, info)
    # balance = 0
    # if res['data']:
    #     balance = res['data'][0]['balance']
    # balance = float("%.2f" % balance)
    return {"balance": 0, "pay_detail": records}
def get_last_8_hour():
    """Return the local time of eight hours ago as ``YYYY-MM-DD HH:MM:SS``."""
    eight_hours_in_seconds = 8 * 60 * 60
    shifted = time.localtime(time.time() - eight_hours_in_seconds)
    return time.strftime('%Y-%m-%d %H:%M:%S', shifted)
def turn_page(url, data):
    """Collect every record for a query by following ``data_id`` pagination.

    Each request is sent through ``req_tool_jdy``. When a page comes back
    full, the ``_id`` of its last record is sent as ``data_id`` on the next
    request; a short page ends the loop.

    Args:
        url: List endpoint to post to.
        data: Query payload. It is copied, so the caller's dict is never
            given a ``data_id`` key. Its ``limit`` (default 100) is used as
            the page size when deciding whether another page may exist.

    Returns:
        A list with the ``data`` records of all pages, in order.

    Note:
        Failures are retried indefinitely (best-effort, as before), now with
        a 2-second pause between attempts instead of a tight loop.
    """
    query = dict(data)
    page_size = query.get("limit", 100)
    records = []
    while True:
        res = None  # keeps the error log below safe if the request itself fails
        try:
            res = req_tool_jdy(url, query)
            page = res['data']
            has_more = bool(page) and len(page) >= page_size
            next_cursor = page[-1]["_id"] if has_more else None
        except Exception as e:
            print("turn_page", e, res)
            time.sleep(2)
            continue
        # Only extend after the page was fully validated, so a retry can
        # never append the same page twice.
        records += page
        if next_cursor is None:
            return records
        query["data_id"] = next_cursor
def get_user_info(id, type=None):
    """Fetch the first matching record from the ``fu_id`` form.

    Args:
        id: Value to match.
        type: When truthy the match is on the ``id_card`` field, otherwise
            on the ``user`` field.

    Returns:
        The first record of the response's ``data`` list, or ``''`` when the
        list is empty.
    """
    lookup_field = "id_card" if type else "user"
    query = {
        "app_id": fu_app,
        "entry_id": fu_id,
        "filter": {
            "rel": "and",
            "cond": [{"field": lookup_field, "method": "eq", "value": [id]}],
        },
    }
    res = req_tool_jdy(f'{base_path}/api/v5/app/entry/data/list', query)
    matches = res['data']
    return matches[0] if matches else ''
def query_pay_details(id):
    """Return one person's initiated (unsettled) payment items grouped by month.

    Queries the ``uncollected_id`` form for records with the given
    ``id_card`` whose ``order_status`` is the initiated-status value.

    Returns:
        A dict keyed by each record's ``month1`` value, where every entry is
        ``{"data": [records], "_th_amount": month_total}``, plus an
        ``"amount"`` key holding the grand total of ``pay_amount`` rounded
        to two decimals.
    """
    endpoint = f'{base_path}/api/v5/app/entry/data/list'
    query = {
        "app_id": pay_app,
        "entry_id": uncollected_id,
        "limit": 100,
        "filter": {
            "rel": "and",
            "cond": [
                {"field": "id_card", "method": "eq", "value": [id]},
                {"field": "order_status", "method": "eq", "value": "已发起"},
            ],
        },
    }
    rows = turn_page(endpoint, query)

    # Bucket the rows by month, keeping first-seen month order.
    undata = {}
    for row in rows:
        undata.setdefault(row['month1'], {"data": []})['data'].append(row)

    # Per-month subtotal and running grand total.
    amount = 0
    for bucket in undata.values():
        month_total = 0
        for row in bucket['data']:
            amount += row['pay_amount']
            month_total += row['pay_amount']
        bucket['_th_amount'] = month_total
    undata['amount'] = float("%.2f" % amount)

    # Drop any month bucket that ended up without rows.
    empty_months = [
        key for key in undata
        if key != 'amount' and not undata[key]['data']
    ]
    for key in empty_months:
        del undata[key]
    return undata
def req_tool_jdy(url, data):
    """POST *data* as JSON to *url* with the bearer ``apikey`` and return the parsed reply.

    The call is retried until a reply without a ``code`` key is obtained:
    a reply that contains ``code`` is logged and retried after 2 seconds,
    and any exception (network error, non-JSON body, ...) is logged and
    retried after the same pause, so a persistent failure no longer spins
    in a tight loop.

    Args:
        url: Endpoint to post to.
        data: JSON-serialisable request body.

    Returns:
        The decoded JSON response.
    """
    headers = {"Authorization": f"Bearer {apikey}"}
    while True:
        try:
            # TLS verification is disabled here, as in the original code.
            res = requests.post(url, headers=headers, json=data, timeout=10, verify=False).json()
            if "code" in res:
                print("req_tool_jdy", res)
                time.sleep(2)
                continue
            return res
        except Exception as e:
            print(e)
            time.sleep(2)
def req_tool_vx(url, data=None, flag=None):
    """Call *url* and return the parsed JSON reply, retrying until it succeeds.

    Args:
        url: Endpoint to call.
        data: JSON body for the POST variant; unused for GET.
        flag: Falsy -> ``POST`` with *data*; truthy -> plain ``GET``.

    Returns:
        The decoded JSON response.

    Note:
        Any exception is logged and the request is retried after a
        2-second pause (previously it retried immediately, busy-looping on
        persistent failures).
    """
    while True:
        try:
            # TLS verification is disabled here, as in the original code.
            if flag:
                response = requests.get(url, timeout=10, verify=False)
            else:
                response = requests.post(url, json=data, timeout=10, verify=False)
            return response.json()
        except Exception as e:
            print(e)
            time.sleep(2)
def add_payment(data, atype=None):
    """Write a successful payment back to the form record after a top-up.

    Looks up the initiated records in the ``uncollected_id`` form matching
    ``data['id']`` and ``data['base_company']``, takes the ``_id`` of the
    last record of that response, and sends an update that marks it as paid
    and completed.

    The whole operation runs under the module-level ``lock``. The lock is
    held via ``with`` so it is released even when a step raises; previously
    an exception left it acquired and every later call blocked forever.

    Args:
        data: Mapping with keys ``id``, ``base_company``, ``name``,
            ``pay_type`` and ``transaction_id``.
        atype: Only logged.

    Raises:
        IndexError: If the lookup returns no records.
        KeyError: If *data* lacks one of the keys above.
    """
    with lock:
        print("开始========", data, atype)
        now_time = get_last_8_hour()
        list_url = f'{base_path}/api/v5/app/entry/data/list'
        filter_data = {
            "app_id": pay_app,
            "entry_id": uncollected_id,
            "limit": 100,
            "filter": {
                "rel": "and",
                "cond": [
                    {"field": "id_card", "method": "eq", "value": data['id']},
                    {"field": "base_company", "method": "eq", "value": data['base_company']},
                    {"field": "order_status", "method": "eq", "value": "已发起"},
                ],
            },
        }
        res = req_tool_jdy(list_url, filter_data)
        print("=========================res=========================", res)
        print(type(res))
        _id = res["data"][-1]["_id"]

        url = f'{base_path}/api/v5/app/entry/data/update'
        # NOTE(review): the record id comes from the ``uncollected_id`` form
        # but the update targets ``pay_details_id`` -- confirm this is intended.
        info = {
            "app_id": pay_app,
            "entry_id": pay_details_id,
            "data_id": _id,
            "is_start_workflow": True,
            "is_start_trigger": True,
            "data": {
                "id_card": {"value": data['id']},
                "name": {"value": data['name']},
                "pay_type": {"value": data['pay_type']},
                "pay_status": {"value": "已支付"},
                "order_status": {"value": "已完成"},
                "pay_time": {"value": now_time},
                "transaction_id": {"value": data['transaction_id']},
            },
        }
        req_tool_jdy(url, info)
    print("结束========")
def rsa_long_decrypt(priv_key, msg, length=256):
    """Decrypt a base64 message made of consecutive RSA PKCS#1 v1.5 blocks.

    Args:
        priv_key: RSA private key accepted by ``PKCS1_v1_5.new``.
        msg: Base64-encoded ciphertext.
        length: Ciphertext block size in bytes -- 128 for a 1024-bit key,
            256 for a 2048-bit key.

    Returns:
        The GBK-decoded plaintext string. On any failure, the tuple
        ``(jsonify(error=...), 500)`` is returned instead (unchanged
        contract).
    """
    failure_marker = 'xyz'  # returned by decrypt() when a block is invalid
    try:
        privobj = PKCS1_v1_5.new(priv_key)
        raw = base64.b64decode(msg)
        plain_parts = []
        for start in range(0, len(raw), length):
            piece = privobj.decrypt(raw[start:start + length], failure_marker)
            if piece is failure_marker:
                raise ValueError(f"RSA block at offset {start} could not be decrypted")
            plain_parts.append(piece)
        # Decode once over the joined bytes: a multi-byte GBK character may
        # straddle two blocks, so per-block decoding can fail or corrupt it.
        plaintext = b"".join(plain_parts).decode('GBK')
        print("解密回调完成:", plaintext)
    except Exception as e:
        print(f"解密回调失败:{e}")
        return jsonify(error=str(e)), 500
    return plaintext
def decode_notify_data(res_json):
    """Decrypt the AES-GCM ``resource`` of a notification and parse it as JSON.

    Reads ``ciphertext``, ``nonce`` and ``associated_data`` from
    ``res_json['resource']``, decrypts with ``v3_SECRET`` as the key, and
    treats the last 16 bytes of the base64-decoded ciphertext as the
    authentication tag.

    Returns:
        The decoded JSON object, or ``None`` if any step fails (the error
        is printed).
    """
    try:
        resource = res_json['resource']
        ciphertext = resource['ciphertext']
        nonce = resource['nonce']
        associated_data = resource['associated_data']

        aes = AES.new(v3_SECRET.encode(), AES.MODE_GCM, nonce=nonce.encode())
        aes.update(associated_data.encode())

        blob = b64decode(ciphertext.encode('utf-8'))
        body, tag = blob[:-16], blob[-16:]
        return json.loads(aes.decrypt_and_verify(body, tag).decode())
    except Exception as e:
        print(f"解密回调失败:{e}")
        return None
def get_wx_token():
    """Request an access token from the WeCom ``gettoken`` endpoint.

    A new token is requested on every call, using ``corpid`` and ``SECRET``.

    Returns:
        The ``access_token`` field of the response.
    """
    token_url = f'https://qyapi.weixin.qq.com/cgi-bin/gettoken?corpid={corpid}&corpsecret={SECRET}'
    reply = req_tool_vx(token_url, flag=1)
    return reply['access_token']
def query_wx_fj_info(id):
    """Read the extended attribute named '附加信息' from a WeCom user profile.

    Returns:
        The ``value`` of the last attribute in ``extattr.attrs`` whose
        ``name`` matches, or ``""`` when none does.
    """
    token = get_wx_token()
    profile = req_tool_vx(
        f'https://qyapi.weixin.qq.com/cgi-bin/user/get?access_token={token}&userid={id}',
        flag=1,
    )
    print("query_wx_fj_info", profile, id)
    matches = [
        attr['value']
        for attr in profile['extattr']['attrs']
        if attr['name'] == '附加信息'
    ]
    return matches[-1] if matches else ""
def query_wx_userid(code):
    """Exchange a WeCom auth *code* for the ``userid`` in the reply."""
    token = get_wx_token()
    reply = req_tool_vx(
        f'https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo?access_token={token}&code={code}',
        flag=1,
    )
    return reply['userid']