使用方法
Pywss 基于 `ctx.next` 实现了中间件能力:中间件在调用 `ctx.next()` 时把控制权交给后续处理函数,待其返回后再继续执行自身剩余的逻辑。
在 pywss 应用中,我们可以通过以下两种方式来注册中间件:
- `app.use` 全局注册,针对全部路由生效
- 路由绑定(route bind)局部注册,即在注册路由时把中间件放在处理函数之前传入,仅针对指定路由生效
分级路由场景
上述两种注册方式对于分级路由场景同样适用。
全局注册
import pywss
import time
def logMiddleware(ctx: pywss.Context):
    """Print the method, route and elapsed time of the request it wraps.

    The timer is started before ``ctx.next()`` and stopped after it
    returns, so the reported cost covers whatever ``ctx.next()`` executes.

    Args:
        ctx: The request context; ``method`` and ``route`` are read from it.
    """
    # perf_counter() is monotonic. time.time() follows the wall clock, which
    # can be adjusted mid-request and yield a wrong (even negative) cost.
    started = time.perf_counter()
    ctx.next()
    cost = time.perf_counter() - started
    print(f"{ctx.method} - {ctx.route} - cost: {cost: .2f}")
# Create the application and attach the logging middleware globally, so it
# applies to every route registered below.
app = pywss.App()
app.use(logMiddleware)

# Route table: path -> handler. Registration order follows insertion order.
routes = {
    "/hi": lambda ctx: ctx.write("Hi~"),
    "/hello": lambda ctx: ctx.write("HelloWorld"),
}
for path, handler in routes.items():
    app.get(path, handler)

app.run()
局部注册
import pywss
import time
def logMiddleware(ctx: pywss.Context):
    """Print the method, route and elapsed time of the request it wraps.

    The timer is started before ``ctx.next()`` and stopped after it
    returns, so the reported cost covers whatever ``ctx.next()`` executes.

    Args:
        ctx: The request context; ``method`` and ``route`` are read from it.
    """
    # perf_counter() is monotonic. time.time() follows the wall clock, which
    # can be adjusted mid-request and yield a wrong (even negative) cost.
    started = time.perf_counter()
    ctx.next()
    cost = time.perf_counter() - started
    print(f"{ctx.method} - {ctx.route} - cost: {cost: .2f}")
app = pywss.App()

# Each entry is (path, handlers). Middleware is bound per route by listing
# it ahead of the final handler.
routes = (
    # The logging middleware is NOT registered for this route.
    ("/hi", (lambda ctx: ctx.write("Hi~"),)),
    # The logging middleware is bound to this route only.
    ("/hello", (logMiddleware, lambda ctx: ctx.write("HelloWorld"))),
)
for path, handlers in routes:
    app.get(path, *handlers)

