WSGI web框架的实现

2020-12-18 23:33

阅读:468

标签:pass   移除   ascii   div   orm   sgi   escape   art   pat   

  

from wsgiref.simple_server import make_server
from webob import Response, Request, exc, dec
import re, traceback, logging


class Dict2Obj(object):
    """Read-only, attribute-style view over a dictionary.

    Keys of the wrapped dictionary can be read as attributes. Assigning any
    attribute raises ``NotImplementedError``.
    """

    def __init__(self, d: dict):
        """Wrap *d*; a value that is not a ``dict`` is replaced by an empty one."""
        # Store through ``__dict__`` directly: a normal assignment would be
        # routed to ``__setattr__``, which always raises.
        self.__dict__['_dict'] = d if isinstance(d, dict) else {}

    def __getattr__(self, item):
        """Return the wrapped value stored under *item*.

        Raises:
            AttributeError: if *item* is not a key of the wrapped dictionary.
        """
        try:
            # Go through ``__dict__`` instead of ``self._dict`` so that an
            # instance without ``_dict`` (created without ``__init__``)
            # raises AttributeError rather than recursing into __getattr__.
            return self.__dict__['_dict'][item]
        except KeyError:
            raise AttributeError('Attribute {} not found!'.format(item))

    def __setattr__(self, key, value):
        """Reject every attribute assignment; the view is read-only."""
        raise NotImplementedError


class Context(dict):
    """Dictionary whose entries can also be read and written as attributes."""

    def __getattr__(self, item):
        """Return the entry stored under *item*.

        Only called when normal attribute lookup fails.

        Raises:
            AttributeError: if *item* is not a key of the mapping.
        """
        try:
            return self[item]
        except KeyError:
            raise AttributeError('Attribute {} not found!'.format(item))

    def __setattr__(self, key, value):
        """Store *value* as the mapping entry *key* (not as an instance attribute)."""
        self[key] = value


class NestedContext(Context):
    """Context that falls back to a related global context for missing names.

    The global context is not passed to the constructor; it is attached
    later through :meth:`relate`.
    """

    def relate(self, globalcontext: Context = None):
        """Attach *globalcontext* as the fallback for attribute lookups.

        Attribute assignment on a ``Context`` writes into the mapping, so the
        reference is kept as the ``'globalcontext'`` entry.
        """
        self.globalcontext = globalcontext

    def __getattr__(self, item):
        """Return the local entry *item*, else delegate to the global context.

        Raises:
            AttributeError: if *item* is not stored locally and either no
                global context is attached or it lacks *item* as well.
        """
        if item in self:
            return self[item]
        # Look the parent up as a mapping entry. Reading ``self.globalcontext``
        # before relate() has been called would re-enter this method and
        # recurse without bound.
        parent = self.get('globalcontext')
        if parent is None:
            raise AttributeError('Attribute {} not found!'.format(item))
        return getattr(parent, item)


