Sanic 实现简单的 API 接口签名认证

前言

API 为什么需要设计签名验证?为了有一定的数据抓取防御能力。
需要考虑的点:

请求参数是否已被篡改
请求来源是否合法
请求是否具有唯一性

原理

参数签名方式:它要求客户端按照约定好的算法生成签名字符串,作为请求的一部分像接口请求,服务端验算签名即可知是否合法。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests
import hashlib
import time

key = "xxx" # 密钥,由服务端颁发

client = "local" # 客户端
server = "kafka" # 服务端
env = "test" # 环境
timestamp = str(time.time())[:10] # 时间
sign_str = client+server+env+timestamp+key
sign = hashlib.md5(sign_str.encode(encoding='utf-8')).hexdigest() # md5

headers = dict(client=client, server=server, env=env, timestamp=timestamp, sign=sign)

r1 = requests.get("https://www.xxx.com/hello", headers=headers)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from sanic import Sanic
from sanic.response import text, json
import hashlib

app = Sanic("My Hello, world app")

@app.get("/hello")
async def hello_world(request):

key = "xxx" # 密钥

client = request.headers.get("client")
server = request.headers.get("server")
env = request.headers.get("env")
timestamp = request.headers.get("timestamp")
sign = request.headers.get("sign")

_sign_str = client+server+env+timestamp+key
_sign = hashlib.md5(_sign_str.encode(encoding='utf-8')).hexdigest()

if sign==_sign:
return json(list(request.headers.items()))
else:
return text("验证失败")
app.run(host='0.0.0.0',debug=True,auto_reload=True)