app.run()
中间件开发
前提说明
本节源代码取自 v0.1.18 版本,最新代码请参考 GitHub 仓库 Pywss(https://github.com/czasg/pywss)。
本节举例 cors 和 jwt 中间件
服务端
import pywss
def main():
    """Run a pywss app with the CORS and JWT middlewares registered globally."""
    server = pywss.App()
    # Both handlers are created with their default options and passed to
    # a single use() call, in this order.
    middlewares = (
        pywss.NewCORSHandler(),
        pywss.NewJWTHandler(),
    )
    server.use(*middlewares)
    server.run()


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
CORS
https://github.com/czasg/pywss/blob/master/pywss/handler/cors.py
import pywss
from pywss.constant import *
def NewCORSHandler(
        allow_origins: tuple = ("*",),
        allow_methods: tuple = ("*",),
        allow_headers: tuple = ("*",),
        allow_credentials: bool = True,
):
    """Build a middleware that sets the CORS response headers.

    The header values are computed once, when the handler is created, by
    comma-joining each tuple; the handler then writes the same four headers
    on every request.

    Args:
        allow_origins: Values joined into the allow-origin header.
        allow_methods: Values joined into the allow-methods header.
        allow_headers: Values joined into the allow-headers header.
        allow_credentials: Rendered as the string "true" or "false".

    Returns:
        The ``corsHandler`` middleware function.

    NOTE(review): the defaults pair a wildcard origin with credentials
    enabled; browsers are expected to reject that combination, so confirm
    it is intended.
    """
    origins_value = ",".join(allow_origins)
    methods_value = ",".join(allow_methods)
    headers_value = ",".join(allow_headers)
    if allow_credentials:
        credentials_value = "true"
    else:
        credentials_value = "false"

    def corsHandler(ctx: pywss.Context):
        for header_name, header_value in (
                (HeaderAccessControlAllowOrigin, origins_value),
                (HeaderAccessControlAllowMethods, methods_value),
                (HeaderAccessControlAllowHeaders, headers_value),
                (HeaderAccessControlAllowCredentials, credentials_value),
        ):
            ctx.set_header(header_name, header_value)
        # An OPTIONS request is answered with the headers alone; every
        # other method continues down the handler chain.
        if ctx.method != MethodOptions:
            ctx.next()

    return corsHandler
JWT
https://github.com/czasg/pywss/blob/master/pywss/handler/jwt.py
import json
import time
import pywss
import base64
import hashlib
from pywss.constant import *
from typing import Union
def NewJWTHandler(
        secret: str = "pywss",
        expire: int = 3600,
        ignore_route: Union[list, tuple] = (),
        ignore_startswith: tuple = (),
        ignore_endswith: tuple = (),
        ignore_method_route: Union[list, tuple] = (),
):
    """Build a middleware that rejects requests without a valid token.

    The handler always stores the shared ``JWT`` helper on ``ctx.data.jwt``.
    Requests matching one of the ignore rules skip verification. All other
    requests must carry a token in the authorization header; on success the
    decoded payload is stored on ``ctx.data.jwt_payload``, on failure the
    status code is set to ``StatusForbidden`` and the chain stops.

    Args:
        secret: Secret used to sign and verify tokens.
        expire: Token lifetime in seconds, passed to ``JWT``.
        ignore_route: Routes that skip verification (membership test).
        ignore_startswith: Route prefixes that skip verification; passed
            straight to ``str.startswith``.
        ignore_endswith: Route suffixes that skip verification; passed
            straight to ``str.endswith``.
        ignore_method_route: ``(method, route)`` pairs that skip
            verification.

    Returns:
        The ``jwtHandler`` middleware function.
    """
    jwt = JWT(secret, expire)

    def shouldSkip(ctx) -> bool:
        # Rules are checked in the same order as before; the first hit wins.
        if ignore_route and ctx.route in ignore_route:
            return True
        if ignore_startswith and ctx.route.startswith(ignore_startswith):
            return True
        if ignore_endswith and ctx.route.endswith(ignore_endswith):
            return True
        # NOTE(review): `ctx.route in r` is a substring test when r is a
        # str and a membership test when r is a collection; confirm which
        # one is intended.
        return any(
            m == ctx.method and ctx.route in r
            for m, r in ignore_method_route
        )

    def jwtHandler(ctx: pywss.Context):
        ctx.data.jwt = jwt
        if shouldSkip(ctx):
            ctx.next()
            return
        try:
            token = ctx.headers.get(HeaderAuthorization)
            ctx.data.jwt_payload = jwt.decrypt(token)
        except Exception:
            # Any verification failure means "forbidden". A bare `except:`
            # would also swallow SystemExit and KeyboardInterrupt, turning
            # a shutdown request into a 403.
            ctx.set_status_code(StatusForbidden)
            return
        ctx.next()

    return jwtHandler
class JWT:
    """Issue and verify the signed tokens used by the JWT middleware.

    A token is ``<header>.<payload>.<signature>``: header and payload are
    base64-encoded JSON, and the signature is the hex SHA-256 digest of
    ``header + payload + secret``.

    NOTE(review): the header advertises "HS256", but the signature is a
    plain SHA-256 over the concatenation rather than an HMAC, and the
    segments use standard (not URL-safe) base64. Tokens are therefore
    unlikely to interoperate with other JWT libraries. The format is left
    unchanged here so that already-issued tokens stay valid.
    """

    def __init__(self, secret: str, expire: int = 3600):
        """Store the signing secret and token lifetime.

        Args:
            secret: Signing secret; stored as bytes.
            expire: Lifetime in seconds added to the current time on issue.
        """
        self.secret = secret.encode()
        self.expire = expire
        # The header never changes, so it is encoded once.
        self.header = base64.b64encode(json.dumps({
            "alg": "HS256",
            "typ": "JWT",
        }, ensure_ascii=False).encode())

    def _sign(self, header: bytes, payload: bytes) -> str:
        """Return the hex SHA-256 digest of header + payload + secret."""
        digest = hashlib.sha256()
        digest.update(header)
        digest.update(payload)
        digest.update(self.secret)
        return digest.hexdigest()

    def encrypt(self, **kwargs):
        """Create a signed token whose payload is ``kwargs`` plus ``exp``.

        Despite the name, the payload is only encoded and signed, not
        encrypted. ``exp`` is set to now + ``self.expire`` and overrides
        any ``exp`` passed by the caller.

        Returns:
            The token string ``<header>.<payload>.<signature>``.
        """
        kwargs.update(exp=int(time.time()) + self.expire)
        payload = base64.b64encode(json.dumps(kwargs, ensure_ascii=False).encode())
        signature = self._sign(self.header, payload)
        return f"{self.header.decode()}.{payload.decode()}.{signature}"

    def decrypt(self, token: str) -> dict:
        """Verify ``token`` and return its decoded payload.

        A leading "Bearer" prefix is stripped before parsing.

        Args:
            token: The token string, optionally prefixed with "Bearer".

        Returns:
            The payload dictionary, including ``exp``.

        Raises:
            Exception: If the token is missing or malformed, the signature
                does not match, or the token has expired.
        """
        # Imported locally so this class does not depend on a file-level
        # import that may not exist.
        import hmac

        # A missing authorization header arrives here as None; report it as
        # an invalid token instead of failing with AttributeError.
        if not isinstance(token, str):
            raise Exception("Invalid JWT Token")
        if token.startswith("Bearer"):
            token = token.replace("Bearer", "", 1).strip()
        tokens = token.split(".")
        if len(tokens) != 3:
            raise Exception("Invalid JWT Token")
        header, payload, signature = tokens
        expected = self._sign(header.encode(), payload.encode())
        # Constant-time comparison avoids leaking how many leading
        # characters of the signature were correct. Bytes are compared
        # because compare_digest rejects non-ASCII str arguments.
        if not hmac.compare_digest(expected.encode(), signature.encode()):
            raise Exception("Invalid JWT Token, Signature Except")
        jwt_payload = json.loads(base64.b64decode(payload))
        if jwt_payload.get("exp", 0) < int(time.time()):
            raise Exception("JWT Token Expire")
        return jwt_payload