WSGI web框架的实现
2020-12-18 23:33
标签:pass 移除 ascii div orm sgi escape art pat WSGI web框架的实现 标签:pass 移除 ascii div orm sgi escape art pat 原文地址:https://www.cnblogs.com/dissipate/p/14131057.htmlfrom wsgiref.simple_server import make_server
from webob import Response, Request, exc, dec
import re, traceback, logging
class Dict2Obj(object):
def __init__(self, d: dict):
# self.__dict__={‘_dict‘:d} # 覆盖实例创建时的字典,依然会调用__setattr__
if not isinstance(d, (dict,)):
self.__dict__[‘_dict‘] = {}
else:
self.__dict__[‘_dict‘] = d
def __getattr__(self, item):
try:
return self._dict[item]
except KeyError:
raise AttributeError(‘Attribute {} not found!‘.format(item))
def __setattr__(self, key, value):
raise NotImplementedError
class Context(dict):
def __getattr__(self, item):
try:
return self[item]
except KeyError:
raise AttributeError(‘Attribute {} not found!‘.format(item))
def __setattr__(self, key, value):
self[key] = value
class NestedContext(Context):
# def __init__(self,globalcontext:Context=None):
# super().__init__()
# self.relate(globalcontext)
def relate(self, globalcontext: Context = None):
self.globalcontext = globalcontext
def __getattr__(self, item):
if item in self.keys():
return self[item]
return getattr(self.globalcontext, item)
class Router:
mold = re.compile(‘/({[^{}:]+:?[^{}:]*})‘)
TYPEPATTERNS = {
‘str‘: r‘[^/]+‘, # exclude / in url
‘word‘: r‘\w+‘,
‘int‘: r‘[+-]?\d+\b‘, # illegal escape sequence \ will stay the same
‘float‘: r‘[+-]?\d+\.\d+\b‘,
‘any‘: r‘.+‘
}
TYPECAST = {
‘str‘: str,
‘word‘: str,
‘int‘: int,
‘float‘: float,
‘any‘: str
}
def transform(self, tmp: str):
designation, _, genre = tmp.strip(‘/{}‘).partition(‘:‘)
return ‘/(?P{})‘.format(designation, self.TYPEPATTERNS.get(genre, r‘\w+‘)), designation, self.TYPECAST.get(
genre, str)
def parse(self, src: str):
start = 0
result = ‘‘
translator = {}
while True:
matcher = self.mold.search(src, start)
if matcher:
result += matcher.string[start:matcher.start(0)]
tmp = self.transform(matcher.string[matcher.start(0):matcher.end(0)]) # 匹配整体调用transform
result += tmp[0]
translator.setdefault(tmp[1], tmp[2])
start = matcher.end(0)
print(‘result =‘, result)
print(‘tmp =‘, tmp)
print(‘matcher.pos =‘, matcher.pos)
print(‘matcher.endpos =‘, matcher.endpos)
else:
break
if result:
return result, translator
else:
return src, translator
def __init__(self, prefix: str = ‘‘):
self._prefix = prefix.rstrip(‘/\\‘) # such as /product
self._routetable = [] # triple tuple
# interceptors
self.pre_interceptors = []
self.post_interceptors = []
# context
self.ctx = NestedContext({‘Router‘: ‘I\‘m router NestedContext‘}) # 未关联全局,注册时注入
# self.ctx.router=self # 在向Application类注册时指定了
def register_pre_interceptor(self, fn):
self.pre_interceptors.append(fn)
return fn
def register_post_interceptor(self, fn):
self.post_interceptors.append(fn)
return fn
def route(self, rule, *methods):
def wrapper(handler):
pattern, translator = self.parse(rule)
print(‘parse pattern translator‘, pattern, translator)
for r_methods, r_pattern, r_translator, r_handler in self._routetable:
print(r_methods, r_pattern, r_translator, r_handler)
if handler == r_handler and re.compile(pattern) == r_pattern:
translator.update(r_translator)
self._routetable.append((r_methods + methods, r_pattern, translator, r_handler))
self._routetable.remove((r_methods, r_pattern, r_translator, r_handler)) # 移除原item
break
else:
print(‘else‘)
self._routetable.append((methods, re.compile(pattern), translator, handler))
print(self._routetable)
return handler
return wrapper
@property
def prefix(self):
return self._prefix
def get(self, pattern):
return self.route(pattern, ‘GET‘)
def post(self, pattern):
return self.route(pattern, ‘POST‘)
def head(self, pattern):
return self.route(pattern, ‘HEAD‘)
def match(self, request: Request) -> Response:
print(‘request.method = {}, request.path = {}, self.prefix = {!r}‘.format(request.method, request.path,
self.prefix))
# 判断prefix
if self._prefix == ‘‘:
if request.path == ‘/‘:
pass
else:
return None
else:
if request.path.startswith(self._prefix): # /uranus and /uranuss is not the same prefix
if request.path.startswith(self._prefix + ‘/‘) or request.path == self.prefix:
# /uranus & /uranus/xxx situation
pass
else: # /uranuss situation
return None
else:
return None
# 依次执行拦截请求
for fn in self.pre_interceptors:
logging.critical(‘in router interceptor‘)
request = fn(self.ctx, request)
for methods, pattern, translator, handler in self._routetable:
print(‘prefix {} is right, methods = {}, pattern = {}, translator = {} handler = {}‘.format(self._prefix,
methods,
pattern.pattern,
translator,
handler))
matcher = pattern.match(request.path.replace(self._prefix, ‘‘, 1))
# request.path.replace(self.prefix,‘‘,1) == ‘‘ is /uranus circumstance
if matcher or request.path.replace(self.prefix, ‘‘, 1) == ‘‘:
print(‘request.path = {} matched {}‘.format(request.path, pattern, pattern))
if not methods or request.method.upper() in methods:
if request.path != self.prefix: # /uranus circumstance matcher is None
request.args = matcher.groups()
_dict = {}
logging.error(matcher.groupdict())
for k, v in matcher.groupdict().items(): # 对/{name:str}/{id:int}‘中name&id匹配的内容进行typecast
_dict.setdefault(k, translator.get(k)(v))
request.kwargs = Dict2Obj(matcher.groupdict())
request.vars = Dict2Obj(_dict)
logging.error(request.args)
logging.warning(request.kwargs.__dict__)
else:
request.args = ()
request.kwargs = Dict2Obj({})
request.vars = Dict2Obj({})
logging.error(‘matcher is None,set default value in case of error‘)
response = handler(self.ctx, request)
# 执行response拦截请求
for fn in self.post_interceptors:
response = fn(self.ctx, request, response)
return response
else:
print(‘{} is illegal‘.format(request.method))
raise exc.HTTPBadRequest(‘request {} is illegal‘.format(request.method))
else:
print(‘request.path {} mismatched {}‘.format(request.path.replace(self.prefix, ‘‘, 1), pattern.pattern))
continue
else:
raise exc.HTTPNotFound(‘prefix {} is right, request.path {} not found‘.format(self._prefix, request.path))
class Application:
ctx = Context({‘application‘: ‘I\‘m application Context‘}) # 全局上下文对象
@classmethod
def extend(cls,name,ext):
cls.ctx[name]=ext
def __init__(self, **kwargs):
# 创建上下文对象,共享信息
self.ctx.app = self
for k, v in kwargs.items():
self.ctx[k] = v
# interceptors
PRE_INTERCEPTORS = []
POST_INTERCEPTORS = []
# interceptors 注册函数
@classmethod
def register_pre_interceptors(cls, fn):
cls.PRE_INTERCEPTORS.append(fn)
return fn
@classmethod
def register_post_interceptors(cls, fn):
cls.POST_INTERCEPTORS.append(fn)
return fn
ROUTERS = [] # 注册的Router对象
@classmethod
def register(cls, router: Router): # 为Router实例注入全局上下文
router.ctx.relate(cls.ctx)
router.ctx.router = router # 为router的NestedContext对象之属性设置属性指向self
cls.ROUTERS.append(router)
@dec.wsgify
def __call__(self, request: Request) -> Response:
# 全局拦截
for fn in self.PRE_INTERCEPTORS:
logging.critical(‘in Application interceptors‘)
request = fn(self.ctx, request)
for router in self.ROUTERS:
response = router.match(request)
print(‘class Application response =‘, response)
# 全局拦截response
for fn in self.POST_INTERCEPTORS:
response = fn(self.ctx, request, response)
if response:
return response
else:
raise exc.HTTPNotFound(‘prefix not found, request.path = {}‘.format(request.path))
idx = Router() # prefix = ‘‘
uranus = Router(‘/uranus‘) # prefix = /uranus
Application.register(idx)
Application.register(uranus)
@idx.get(‘^/$‘)
@idx.post(‘^/$‘)
def index(ctx: NestedContext, request: Request) -> Response: # response=handler(self.ctx,request) add ctx parameter
response = Response()
response.status_code = 303
response.content_type = ‘application/json‘
response.charset = ‘utf8‘
response.body = ‘request.path = {} \n{} \n{} \n{} \nctx = {}‘.format(request.path, request.args,
request.kwargs.__dict__,
request.vars.__dict__, ctx).encode()
return response
# @uranus.route(‘/(?P
上一篇:文件上传