class Router:
    """URL router for one path prefix.

    Rules may contain ``/{name}`` or ``/{name:type}`` placeholders, which are
    translated into named regular-expression groups. Each router keeps its
    own route table, request/response interceptors and a ``NestedContext``.
    """

    # One "/{name}" or "/{name:type}" placeholder segment of a rule.
    mold = re.compile(r'/(\{[^{}:]+:?[^{}:]*\})')
    # Placeholder type name -> regular expression matching that type.
    TYPEPATTERNS = {
        'str': r'[^/]+',  # exclude / in url
        'word': r'\w+',
        'int': r'[+-]?\d+\b',
        'float': r'[+-]?\d+\.\d+\b',
        'any': r'.+',
    }
    # Placeholder type name -> callable converting the matched text.
    TYPECAST = {
        'str': str,
        'word': str,
        'int': int,
        'float': float,
        'any': str,
    }

    def transform(self, tmp: str):
        """Translate one placeholder such as ``/{id:int}``.

        Returns:
            A ``(regex_fragment, group_name, cast)`` tuple. An unknown or
            missing type falls back to ``\\w+`` and ``str``.
        """
        designation, _, genre = tmp.strip('/{}').partition(':')
        fragment = '/(?P<{}>{})'.format(designation, self.TYPEPATTERNS.get(genre, r'\w+'))
        return fragment, designation, self.TYPECAST.get(genre, str)

    def parse(self, src: str):
        """Convert the rule *src* into a regular expression source string.

        Returns:
            ``(pattern, translator)`` where *translator* maps each group name
            to the callable used to cast its matched text. A rule without
            placeholders is returned unchanged with an empty translator.
        """
        start = 0
        result = ''
        translator = {}
        while True:
            matcher = self.mold.search(src, start)
            if not matcher:
                break
            result += src[start:matcher.start(0)]
            tmp = self.transform(matcher.group(0))  # translate the whole placeholder
            result += tmp[0]
            translator.setdefault(tmp[1], tmp[2])
            start = matcher.end(0)
            print('result =', result)
            print('tmp =', tmp)
            print('matcher.pos =', matcher.pos)
            print('matcher.endpos =', matcher.endpos)
        # Keep whatever follows the last placeholder (e.g. "/edit" in
        # "/{id:int}/edit"); without placeholders this is the entire rule.
        result += src[start:]
        return result, translator

    def __init__(self, prefix: str = ''):
        """Create a router serving paths below *prefix* (e.g. ``/product``)."""
        self._prefix = prefix.rstrip('/\\')  # such as /product
        self._routetable = []  # (methods, compiled pattern, translator, handler)

        # interceptors
        self.pre_interceptors = []
        self.post_interceptors = []

        # Per-router context; the global context is related at registration.
        self.ctx = NestedContext({'Router': "I'm router NestedContext"})

    def register_pre_interceptor(self, fn):
        """Register *fn* to run on the request before routing; returns *fn*."""
        self.pre_interceptors.append(fn)
        return fn

    def register_post_interceptor(self, fn):
        """Register *fn* to run on the handler's response; returns *fn*."""
        self.post_interceptors.append(fn)
        return fn

    def route(self, rule, *methods):
        """Return a decorator registering a handler for *rule*.

        With no *methods* the route accepts every HTTP method. Registering
        the same handler again under the same rule merges the method tuples
        into the existing entry.
        """
        def wrapper(handler):
            pattern, translator = self.parse(rule)
            print('parse pattern translator', pattern, translator)
            for r_methods, r_pattern, r_translator, r_handler in self._routetable:
                print(r_methods, r_pattern, r_translator, r_handler)
                if handler == r_handler and re.compile(pattern) == r_pattern:
                    translator.update(r_translator)
                    self._routetable.append((r_methods + methods, r_pattern, translator, r_handler))
                    # Drop the entry that was just superseded.
                    self._routetable.remove((r_methods, r_pattern, r_translator, r_handler))
                    break
            else:
                print('else')
                self._routetable.append((methods, re.compile(pattern), translator, handler))
                print(self._routetable)
            return handler

        return wrapper

    @property
    def prefix(self):
        """The path prefix with trailing slashes/backslashes removed."""
        return self._prefix

    def get(self, pattern):
        """Shortcut for ``route(pattern, 'GET')``."""
        return self.route(pattern, 'GET')

    def post(self, pattern):
        """Shortcut for ``route(pattern, 'POST')``."""
        return self.route(pattern, 'POST')

    def head(self, pattern):
        """Shortcut for ``route(pattern, 'HEAD')``."""
        return self.route(pattern, 'HEAD')

    def match(self, request: 'Request') -> 'Response':
        """Dispatch *request* to the first matching route of this router.

        Returns:
            The handler's response after the post-interceptors have run, or
            ``None`` when the request path does not belong to this prefix.

        Raises:
            exc.HTTPBadRequest: a route matched but not for this method.
            exc.HTTPNotFound: the prefix matched but no route did.
        """
        print('request.method = {}, request.path = {}, self.prefix = {!r}'.format(
            request.method, request.path, self.prefix))
        # Prefix check: an empty prefix serves only "/"; otherwise the path
        # must be the prefix itself or continue with "/" (so "/uranuss" does
        # not belong to "/uranus").
        if self._prefix == '':
            if request.path != '/':
                return None
        elif request.path != self._prefix and not request.path.startswith(self._prefix + '/'):
            return None

        # Run the request interceptors in registration order.
        for fn in self.pre_interceptors:
            logging.critical('in router interceptor')
            request = fn(self.ctx, request)

        subpath = request.path.replace(self._prefix, '', 1)
        for methods, pattern, translator, handler in self._routetable:
            print('prefix {} is right, methods = {}, pattern = {}, translator = {} handler = {}'.format(
                self._prefix, methods, pattern.pattern, translator, handler))
            matcher = pattern.match(subpath)
            # An empty subpath means the bare prefix (e.g. "/uranus") was
            # requested; it is accepted by the first route in the table.
            if not matcher and subpath != '':
                print('request.path {} mismatched {}'.format(subpath, pattern.pattern))
                continue

            print('request.path = {} matched {}'.format(request.path, pattern))
            if methods and request.method.upper() not in methods:
                print('{} is illegal'.format(request.method))
                raise exc.HTTPBadRequest('request {} is illegal'.format(request.method))

            if request.path != self.prefix:
                request.args = matcher.groups()
                logging.error(matcher.groupdict())
                # Cast every named group with the callable recorded for it.
                _dict = {}
                for k, v in matcher.groupdict().items():
                    _dict.setdefault(k, translator.get(k)(v))
                request.kwargs = Dict2Obj(matcher.groupdict())
                request.vars = Dict2Obj(_dict)
                logging.error(request.args)
                logging.warning(request.kwargs.__dict__)
            else:
                # Bare-prefix request: there is no match object to read from.
                request.args = ()
                request.kwargs = Dict2Obj({})
                request.vars = Dict2Obj({})
                logging.error('matcher is None,set default value in case of error')

            response = handler(self.ctx, request)
            # Run the response interceptors in registration order.
            for fn in self.post_interceptors:
                response = fn(self.ctx, request, response)
            return response

        raise exc.HTTPNotFound('prefix {} is right, request.path {} not found'.format(self._prefix, request.path))


