Skip to main content

使用方法

Pywss 基于 ctx.next 实现了 中间件 能力。

在 pywss 应用中,我们可以通过以下两种方式来注册中间件:

  • app use 全局注册,针对全部路由生效
  • route bind 局部注册,针对指定路由生效
分级路由场景

这对于分级路由场景来说,同样适用。

全局注册
import pywss
import time

def logMiddleware(ctx: pywss.Context):
entryTime = time.time()
ctx.next()
cost = time.time() - entryTime
print(f"{ctx.method} - {ctx.route} - cost: {cost: .2f}")

app = pywss.App()
app.use(logMiddleware)
app.get("/hi", lambda ctx: ctx.write("Hi~"))
app.get("/hello", lambda ctx: ctx.write("HelloWorld"))
app.run()
局部注册
import pywss
import time

def logMiddleware(ctx: pywss.Context):
entryTime = time.time()
ctx.next()
cost = time.time() - entryTime
print(f"{ctx.method} - {ctx.route} - cost: {cost: .2f}")

app = pywss.App()
app.get("/hi", lambda ctx: ctx.write("Hi~")) # 未注册日志中间件
app.get("/hello", logMiddleware, lambda ctx: ctx.write("HelloWorld"))
app.run()

中间件开发

前提说明

本节源代码取自 v0.1.18 版本,最新代码请参考 Github-Pywss

本节举例 cors 和 jwt 中间件

服务端
import pywss

def main():
app = pywss.App()
app.use(
pywss.NewCORSHandler(),
pywss.NewJWTHandler(),
)
app.run()

if __name__ == '__main__':
main()

CORS

https://github.com/czasg/pywss/blob/master/pywss/handler/cors.py
import pywss

from pywss.constant import *


def NewCORSHandler(
allow_origins: tuple = ("*",),
allow_methods: tuple = ("*",),
allow_headers: tuple = ("*",),
allow_credentials: bool = True,
):
allowOrigins = ",".join(allow_origins)
allowMethods = ",".join(allow_methods)
allowHeaders = ",".join(allow_headers)
allowCredentials = "true" if allow_credentials else "false"

def corsHandler(ctx: pywss.Context):
ctx.set_header(HeaderAccessControlAllowOrigin, allowOrigins)
ctx.set_header(HeaderAccessControlAllowMethods, allowMethods)
ctx.set_header(HeaderAccessControlAllowHeaders, allowHeaders)
ctx.set_header(HeaderAccessControlAllowCredentials, allowCredentials)
if ctx.method == MethodOptions:
return
ctx.next()

return corsHandler

JWT

https://github.com/czasg/pywss/blob/master/pywss/handler/jwt.py
import json
import time
import pywss
import base64
import hashlib

from pywss.constant import *
from typing import Union


def NewJWTHandler(
secret: str = "pywss",
expire: int = 3600,
ignore_route: Union[list, tuple] = (),
ignore_startswith: tuple = (),
ignore_endswith: tuple = (),
ignore_method_route: Union[list, tuple] = (),
):
jwt = JWT(secret, expire)

def jwtHandler(ctx: pywss.Context):
ctx.data.jwt = jwt
if ignore_route and ctx.route in ignore_route:
ctx.next()
return
if ignore_startswith and ctx.route.startswith(ignore_startswith):
ctx.next()
return
if ignore_endswith and ctx.route.endswith(ignore_endswith):
ctx.next()
return
for m, r in ignore_method_route:
if m == ctx.method and ctx.route in r:
ctx.next()
return
try:
token = ctx.headers.get(HeaderAuthorization)
ctx.data.jwt_payload = jwt.decrypt(token)
except:
ctx.set_status_code(StatusForbidden)
return
ctx.next()

return jwtHandler


class JWT:

def __init__(self, secret: str, expire: int = 3600):
self.secret = secret.encode()
self.expire = expire
self.header = base64.b64encode(json.dumps({
"alg": "HS256",
"typ": "JWT",
}, ensure_ascii=False).encode())

def encrypt(self, **kwargs):
kwargs.update(exp=int(time.time()) + self.expire)
payload = base64.b64encode(json.dumps(kwargs, ensure_ascii=False).encode())

sha256 = hashlib.sha256()
sha256.update(self.header)
sha256.update(payload)
sha256.update(self.secret)
signature = sha256.hexdigest()

return f"{self.header.decode()}.{payload.decode()}.{signature}"

def decrypt(self, token: str) -> dict:
if token.startswith("Bearer"):
token = token.replace("Bearer", "", 1).strip()

tokens = token.split(".")
if len(tokens) != 3:
raise Exception("Invalid JWT Token")

header, payload, signature = tokens # type: str
sha256 = hashlib.sha256()
sha256.update(header.encode())
sha256.update(payload.encode())
sha256.update(self.secret)
if sha256.hexdigest() != signature:
raise Exception("Invalid JWT Token, Signature Except")

jwt_payload = json.loads(base64.b64decode(payload))
if jwt_payload.get("exp", 0) < int(time.time()):
raise Exception("JWT Token Expire")

return jwt_payload