import json
import random
import secrets
import socket
import string
import time
from config import *
from Cryptodome.PublicKey import RSA
from Cryptodome.Signature import pkcs1_15
from Cryptodome.Hash import SHA256
import requests
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
# NOTE: the three imports below rebind SHA256, RSA and pkcs1_15, so the
# ``Crypto`` versions (not the ``Cryptodome`` ones above) are what the code
# in this module actually uses. Import order is kept deliberately.
from Crypto.Hash import SHA256
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15, PKCS1_v1_5

# Private-key file used for each company name that must match exactly.
_KEY_FILES = {
    "金银建": 'jyj.pem',
    "华建": 'hj.pem',
    "银建": 'yj.pem',
}
# Key file used when the company name contains "新能源".
_NEW_ENERGY_KEY_FILE = 'xny.pem'
# Key file used when no other rule matches.
_DEFAULT_KEY_FILE = 'apiclient_key.pem'


def _key_file_for(gs):
    """Return the PEM file name holding the private key for company *gs*.

    Exact names are checked first, then the "新能源" substring rule, then the
    default key file.
    """
    key_file = _KEY_FILES.get(gs)
    if key_file is not None:
        return key_file
    if "新能源" in gs:
        return _NEW_ENERGY_KEY_FILE
    return _DEFAULT_KEY_FILE


# Signature generation
def get_sign(sign_str, gs):
    """Sign *sign_str* with the RSA private key selected by *gs*.

    The UTF-8 encoded message is hashed with SHA-256 and signed with
    PKCS#1 v1.5; the signature is returned Base64-encoded.

    Args:
        sign_str: The message to sign.
        gs: Company name used to choose the private-key file (paths are
            relative to the current working directory).

    Returns:
        The Base64-encoded signature as a ``str``.

    Raises:
        OSError: If the key file cannot be opened.
        ValueError: If the key file does not contain a usable RSA key.
    """
    # Use a context manager so the key file handle is closed promptly.
    with open(_key_file_for(gs)) as key_file:
        rsa_key = RSA.importKey(key_file.read())
    signer = pkcs1_15.new(rsa_key)
    digest = SHA256.new(sign_str.encode('utf8'))
    return b64encode(signer.sign(digest)).decode('utf-8')


class WXPay:
    """WeChat Pay H5 payment client (API v3)."""

    # Host and path are kept separate because the path alone is part of the
    # string that gets signed.
    API_HOST = 'https://api.mch.weixin.qq.com'
    H5_PATH = '/v3/pay/transactions/h5'
    # Seconds to wait for the WeChat API before giving up; without a timeout
    # a stalled connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 10
    # Characters the request nonce is drawn from, and its length.
    _NONCE_ALPHABET = string.ascii_uppercase + string.digits
    _NONCE_LENGTH = 32
    # Length in bytes of the GCM authentication tag appended to the ciphertext.
    _GCM_TAG_LENGTH = 16

    def __init__(self, serial_no, mchid):
        """Create a client for merchant *mchid*.

        Args:
            serial_no: Serial number of the merchant certificate.
            mchid: Merchant id.
        """
        # ``corpid`` is not defined in this module; presumably it is provided
        # by ``from config import *`` -- TODO confirm.
        self.appid = corpid
        self.mchid = mchid
        self.url = self.API_HOST + self.H5_PATH
        self.notify_url = "http://web.jiyuankeshang.com/api/paymentCallBack"
        self.serial_no = serial_no  # merchant certificate serial number

    # Unified order
    def unified_order(self, order_no, total, description, gs):
        """Place an H5 order and return the raw HTTP response.

        Args:
            order_no: Merchant order number, sent as ``out_trade_no``.
            total: Order amount, sent as ``amount.total`` with currency CNY.
            description: Order description.
            gs: Company name; selects the private key used for signing.

        Returns:
            The ``requests.Response`` of the POST request.

        Raises:
            requests.RequestException: On connection errors or when the API
                does not answer within ``REQUEST_TIMEOUT`` seconds.
        """
        payload = {
            "mchid": self.mchid,
            "out_trade_no": order_no,
            "appid": self.appid,
            "description": description,
            "notify_url": self.notify_url,
            "amount": {
                "total": total,
                "currency": "CNY"
            },
            "scene_info": {
                # ``ip`` is not defined in this module; presumably it comes
                # from ``config`` -- TODO confirm.
                "payer_client_ip": ip,
                "h5_info": {
                    "type": "Wap"
                }
            }
        }
        # Serialize exactly once: the very same string is both signed and
        # sent as the request body.
        data = json.dumps(payload)

        # ``secrets`` rather than ``random``: the nonce is part of a signed,
        # security-relevant request.
        random_str = ''.join(
            secrets.choice(self._NONCE_ALPHABET)
            for _ in range(self._NONCE_LENGTH)
        )
        time_stamps = str(int(time.time()))
        sign_str = f"POST\n{self.H5_PATH}\n{time_stamps}\n{random_str}\n{data}\n"
        sign = get_sign(sign_str, gs)

        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'Authorization': 'WECHATPAY2-SHA256-RSA2048 '
                             + f'mchid="{self.mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{self.serial_no}"'
        }
        response = requests.post(
            self.url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT
        )
        print("=============", response.text, self.notify_url)
        return response

    def decode_notify_data(self, res_json):
        """Decrypt the ``resource`` part of a payment notification.

        The ciphertext is Base64-decoded and decrypted with AES-GCM, using
        ``v3_SECRET`` as the key (not defined in this module; presumably
        provided by ``config`` -- TODO confirm). The last 16 bytes of the
        decoded data are used as the authentication tag.

        Args:
            res_json: Parsed notification body; must contain
                ``resource.ciphertext`` and ``resource.nonce``.
                ``resource.associated_data`` is optional.

        Returns:
            The decrypted payload parsed from JSON, or ``None`` if anything
            goes wrong (the failure is printed, not raised).
        """
        try:
            resource = res_json['resource']
            ciphertext = resource['ciphertext']
            nonce = resource['nonce']
            # A missing or empty value means "no additional authenticated
            # data", not a failure.
            associated_data = resource.get('associated_data') or ''

            cipher = AES.new(v3_SECRET.encode(), AES.MODE_GCM, nonce=nonce.encode())
            if associated_data:
                cipher.update(associated_data.encode())

            en_data = b64decode(ciphertext.encode('utf-8'))
            auth_tag = en_data[-self._GCM_TAG_LENGTH:]
            encrypted = en_data[:-self._GCM_TAG_LENGTH]
            plaintext = cipher.decrypt_and_verify(encrypted, auth_tag)
            return json.loads(plaintext.decode())
        except Exception as e:
            # Deliberately broad: any malformed or forged notification is
            # reported and answered with None instead of crashing the caller.
            print(f"解密回调失败:{e}")
            return None