晨曦's Blog

This is a window to the soul

安装

全局安装 flake8

1
/usr/local/bin/python3 -m pip install -U flake8

全局安装 black

1
/usr/local/bin/python3 -m pip install -U black

配置

1
2
3
4
5
6
"python.languageServer": "Pylance",
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"python.linting.pylintEnabled": false, // 关闭pylint工具
"python.formatting.blackPath": "/usr/local/bin/black",
"python.linting.flake8Path": "/usr/local/bin/flake8",

flake8black 可选参数:

1
2
3
4
5
6
7
"python.linting.flake8Args": [
"--max-line-length=248", // 设置单行最长字符限制
"--ignore=W191, E266, W504"
],
"python.formatting.blackArgs": [
"--line-length=128" // 格式化时单行最长长度
],

排序 import 语句

1
/usr/local/bin/python3 -m pip install -U isort

配置

1
2
3
4
5
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
},

也可以配置用于全局

1
2
3
"editor.codeActionsOnSave": {
"source.organizeImports": true
},

前记

在一些软件中如 ES,时间格式都是 UTC 时间格式。记一记如何将 UTC 时间格式转换为本地北京时间格式。

原理很简单:将UTC转化为datetime时间格式->将转化的datetime时间加8小时->格式化为想要的格式

实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import datetime

'''
UTC转北京时间
'''

def utc_format(utc_time, utc_time_format='%Y-%m-%dT%H:%M:%S.000Z'):
utc_datetime = datetime.datetime.strptime(utc_time, utc_time_format)
local_datetime = utc_datetime + datetime.timedelta(hours=8)
local_time_format = "%Y-%m-%d %H:%M:%S"
local_time = local_datetime.strftime(local_time_format)
return local_time

print(utc_format(utc_time="2021-10-26T09:34:31.000Z"))

前言

ELK 对于日志管理来说毫无疑问是最好的选择,但有的时候觉得 Logstash 比较的笨重,相反 Filebeat 也不失为一个好的选择。

此次安装的版本为 7.15.0

安装

对于几个组件的安装都是非常的简单,可以直接利用官方打好的包就好

Elasticsearch 安装文档

Kibana 安装文档

Filebeat 安装文档

安装时建议通过 RPM 包安装,这样能固定版本减少一些兼容性问题如:

1
2
3
wget https://artifacts.elastic.co/downloads/kibana/kibana-7.15.0-x86_64.rpm
shasum -a 512 kibana-7.15.0-x86_64.rpm
sudo rpm --install kibana-7.15.0-x86_64.rpm

配置

Elasticsearch

如果只是单机版的话 Elasticsearch 配置倒是不用过多修改,接下来配置一下 Elasticsearch 开启密码访问就可以了

设置密码文档

安全配置参考

1
xpack.security.enabled: true # 普通的安全设置

最后再通过下面命令设置各个用户的密码

1
$ /usr/share/elasticsearch/bin/elasticsearch-setup-passwords auto

上面命令会设置各个内置用户的密码

内置用户文档

Kibana

Kibana 只需要配置如下几个项目即可

1
2
3
4
5
6
server.port: 5601 # 端口
server.host: "0.0.0.0" # 允许远程连接
elasticsearch.hosts: ["http://localhost:9200"] # ES的地址
elasticsearch.username: "kibana_system" # 上面设置的用户和密码
elasticsearch.password: "password"
i18n.locale: "zh-CN" # 页面支持中文

Filebeat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
output.elasticsearch:
# Array of hosts to connect to.
hosts: ["localhost:9200"]
username: "elastic"
password: "elastic"
index: "filebeat-%{+yyyy.MM.dd}"
indices:
- index: "filebeat-nginx-access-%{+yyyy.MM.dd}"
when.equals:
fields.type: nginx.access
- index: "filebeat-nginx-error-%{+yyyy.MM.dd}"
when.equals:
fields.type: nginx.error

setup.ilm.enabled: false # 索引生命周期
setup.ilm.check_exists: false
setup.template.enabled: true # 索引模版
setup.template.name: "filebeat"
setup.template.pattern: "filebeat-*"

