Web框架的引入
2021-06-10 07:05
标签:set 初始化 mat open insert 执行 end 用户 str 有了上一篇内容,静态、动态web服务器的实现,已经掌握了客户端请求到服务器处理的机制。在动态资源处理中,根据请求 .py 导入模块应用,然后调用应用入口程序实现动态处理。但是在真实的项目中,肯定有很多应用(.py文件),按照这种处理机制(每个应用中都要写一个入口,设置状态码、headers)不符合架构设计原则,这一部分肯定是可以复用的,所用应用应该公用一个入口,这就引入一个概念web框架。 入口实在WebFramework.py 中,直接启动该文件,客户端请求指定路径即可看到效果。但是对于大多框架都是通过命令行启动,我们不可能在程序中这样启动,而且对于webserver来说始终是不变得,变的是web框架,所以改造成命令行启动,代码如下: 在命令行中,执行如下命令即可启动webserver服务,模块名称:应用框架名, 其中这里的app指的是WebFramework.py中Application类的实例,其实也就是应用框架的实例。后续自定义框架的时候,直接调用即可。 代码已上传到github:python 小程序 Web框架的引入 标签:set 初始化 mat open insert 执行 end 用户 str 原文地址:http://www.cnblogs.com/tianboblog/p/7296045.html为什么会有web框架
设计思路
代码如下:
- WebFramework.py
`
# coding:utf-8
import time
from MyWebServer import WebServer
HTML_ROOT_DIR = "./static"
class Application(object):
‘‘‘自定义通用的web框架‘‘‘
# 初始化路由信息
def __init__(self,urls):
self.urls = urls
# 匹配路由
def __call__(self, env, start_response):
path = env.get("PATH_INFO", "/")
# /static/index.html
if path.startswith("/static"):
file_name = path[7:]
# 打开文件,读取内容
try:
file = open(HTML_ROOT_DIR + file_name, "rb")
except IOError:
# 代表未找到路由信息,404错误
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
else:
file_data = file.read()
file.close()
status = "200 OK"
headers = []
start_response(status, headers)
return file_data.decode("utf-8")
for url,handler in self.urls:
if path == url:
return handler(env,start_response)
# 未匹配到
status = ‘404 Not Found‘
headers = []
start_response(status,headers)
return ‘not found‘
def showtime(env,start_response):
status = ‘200 OK‘
headers = [
(‘Content-Type‘, ‘text/plain‘)
]
start_response(status, headers)
return str(time.time())
def sayhello(env,start_response):
status = ‘200 OK‘
headers = [
(‘Content-Type‘,‘text/plain‘)
]
start_response(status,headers)
return ‘say hello‘
def helloworld(env,start_response):
status = ‘200 OK‘
headers =[
(‘Content-Type‘,‘text/plain‘)
]
start_response(status,headers)
return ‘hello world‘
if __name__ == ‘__main__‘:
urls = [
(‘/‘, showtime),
(‘/sayhello‘,sayhello),
(‘/helloworld‘,helloworld)
]
app = Application(urls)
webServer = WebServer(app)
webServer.bind(8000)
webServer.start()
`
- MyWebServer.py
`
# coding:utf-8
import socket
import re
import sys
from multiprocessing import Process
HTML_ROOT_DIR = ‘./static‘
WSGI_PY = ‘./wsgipy‘
class WebServer(object):
‘‘‘
简单的webserver
‘‘‘
def __init__(self,application):
‘‘‘application:框架‘‘‘
self.sock_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.app = application
def start(self):
self.sock_server.listen(128)
while True:
sock_client, addr = self.sock_server.accept()
print(‘[%s,%s]用户已连接......‘ % addr)
handle_client_process = Process(target=self.handle_client, args=(sock_client,))
handle_client_process.start()
sock_client.close()
def start_response(self, status, headers):
"""
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
star
"""
resp_headers = ‘HTTP/1.1 ‘ + status + ‘\r\n‘
for header in headers:
resp_headers += ‘%s: %s\r\n‘ % header
self.resp_headers = resp_headers
def handle_client(self, sock_client):
‘‘‘处理客户端请求‘‘‘
recv_data = sock_client.recv(1024)
#print(‘请求数据:‘, recv_data)
req_lines = recv_data.splitlines()
#for line in req_lines:
# print(line)
req_start_line = req_lines[0]
#print(req_start_line.decode(‘utf-8‘))
file_name = re.match(r"\w+ +(/[^ ]*) ", req_start_line.decode("utf-8")).group(1)
method = re.match(r"(\w+) +/[^ ]* ", req_start_line.decode("utf-8")).group(1)
env = {
"PATH_INFO": file_name,
"METHOD": method
}
response_body = self.app(env, self.start_response)
response = self.resp_headers + "\r\n" + response_body
# 向客户端返回响应数据
sock_client.send(bytes(response, "utf-8"))
# 关闭客户端连接
sock_client.close()
def bind(self, port):
self.sock_server.bind((‘‘, port))
def main():
sys.path.insert(1,WSGI_PY)
webServer = WebServer()
webServer.bind(8000)
webServer.start()
if __name__ == ‘__main__‘:
main()
`
启动
- WebFramework.py
`
# coding:utf-8
import time
HTML_ROOT_DIR = "./static"
class Application(object):
‘‘‘自定义通用的web框架‘‘‘
# 初始化路由信息
def __init__(self,urls):
self.urls = urls
# 匹配路由
def __call__(self, env, start_response):
path = env.get("PATH_INFO", "/")
# /static/index.html
if path.startswith("/static"):
file_name = path[7:]
# 打开文件,读取内容
try:
file = open(HTML_ROOT_DIR + file_name, "rb")
except IOError:
# 代表未找到路由信息,404错误
status = "404 Not Found"
headers = []
start_response(status, headers)
return "not found"
else:
file_data = file.read()
file.close()
status = "200 OK"
headers = []
start_response(status, headers)
return file_data.decode("utf-8")
for url,handler in self.urls:
if path == url:
return handler(env,start_response)
# 未匹配到
status = ‘404 Not Found‘
headers = []
start_response(status,headers)
return ‘not found‘
def showtime(env,start_response):
status = ‘200 OK‘
headers = [
(‘Content-Type‘, ‘text/plain‘)
]
start_response(status, headers)
return str(time.time())
def sayhello(env,start_response):
status = ‘200 OK‘
headers = [
(‘Content-Type‘,‘text/plain‘)
]
start_response(status,headers)
return ‘say hello‘
def helloworld(env,start_response):
status = ‘200 OK‘
headers =[
(‘Content-Type‘,‘text/plain‘)
]
start_response(status,headers)
return ‘hello world‘
urls = [
(‘/‘, showtime),
(‘/sayhello‘,sayhello),
(‘/helloworld‘,helloworld)
]
app = Application(urls)
# if __name__ == ‘__main__‘:
# urls = [
# (‘/‘, showtime),
# (‘/sayhello‘,sayhello),
# (‘/helloworld‘,helloworld)
# ]
# app = Application(urls)
#
# webServer = WebServer(app)
# webServer.bind(8000)
# webServer.start()
`
- MyWebServer.py
`
# coding:utf-8
import socket
import re
import sys
from multiprocessing import Process
HTML_ROOT_DIR = ‘./static‘
WSGI_PY = ‘./wsgipy‘
class WebServer(object):
‘‘‘
简单的webserver
‘‘‘
def __init__(self,application):
‘‘‘application:框架‘‘‘
self.sock_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.app = application
def start(self):
self.sock_server.listen(128)
while True:
sock_client, addr = self.sock_server.accept()
print(‘[%s,%s]用户已连接......‘ % addr)
handle_client_process = Process(target=self.handle_client, args=(sock_client,))
handle_client_process.start()
sock_client.close()
def start_response(self, status, headers):
"""
status = "200 OK"
headers = [
("Content-Type", "text/plain")
]
star
"""
resp_headers = ‘HTTP/1.1 ‘ + status + ‘\r\n‘
for header in headers:
resp_headers += ‘%s: %s\r\n‘ % header
self.resp_headers = resp_headers
def handle_client(self, sock_client):
‘‘‘处理客户端请求‘‘‘
recv_data = sock_client.recv(1024)
#print(‘请求数据:‘, recv_data)
req_lines = recv_data.splitlines()
#for line in req_lines:
# print(line)
req_start_line = req_lines[0]
#print(req_start_line.decode(‘utf-8‘))
file_name = re.match(r"\w+ +(/[^ ]*) ", req_start_line.decode("utf-8")).group(1)
method = re.match(r"(\w+) +/[^ ]* ", req_start_line.decode("utf-8")).group(1)
env = {
"PATH_INFO": file_name,
"METHOD": method
}
response_body = self.app(env, self.start_response)
response = self.resp_headers + "\r\n" + response_body
# 向客户端返回响应数据
sock_client.send(bytes(response, "utf-8"))
# 关闭客户端连接
sock_client.close()
def bind(self, port):
self.sock_server.bind((‘‘, port))
def main():
sys.path.insert(1,WSGI_PY)
if len(sys.argv)
调用方式
`
python MyWebServer.py WebFramework:app
`
下一篇:iTextCharp c#