晨曦's Blog

This is a window to the soul

前言

最近遇到一个需求,每 60S 刷新数据库数据到 Redis 缓存中,但应用又不止一个进程。此需求中对原子性并无太大的要求,只是如果每次只有一个进程执行,那么数据库的压力就会小很多。

于是想到用分布式锁来解决这个问题,最终选择了通过 Redis 来实现。

关于 Redis 实现分布式锁可以查看下面的文章:
分布式锁的实现之 redis 篇

Aioredlock

在众多的官方推荐的分布式锁客户端中,我选择了 Aioredlock

Distributed locks with Redis

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def aioredlock():
# retry_count 重试次数,这里配置为1,杜绝进程重复去获取
lock_manager = Aioredlock([{'host': 'localhost', 'port': 6379, 'db': 1}],retry_count=1)

while True:

# 每秒钟去检测锁的状态,如果锁被其他进程占用就跳过
# lock_timeout 锁的时间为 65S,相当于进程拿到锁后最大65秒后释放
# 业务逻辑执行 小于 65秒,业务逻辑执行完后当即释放锁
# 每60秒执行一次 sleep 59s即可,因为检测已经sleep了1s

await asyncio.sleep(1)
if await lock_manager.is_locked("_ztsg_auto_generate_code"):
#logger.info('The resource is already acquired')
continue

try:
async with await lock_manager.lock("_ztsg_auto_generate_code", lock_timeout=65) as lock:

assert lock.valid is True
assert await lock_manager.is_locked("_ztsg_auto_generate_code") is True

# 每60s钟运行一次任务
await asyncio.sleep(59)

