Files
sh-enterprise-wechat-paymen…/WXPay.py
jefferyzhao 506dceb0f7 first commit
2025-07-31 15:53:14 +08:00

101 lines
3.4 KiB
Python

import json
import random
import socket
import string
import time
from config import *
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import pkcs1_15
from Cryptodome.Hash import SHA256
import requests
# 生成签名函数
from base64 import b64encode, b64decode
import requests
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15, PKCS1_v1_5
# 获取加密
def get_sign(sign_str, gs):
if gs == "金银建":
rsa_key = RSA.importKey(open('jyj.pem').read())
elif gs == "华建":
rsa_key = RSA.importKey(open('hj.pem').read())
elif gs == "银建":
rsa_key = RSA.importKey(open('yj.pem').read())
elif "新能源" in gs:
rsa_key = RSA.importKey(open('xny.pem').read())
else:
rsa_key = RSA.importKey(open('apiclient_key.pem').read())
signer = pkcs1_15.new(rsa_key)
digest = SHA256.new(sign_str.encode('utf8'))
sign = b64encode(signer.sign(digest)).decode('utf-8')
return sign
class WXPay:
"""
微信 H5支付
"""
def __init__(self, serial_no, mchid):
self.appid = corpid
self.mchid = mchid
self.url = 'https://api.mch.weixin.qq.com/v3/pay/transactions/h5'
self.notify_url = "http://web.jiyuankeshang.com/api/paymentCallBack"
self.serial_no = serial_no # 商户号证书序列号
# 统一下单
def unified_order(self, order_no, total, description, gs):
data = {
"mchid": self.mchid,
"out_trade_no": order_no,
"appid": self.appid,
"description": description,
"notify_url": self.notify_url,
"amount": {
"total": total,
"currency": "CNY"
},
"scene_info": {
"payer_client_ip": ip,
"h5_info": {
"type": "Wap"
}
}
}
data = json.dumps(data) # 只能序列化一次
random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
time_stamps = str(int(time.time()))
sign_str = f"POST\n{'/v3/pay/transactions/h5'}\n{time_stamps}\n{random_str}\n{data}\n"
sign = get_sign(sign_str, gs)
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'Authorization': 'WECHATPAY2-SHA256-RSA2048 ' + f'mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{self.serial_no}"'
}
response = requests.post(self.url, data=data, headers=headers)
print("=============", response.text, self.notify_url)
return response
def decode_notify_data(self, res_json):
try:
ciphertext = res_json['resource']['ciphertext']
nonce = res_json['resource']['nonce']
associated_data = res_json['resource']['associated_data']
cipher = AES.new(v3_SECRET.encode(), AES.MODE_GCM, nonce=nonce.encode())
cipher.update(associated_data.encode())
en_data = b64decode(ciphertext.encode('utf-8'))
auth_tag = en_data[-16:]
_en_data = en_data[:-16]
plaintext = cipher.decrypt_and_verify(_en_data, auth_tag)
decodejson = json.loads(plaintext.decode())
except Exception as e:
print(f"解密回调失败:{e}")
return None
return decodejson