import base64
import random
import string
import time
import traceback
from datetime import datetime
import requests
from BaseMethods.log import log
from Crypto.PublicKey import RSA
from Crypto.Signature import pkcs1_15
from Cryptodome.Hash import SHA256
from sqlalchemy.util import b64encode
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# 各包版本
# django-ratelimit==3.0.1
# SQLAlchemy~=1.4.44
# pycryptodome==3.16.0
# pycryptodomex==3.16.0
# cryptography~=38.0.4
# Django~=3.2.4
# 获取唯一标识
def get_uuid(utype=0):
"""
唯一码
:param utype:
:return:
"""
if utype == 0:
return uuid.uuid1()
elif utype == 1:
return str(uuid.uuid1())
elif utype == 2:
return str(uuid.uuid1().hex)
elif utype == 3:
return str((uuid.uuid5(uuid.NAMESPACE_DNS, str(uuid.uuid1()) + str(random.random()))))
# 获取当前时间
def get_now_time(type=0):
"""
:param type: 类型0-5
:return: yyyy-mm-dd HH:MM:SS;y-m-d H:M:S.f;y-m-d;ymdHMS;y年m月d日h时M分S秒
"""
if type == 0:
return datetime.datetime.now()
elif type == 1:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
elif type == 2:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
elif type == 3:
return datetime.datetime.now().strftime("%Y-%m-%d")
elif type == 4:
return datetime.datetime.now().strftime("%Y%m%d%H%M%S")
elif type == 5:
locale.setlocale(locale.LC_CTYPE, 'chinese')
timestr = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
t = time.strptime(timestr, "%Y-%m-%d %H:%M:%S")
result = (time.strftime("%Y年%m月%d日%H时%M分%S秒", t))
return result
elif type == 6:
return datetime.datetime.now().strftime("%Y%m%d")
# 重构系统jargon类,用于处理时间格式报错问题
class DateEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")
elif isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, bytes):
return str(obj, encoding='utf-8')
elif isinstance(obj, uuid.UUID):
return str(obj)
elif isinstance(obj, datetime.time):
return obj.strftime('%H:%M')
elif isinstance(obj, datetime.timedelta):
return str(obj)
else:
return json.JSONEncoder.default(self, obj)
def decrypt(nonce, ciphertext, associated_data, pay_key):
"""
AES解密
:param nonce:
:param ciphertext:
:param associated_data:
:param pay_key:
:return:
"""
key = pay_key
key_bytes = str.encode(key)
nonce_bytes = str.encode(nonce)
ad_bytes = str.encode(associated_data)
data = base64.b64decode(ciphertext)
aesgcm = AESGCM(key_bytes)
return aesgcm.decrypt(nonce_bytes, data, ad_bytes)
def order_num():
"""
生成订单号
:return:
"""
# 下单时间的年月日毫秒12+随机数8位
now_time = datetime.now()
result = str(now_time.year) + str(now_time.month) + str(now_time.day) + str(now_time.microsecond) + str(
random.randrange(10000000, 99999999))
return result
def get_sign(sign_str):
"""
定义生成签名的函数
:param sign_str:
:return:
"""
try:
with open(r'static/cret/apiclient_key.pem') as f:
private_key = f.read()
rsa_key = RSA.importKey(private_key)
signer = pkcs1_15.new(rsa_key)
digest = SHA256.new(sign_str.encode('utf-8'))
# sign = b64encode(signer.sign(digest)).decode('utf-8')
sign = b64encode(signer.sign(digest))
return sign
except Exception as e:
log.error("生成签名的函数方法报错【func:get_sign;sign_str:%s】:%s ==> %s" % (sign_str, e, traceback.format_exc()))
def check_wx_cert(response, mchid, pay_key, serial_no):
"""
微信平台证书
:param response: 请求微信支付平台所对应的的接口返回的响应值
:param mchid: 商户号
:param pay_key: 商户号秘钥
:param serial_no: 证书序列号
:return:
"""
wechatpay_serial, wechatpay_timestamp, wechatpay_nonce, wechatpay_signature, certificate = None, None, None, None, None
try:
# 11.应答签名验证
wechatpay_serial = response.headers['Wechatpay-Serial'] # 获取HTTP头部中包括回调报文的证书序列号
wechatpay_signature = response.headers['Wechatpay-Signature'] # 获取HTTP头部中包括回调报文的签名
wechatpay_timestamp = response.headers['Wechatpay-Timestamp'] # 获取HTTP头部中包括回调报文的时间戳
wechatpay_nonce = response.headers['Wechatpay-Nonce'] # 获取HTTP头部中包括回调报文的随机串
# 11.1.获取微信平台证书 (等于又把前面的跑一遍,实际上应是获得一次证书就存起来,不用每次都重新获取一次)
url2 = "https://api.mch.weixin.qq.com/v3/certificates"
# 11.2.生成证书请求随机串
random_str2 = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
# 11.3.生成证书请求时间戳
time_stamps2 = str(int(time.time()))
# 11.4.生成请求证书的签名串
data2 = ""
sign_str2 = f"GET\n{'/v3/certificates'}\n{time_stamps2}\n{random_str2}\n{data2}\n"
# 11.5.生成签名
sign2 = get_sign(sign_str2)
# 11.6.生成HTTP请求头
headers2 = {
"Content-Type": "application/json",
"Accept": "application/json",
"Authorization": 'WECHATPAY2-SHA256-RSA2048 '
+ f'mchid="{mchid}",nonce_str="{random_str2}",signature="{sign2}",timestamp="{time_stamps2}",serial_no="{serial_no}"'
}
# 11.7.发送请求获得证书
response2 = requests.get(url2, headers=headers2) # 只需要请求头
cert = response2.json()
# 11.8.证书解密
nonce = cert["data"][0]['encrypt_certificate']['nonce']
ciphertext = cert["data"][0]['encrypt_certificate']['ciphertext']
associated_data = cert["data"][0]['encrypt_certificate']['associated_data']
serial_no = cert["data"][0]['serial_no']
certificate = decrypt(nonce, ciphertext, associated_data, pay_key)
except Exception as e:
log.error(f"微信平台证书验证报错:{e};{traceback.format_exc()}")
return wechatpay_serial, wechatpay_timestamp, wechatpay_nonce, wechatpay_signature, certificate, serial_no
def verify(check_data, signature, certificate):
"""
验签函数
:param check_data:
:param signature:
:param certificate:
:return:
"""
key = RSA.importKey(certificate) # 这里直接用了解密后的证书,但没有去导出公钥,似乎也是可以的。怎么导公钥还没搞懂。
verifier = pkcs1_15.new(key)
hash_obj = SHA256.new(check_data.encode('utf8'))
return verifier.verify(hash_obj, base64.b64decode(signature))
def make_headers_v3(mchid, serial_num, data='', method='GET'):
"""
定义微信支付请求接口中请求头认证
:param mchid: 商户ID
:param serial_num: 证书序列号
:param data: 请求体内容
:param method: 请求方法
:return: headers(请求头)
"""
# 4.定义生成签名的函数 get_sign(sign_str)
# 5.生成请求随机串
random_str = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(32))
# 6.生成请求时间戳
time_stamps = str(int(time.time()))
# 7.生成签名串
sign_str = f"{method}\n{'/v3/pay/transactions/jsapi'}\n{time_stamps}\n{random_str}\n{data}\n"
# 8.生成签名
sign = get_sign(sign_str)
# 9.生成HTTP请求头
headers = {
'Content-Type': 'application/json',
'Authorization': 'WECHATPAY2-SHA256-RSA2048 '
+ f'mchid="{mchid}",nonce_str="{random_str}",signature="{sign}",timestamp="{time_stamps}",serial_no="{serial_num}"'
}
return headers, random_str, time_stamps
|