logger.info('Start Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
# 业务代码
logger.info('Stop Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
except LockAcquiringError:
print('Something happened during normal operation. We just log it.')
except LockError:
print('Something is really wrong and we prefer to raise the exception')
except Exception as e:
print(e)
print('Something else went wrong')

通过上面代码就能实现,需求所要求

前记

在使用 Tortoise Orm 的过程中发现数据库自动插入、更新的时间是 UTC 时区时间,通过官网文档发现可以在连接时对时区进行设置

修改

通过连接配置来修改时区,默认情况下的连接配置

1
2
3
register_tortoise(
app, db_url=db_url, modules={"models": ["app.models"]}, generate_schemas=False
)

如果需要修改配置,则不能用 db_url 模式连接需改为 config 模式连接,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
register_tortoise(
app,
config={
'connections': {
'default': db_url
},
'apps': {
'models': {
"models": ["app.models"],
'default_connection': 'default',
}
},
"use_tz": False,
"timezone": "Asia/Shanghai",
},
generate_schemas=False
)

通过以上配置就能将数据库时区设置为上海时区

前言

当原子系统很多 (非微服务),而且各个原子系统之间需要相互调用,这时就需要保证两个系统之间的认证、以及数据加密。

这个时候就需要用到对称加解密了

选择

对于如何选择合适的加密算法,可以参考一下下面这篇文章

如何选择 AES 加密模式(CBC ECB CTR OCB CFB)?

最后我选择了两种方式分别来测试和实现 CBCOCB

另外 pycrypto 已经不再安全,建议使用 pycryptodome,它是 pycrypto 的分支,在安全性方面有较大提升。

1
pip install pycryptodome

CBC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes

class AesCbc:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_CBC

def encrypt(self, data):
cipher = AES.new(self.key, self.mode)
ct_bytes = cipher.encrypt(pad(data, self.bs))
iv = b64encode(cipher.iv).decode('utf-8')
ct = b64encode(ct_bytes).decode('utf-8')
return json.dumps({'iv': iv, 'ciphertext': ct})

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
iv = b64decode(b64['iv'])
ct = b64decode(b64['ciphertext'])
cipher = AES.new(self.key, self.mode, iv)
plaintext = unpad(cipher.decrypt(ct), self.bs)
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
print("Incorrect decryption ", err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes

aes_cipher = AesCbc(key)

encrypt_reuslt = aes_cipher.encrypt(data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"iv": "K8xL41sI3UoXaeWohUuZEA==", "ciphertext": "fLGcOq43vTZc9x3HX8Q9Nv82cwVT6WNTj5mcpuPEckw="}
原文: 需要加密的数据

OCB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

class AesOcb:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_OCB
self.json_k = ['nonce', 'header', 'ciphertext', 'tag']

def encrypt(self, header, data):
header = header
cipher = AES.new(self.key, self.mode)
cipher.update(header)
ciphertext, tag = cipher.encrypt_and_digest(data)
json_v = [b64encode(x).decode('utf-8') for x in [cipher.nonce, header, ciphertext, tag]]
return json.dumps(dict(zip(self.json_k, json_v)))

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
jv = {k: b64decode(b64[k]) for k in self.json_k}
cipher = AES.new(self.key, self.mode, nonce=jv['nonce'])
cipher.update(jv['header'])
plaintext = cipher.decrypt_and_verify(jv['ciphertext'], jv['tag'])
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
# 解密错误
print(err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes
header = b'header'

aes_cipher = AesOcb(key)

encrypt_reuslt = aes_cipher.encrypt(header, data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"nonce": "9Wd6sA1QGSdjXHu1zACA", "header": "aGVhZGVy", "ciphertext": "tyaCFrLuriy6F3xJqs0CehNWe3g7", "tag": "IqDrP9zX00aZMRe7DuCRzQ=="}
原文: 需要加密的数据

前言

API 为什么需要设计签名验证?为了有一定的数据抓取防御能力。
需要考虑的点:

请求参数是否已被篡改
请求来源是否合法
请求是否具有唯一性

原理

参数签名方式:它要求客户端按照约定好的算法生成签名字符串,作为请求的一部分像接口请求,服务端验算签名即可知是否合法。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests
import hashlib
import time

key = "xxx" # 密钥,由服务端颁发

client = "local" # 客户端
server = "kafka" # 服务端
env = "test" # 环境
timestamp = str(time.time())[:10] # 时间
sign_str = client+server+env+timestamp+key
sign = hashlib.md5(sign_str.encode(encoding='utf-8')).hexdigest() # md5

headers = dict(client=client, server=server, env=env, timestamp=timestamp, sign=sign)

r1 = requests.get("https://www.xxx.com/hello", headers=headers)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from sanic import Sanic
from sanic.response import text, json
import hashlib

app = Sanic("My Hello, world app")

@app.get("/hello")
async def hello_world(request):

key = "xxx" # 密钥

client = request.headers.get("client")
server = request.headers.get("server")
env = request.headers.get("env")
timestamp = request.headers.get("timestamp")
sign = request.headers.get("sign")

_sign_str = client+server+env+timestamp+key
_sign = hashlib.md5(_sign_str.encode(encoding='utf-8')).hexdigest()

if sign==_sign:
return json(list(request.headers.items()))
else:
return text("验证失败")
app.run(host='0.0.0.0',debug=True,auto_reload=True)

前言

目前 Docker Hub 上普通用户已经不能自动构建了,于是想通过 Github Actions 来实现打包并推送到 Docker Hub

实现

第一步

第一步先在项目下建一个 yml 文件,路径:.github/workflows/push2hub.yml

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
name: Publish Docker image
on:
push:
branches:
- master
jobs:
push_to_registry:
name: Push Docker image to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v2
- name: Log in to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Push to Docker Hub
uses: docker/build-push-action@v2
with:
push: true
tags: jakehu/scripts:latest

上面 Yaml 的功能是,在每一次推送到 Master 分支上的时候,就将项目推送到 Docker Hub 上的 jakehu 用户下的 scripts 仓库里的 latest Tag

第二步

第二步需要在 github 上设置 DOCKER_USERNAMEDOCKER_PASSWORD 两个变量
UMJEbb

第三步

第三步只需要对 Master 分支进行推送即可,然后我们就能在 Actions 里面看到对应的流水线信息
kCVfE8


Github Actions 文档

前言

CentOS 8 今年开始已经停止维护了,难道就没有可以免费使用的 RHEL 了?

答案是否定的

Rocky Linux

Rocky Linux 是由 CentOS 项目的创始人 Gregory Kurtzer 领导构建的,他与原来的 Centos 一样位于 RHEL 下游。

如图所示:

微软、谷歌、亚马逊等公有云都已经接入,希望国内的公有云也能尽快的接入

Rocky Linux 官网

前记

在使用 Sanic 的过程中对 ORM 的选择可谓是痛苦的,用过官方推荐的 Tortoise ORM,也用过 SQLAlchemy

比起使用 ORM 我更喜欢原生 SQL,当我看见 databases 的时候我发现它满足了的我所有要求,支持异步驱动 aiomysql,支持原生 SQL 写法,还封装进了 SQLAlchemy,只要你想你也可以把 databases 当作 SQLAlchemy 使用

安装

安装 databases

1
pip install databases

安装数据库驱动

1
pip install aiomysql

在安装 databases 的时候会自动的安装 SQLAlchemy 目前已经支持 1.4 版本

配置

利用监听器去控制数据库的连接和断开,并将句柄放入到应用上下文中 app.ctx.db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
./server.py

from sanic import Sanic
from databases import Database

app = Sanic('jakehu')

# 数据库
database = Database('mysql://localhost/example')
# 服务开启之前
@app.listener("before_server_start")
async def startup_db(app, loop):
await database.connect()
# 服务结束之后
@app.listener("after_server_stop")
async def shutdown_db(app, loop):
await database.disconnect()
app.ctx.db = database

@app.get("/")
async def foo_handler(request):
return text("Hello World!")

app.run(host='0.0.0.0', debug=True, auto_reload=True)

使用

最后看如何在函数中使用,利用应用上下文中的句柄进行操作,最后在利用_mapping 属性进行转换

1
2
3
4
5
6
7
@app.get("/")
async def foo_handler(request):
database= request.app.ctx.db
query = "SELECT * FROM table"
rows = await database.fetch_all(query=query)
result = [dict(row._mapping) for row in rows]
return json(result)

关于如何将 sqlalchemy.engine.row.Row 转化为 dict 可以参考

示例

前言

希望能像 npm install 那样自动的将安装的包加入到 requirements.txt 文件中,但是同时又不希望把子依赖加入其中

虽然我们能通过 pip freeze > requirements.txt 将依赖导出,但是这样导出的依赖,会把包的其他子依赖也导出,导致重新安装的时候总是提示包的版本不对

解决

通过 bashalias 或者函数来解决,在.zshrc 中添加以下函数

1
2
3
function pip-install {
pip install $1 && pip freeze | grep -w "${1}=" >> requirements.txt
}

使用

1
pip-install sanic

效果

1
2
cat requirements.txt
sanic==21.6.2

从上面文件可以看出 requirements.txt 中并没有 sanic 的其他子依赖,至于 sanic 的其他子依赖会在安装 sanic 时自动安装就不用管它了

最后

最后我们在其他地方使用项目的时候只需要安装 requirements.txt 中的包就行了

1
pip install -r requirements.txt

前言

Mac 系统中,如果利用 Homebrew 安装 LuaRocks,默认只会安装最新版本的 Lua。鉴于 lapisluajit 都只兼容 lua@5.1 版本,所以就需要自行安装 lua@5.1

兼容

第一步:利用 Homebrew 安装 luarocks

1
brew install luarocks

第二步:利用 Homebrew 安装 lua@5.1

1
brew install lua@5.1

第三步:查看 lua@5.1 的安装目录

1
2
3
brew info lua@5.1

/usr/local/Cellar/lua@5.1/5.1.5_8

第四步:利用参数 --lua-dir 以及 --lua-version 使用 5.1 版本,两个参数可以同时设置,也可以只设置一个

1
2
3
luarocks --lua-dir=/usr/local/Cellar/lua@5.1/5.1.5_8 --lua-version=5.1 install lapis

luarocks --lua-version=5.1 install lapis

通过上面设置就能兼容不同版本的 Lua

错误

安装 luaossl 时出现以下错误:

1
2
3
4
5
6
7
8
Installing https://luarocks.org/luaossl-20200709-0.src.rock

Error: Failed installing dependency: https://luarocks.org/luaossl-20200709-0.src.rock - Could not find header file for CRYPTO
No file openssl/crypto.h in /usr/local/include
No file openssl/crypto.h in /usr/include
No file openssl/crypto.h in /include
You may have to install CRYPTO in your system and/or pass CRYPTO_DIR or CRYPTO_INCDIR to the luarocks command.
Example: luarocks install luaossl CRYPTO_DIR=/usr/local

解决如下:
设置 OPENSSL_DIR 以及 CRYPTO_DIR

1
luarocks --lua-version=5.1 OPENSSL_DIR=/usr/local/Cellar/openssl@1.1/1.1.1k/ CRYPTO_DIR=/usr/local/Cellar/openssl@1.1/1.1.1k/ install lapis

最后

配置 LUA_PATHLUA_CPATH 以及 PATH,在终端中输入

1
luarocks --lua-version=5.1 path --bin

取得 LUA_PATHLUA_CPATH 写入到 ~/.zshrc

1
2
export LUA_PATH=''
export LUA_CPATH=''

最后再将.luarocks/bin 导入 PATH

1
export PATH="$HOME/.luarocks/bin:$PATH"

如果不做上面操作就会出现下面错误

1
lua entry thread aborted: runtime error: content_by_lua(nginx.conf.compiled:22):2: module 'lapis' not found:

Python 框架 Sanic 实现文件上传功能

实现

判断允许上传的类型,同时利用 UUID 生成新的文件名存储到对应的文件夹中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@app.route("/upload", methods=['POST'])
async def upload(request):
allow_type = ['.jpg', '.png', '.gif'] # 允许上传的类型
file = request.files.get('file')
type = os.path.splitext(file.name)

if type[1] not in allow_type:
return json({"code": -1, "msg": "只允许上传.jpg.png.gif类型文件"})

name = str(uuid4())+type[1]
path = "/user/data/web/upload" # 这里注意path是绝对路径

async with aiofiles.open(path+"/"+name, 'wb') as f:
await f.write(file.body)
f.close()
return json({"code": 0, "msg": "上传成功", "data": {
"name": name,
"url": "/upload/"+name
}})

上传

可以利用 Postman 上传测试,需要注意的是 header 头中的 Content-Type:multipart/form-data; 必须设置

访问

需要访问上传过后的文件,这就需要用到 Sanic 静态文件代理

1
2
path = "/user/data/web/upload" # 这里注意path是绝对路径
app.static("/upload", path)

最后访问路径为:

1
https://www.域名.com/upload/uuid.png
0%