对于 Filebeat 的设置,这里的 username 需要用 elastic 用户。如果用 beats_system 用户的话会提示 403 无权限。

具体可以参考:Security error with beats_system account and Filebeat with system module

Filebeat Module

nginx 为例打开 nginx.accessnginx.error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
vi /etc/filebeat/modules.d/nginx.yml

access:
enabled: true
var.paths: ["/var/log/nginx/access.log"]
input:
fields:
type: nginx.access

error:
enabled: true
var.paths: ["/var/log/nginx/error.log"]
input:
fields:
type: nginx.error

索引

进入 Kibana 管理页面,新建索引
Management->Stack Management->索引模式->创建索引模式

Pipeline

配置到索引时就已经可以在 Kibana 中看到有 nginx 默认的日志进来,但如果我们有自定义的日志格式,就需要用到 Pipeline

Pipeline 所存放的位置 /usr/share/filebeat/module/nginx/access/ingest/pipeline.yml

也可通过开发工具查询 GET _ingest/pipeline/filebeat-7.15.0-nginx-access-pipeline

Grok Debugger

前言

最近遇到一个需求,每 60S 刷新数据库数据到 Redis 缓存中,但应用又不止一个进程。此需求中对原子性并无太大的要求,只是如果每次只有一个进程执行,那么数据库的压力就会小很多。

于是想到用分布式锁来解决这个问题,最终选择了通过 Redis 来实现。

关于 Redis 实现分布式锁可以查看下面的文章:
分布式锁的实现之 redis 篇

Aioredlock

在众多的官方推荐的分布式锁客户端中,我选择了 Aioredlock

Distributed locks with Redis

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
async def aioredlock():
# retry_count 重试次数,这里配置为1,杜绝进程重复去获取
lock_manager = Aioredlock([{'host': 'localhost', 'port': 6379, 'db': 1}],retry_count=1)

while True:

# 每秒钟去检测锁的状态,如果锁被其他进程占用就跳过
# lock_timeout 锁的时间为 65S,相当于进程拿到锁后最大65秒后释放
# 业务逻辑执行 小于 65秒,业务逻辑执行完后当即释放锁
# 每60秒执行一次 sleep 59s即可,因为检测已经sleep了1s

await asyncio.sleep(1)
if await lock_manager.is_locked("_ztsg_auto_generate_code"):
#logger.info('The resource is already acquired')
continue

try:
async with await lock_manager.lock("_ztsg_auto_generate_code", lock_timeout=65) as lock:

assert lock.valid is True
assert await lock_manager.is_locked("_ztsg_auto_generate_code") is True

# 每60s钟运行一次任务
await asyncio.sleep(59)