class Application:
    """WSGI application dispatching requests to the registered routers.

    Routers, interceptors and the global context are class-level state, so
    they are shared by every ``Application`` instance.
    """

    ctx = Context({'application': "I'm application Context"})  # global context object

    # Global interceptors and registered Router objects.
    PRE_INTERCEPTORS = []
    POST_INTERCEPTORS = []
    ROUTERS = []

    @classmethod
    def extend(cls, name, ext):
        """Publish *ext* in the global context under *name*."""
        cls.ctx[name] = ext

    def __init__(self, **kwargs):
        """Record this instance as ``ctx.app`` and copy *kwargs* into the context."""
        self.ctx.app = self
        for k, v in kwargs.items():
            self.ctx[k] = v

    @classmethod
    def register_pre_interceptors(cls, fn):
        """Register *fn* to run on every request before routing; returns *fn*."""
        cls.PRE_INTERCEPTORS.append(fn)
        return fn

    @classmethod
    def register_post_interceptors(cls, fn):
        """Register *fn* to run on every router response; returns *fn*."""
        cls.POST_INTERCEPTORS.append(fn)
        return fn

    @classmethod
    def register(cls, router: Router):
        """Add *router* and relate its context to the global context."""
        router.ctx.relate(cls.ctx)
        router.ctx.router = router  # lets interceptors/handlers reach their router
        cls.ROUTERS.append(router)

    @dec.wsgify
    def __call__(self, request: Request) -> Response:
        """Handle one request.

        Raises:
            exc.HTTPNotFound: if no registered router produced a response.
        """
        # Global request interceptors.
        for fn in self.PRE_INTERCEPTORS:
            logging.critical('in Application interceptors')
            request = fn(self.ctx, request)

        for router in self.ROUTERS:
            response = router.match(request)
            print('class Application response =', response)
            if response is None:
                # The router's prefix did not match; there is no response to
                # hand to the global response interceptors.
                continue
            # Global response interceptors.
            for fn in self.POST_INTERCEPTORS:
                response = fn(self.ctx, request, response)

            if response:
                return response

        raise exc.HTTPNotFound('prefix not found, request.path = {}'.format(request.path))


# One router for the site root (empty prefix) and one mounted at /uranus.
idx = Router()  # prefix = ''
uranus = Router('/uranus')  # prefix = '/uranus'

# Registration relates each router's context to the application context.
Application.register(idx)
Application.register(uranus)


@idx.get('^/$')
@idx.post('^/$')
def index(ctx: NestedContext, request: Request) -> Response:
    """Handle GET/POST on ``/`` and echo the routing data of the request.

    The router calls handlers as ``handler(ctx, request)``, hence the
    leading *ctx* parameter.
    """
    response = Response()
    response.status_code = 303
    response.content_type = 'application/json'
    response.charset = 'utf8'
    response.body = 'request.path = {} \n{} \n{} \n{} \nctx = {}'.format(
        request.path,
        request.args,
        request.kwargs.__dict__,
        request.vars.__dict__,
        ctx,
    ).encode()
    return response


# Plain regular-expression rules can be used instead of placeholders, e.g.
# @uranus.route('/(?P<name>[a-zA-Z]+)')  # NOTE(review): group name assumed; lost in the scraped source
# @uranus.route(r'/(\w+)')
@uranus.route('/{name:str}/{id:int}')
def neptune(ctx: NestedContext, request: Request) -> Response:
    """Handle ``/uranus/{name}/{id}`` for any method and echo the routing data.

    The router calls handlers as ``handler(ctx, request)``, hence the
    leading *ctx* parameter.
    """
    response = Response()
    response.status_code = 304
    response.content_type = 'text/xml'
    response.charset = 'ascii'
    response.body = 'request.path = {} \n{} \n{} \n{} \nctx = {}'.format(
        request.path,
        request.args,
        request.kwargs.__dict__,
        request.vars.__dict__,
        ctx,
    ).encode()
    return response


# interceptor Examples
@Application.register_pre_interceptors
def show_headers(ctx: Context, request: Request) -> Request:
    """Example global interceptor: print the path and user agent, pass the request on."""
    print('Application interceptor request.path = {}'.format(request.path))
    print('Application interceptor request.user_agent = {}'.format(request.user_agent))
    return request


@uranus.register_pre_interceptor
def show_prefix(ctx: NestedContext, request: Request) -> Request:
    """Example router interceptor: print the router and its prefix, pass the request on.

    *ctx* is the context of the router the interceptor is registered on.
    """
    print('router = {}, prefix = {}'.format(ctx.router, ctx.router.prefix))
    return request


# print(idx._routetable)
# print(uranus._routetable)

if __name__ == '__main__':
    # Serve the application on port 9999 of every interface until interrupted.
    server = make_server('', 9999, Application())
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the development server.
        pass
    except Exception:
        traceback.print_exc(limit=None)
    finally:
        server.shutdown()
        server.server_close()

 

WSGI web框架的实现

标签:pass   移除   ascii   div   orm   sgi   escape   art   pat   

原文地址:https://www.cnblogs.com/dissipate/p/14131057.html

上一篇:文件上传

下一篇:package.json 字段说明


评论


亲,登录后才可以留言!