logger.info('Start Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
# 业务代码
logger.info('Stop Tick! The time is '+str(os.getpid())+': %s' % datetime.now())
except LockAcquiringError:
print('Something happened during normal operation. We just log it.')
except LockError:
print('Something is really wrong and we prefer to raise the exception')
except Exception as e:
print(e)
print('Something else went wrong')

通过上面代码就能实现,需求所要求

前记

在使用 Tortoise Orm 的过程中发现数据库自动插入、更新的时间是 UTC 时区时间,通过官网文档发现可以在连接时对时区进行设置

修改

通过连接配置来修改时区,默认情况下的连接配置

1
2
3
register_tortoise(
app, db_url=db_url, modules={"models": ["app.models"]}, generate_schemas=False
)

如果需要修改配置,则不能用 db_url 模式连接需改为 config 模式连接,配置如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
register_tortoise(
app,
config={
'connections': {
'default': db_url
},
'apps': {
'models': {
"models": ["app.models"],
'default_connection': 'default',
}
},
"use_tz": False,
"timezone": "Asia/Shanghai",
},
generate_schemas=False
)

通过以上配置就能将数据库时区设置为上海时区

前言

当原子系统很多 (非微服务),而且各个原子系统之间需要相互调用,这时就需要保证两个系统之间的认证、以及数据加密。

这个时候就需要用到对称加解密了

选择

对于如何选择合适的加密算法,可以参考一下下面这篇文章

如何选择 AES 加密模式(CBC ECB CTR OCB CFB)?

最后我选择了两种方式分别来测试和实现 CBCOCB

另外 pycrypto 已经不再安全,建议使用 pycryptodome,它是 pycrypto 的分支,在安全性方面有较大提升。

1
pip install pycryptodome

CBC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes

class AesCbc:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_CBC

def encrypt(self, data):
cipher = AES.new(self.key, self.mode)
ct_bytes = cipher.encrypt(pad(data, self.bs))
iv = b64encode(cipher.iv).decode('utf-8')
ct = b64encode(ct_bytes).decode('utf-8')
return json.dumps({'iv': iv, 'ciphertext': ct})

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
iv = b64decode(b64['iv'])
ct = b64decode(b64['ciphertext'])
cipher = AES.new(self.key, self.mode, iv)
plaintext = unpad(cipher.decrypt(ct), self.bs)
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
print("Incorrect decryption ", err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes

aes_cipher = AesCbc(key)

encrypt_reuslt = aes_cipher.encrypt(data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"iv": "K8xL41sI3UoXaeWohUuZEA==", "ciphertext": "fLGcOq43vTZc9x3HX8Q9Nv82cwVT6WNTj5mcpuPEckw="}
原文: 需要加密的数据

OCB

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import json
from base64 import b64encode, b64decode
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes

class AesOcb:
def __init__(self, key):
self.bs = AES.block_size
self.key = key
self.mode = AES.MODE_OCB
self.json_k = ['nonce', 'header', 'ciphertext', 'tag']

def encrypt(self, header, data):
header = header
cipher = AES.new(self.key, self.mode)
cipher.update(header)
ciphertext, tag = cipher.encrypt_and_digest(data)
json_v = [b64encode(x).decode('utf-8') for x in [cipher.nonce, header, ciphertext, tag]]
return json.dumps(dict(zip(self.json_k, json_v)))

def decrypt(self, json_input):
try:
b64 = json.loads(json_input)
jv = {k: b64decode(b64[k]) for k in self.json_k}
cipher = AES.new(self.key, self.mode, nonce=jv['nonce'])
cipher.update(jv['header'])
plaintext = cipher.decrypt_and_verify(jv['ciphertext'], jv['tag'])
return plaintext.decode('utf-8')
except (ValueError, KeyError) as err:
# 解密错误
print(err)
return None

if __name__ == "__main__":
data = "需要加密的数据".encode('utf-8')
key = get_random_bytes(16) # Bytes
header = b'header'

aes_cipher = AesOcb(key)

encrypt_reuslt = aes_cipher.encrypt(header, data)
print("密文: ", encrypt_reuslt)

plaintext = aes_cipher.decrypt(encrypt_reuslt)
print("原文: ", plaintext)

结果:

1
2
密文:  {"nonce": "9Wd6sA1QGSdjXHu1zACA", "header": "aGVhZGVy", "ciphertext": "tyaCFrLuriy6F3xJqs0CehNWe3g7", "tag": "IqDrP9zX00aZMRe7DuCRzQ=="}
原文: 需要加密的数据

前言

API 为什么需要设计签名验证?为了有一定的数据抓取防御能力。
需要考虑的点:

请求参数是否已被篡改
请求来源是否合法
请求是否具有唯一性

原理

参数签名方式:它要求客户端按照约定好的算法生成签名字符串,作为请求的一部分像接口请求,服务端验算签名即可知是否合法。

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import requests
import hashlib
import time

key = "xxx" # 密钥,由服务端颁发

client = "local" # 客户端
server = "kafka" # 服务端
env = "test" # 环境
timestamp = str(time.time())[:10] # 时间
sign_str = client+server+env+timestamp+key
sign = hashlib.md5(sign_str.encode(encoding='utf-8')).hexdigest() # md5

headers = dict(client=client, server=server, env=env, timestamp=timestamp, sign=sign)

r1 = requests.get("https://www.xxx.com/hello", headers=headers)

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from sanic import Sanic
from sanic.response import text, json
import hashlib

app = Sanic("My Hello, world app")

@app.get("/hello")
async def hello_world(request):

key = "xxx" # 密钥

client = request.headers.get("client")
server = request.headers.get("server")
env = request.headers.get("env")
timestamp = request.headers.get("timestamp")
sign = request.headers.get("sign")

_sign_str = client+server+env+timestamp+key
_sign = hashlib.md5(_sign_str.encode(encoding='utf-8')).hexdigest()

if sign==_sign:
return json(list(request.headers.items()))
else:
return text("验证失败")
app.run(host='0.0.0.0',debug=True,auto_reload=True)

前言

目前 Docker Hub 上普通用户已经不能自动构建了,于是想通过 Github Actions 来实现打包并推送到 Docker Hub

实现

第一步

第一步先在项目下建一个 yml 文件,路径:.github/workflows/push2hub.yml

内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
name: Publish Docker image
on:
push:
branches:
- master
jobs:
push_to_registry:
name: Push Docker image to Docker Hub
runs-on: ubuntu-latest
steps:
- name: Check out the repo
uses: actions/checkout@v2
- name: Log in to Docker Hub
uses: docker/login-action@v1
with:
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- name: Push to Docker Hub
uses: docker/build-push-action@v2
with:
push: true
tags: jakehu/scripts:latest

上面 Yaml 的功能是,在每一次推送到 Master 分支上的时候,就将项目推送到 Docker Hub 上的 jakehu 用户下的 scripts 仓库里的 latest Tag

第二步

第二步需要在 github 上设置 DOCKER_USERNAMEDOCKER_PASSWORD 两个变量
UMJEbb

第三步

第三步只需要对 Master 分支进行推送即可,然后我们就能在 Actions 里面看到对应的流水线信息
kCVfE8


Github Actions 文档

前言

CentOS 8 今年开始已经停止维护了,难道就没有可以免费使用的 RHEL 了?

答案是否定的

Rocky Linux

Rocky Linux 是由 CentOS 项目的创始人 Gregory Kurtzer 领导构建的,他与原来的 Centos 一样位于 RHEL 下游。

如图所示:

微软、谷歌、亚马逊等公有云都已经接入,希望国内的公有云也能尽快的接入

Rocky Linux 官网

前记

在使用 Sanic 的过程中对 ORM 的选择可谓是痛苦的,用过官方推荐的 Tortoise ORM,也用过 SQLAlchemy

比起使用 ORM 我更喜欢原生 SQL,当我看见 databases 的时候我发现它满足了的我所有要求,支持异步驱动 aiomysql,支持原生 SQL 写法,还封装进了 SQLAlchemy,只要你想你也可以把 databases 当作 SQLAlchemy 使用

安装

安装 databases

1
pip install databases

安装数据库驱动

1
pip install aiomysql

在安装 databases 的时候会自动的安装 SQLAlchemy 目前已经支持 1.4 版本

配置

利用监听器去控制数据库的连接和断开,并将句柄放入到应用上下文中 app.ctx.db

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
./server.py

from sanic import Sanic
from databases import Database

app = Sanic('jakehu')

# 数据库
database = Database('mysql://localhost/example')
# 服务开启之前
@app.listener("before_server_start")
async def startup_db(app, loop):
await database.connect()
# 服务结束之后
@app.listener("after_server_stop")
async def shutdown_db(app, loop):
await database.disconnect()
app.ctx.db = database

@app.get("/")
async def foo_handler(request):
return text("Hello World!")

app.run(host='0.0.0.0', debug=True, auto_reload=True)

使用

最后看如何在函数中使用,利用应用上下文中的句柄进行操作,最后在利用_mapping 属性进行转换

1
2
3
4
5
6
7
@app.get("/")
async def foo_handler(request):
database= request.app.ctx.db
query = "SELECT * FROM table"
rows = await database.fetch_all(query=query)
result = [dict(row._mapping) for row in rows]
return json(result)

关于如何将 sqlalchemy.engine.row.Row 转化为 dict 可以参考

示例