Ocata Neutron代码分析(一)——Neutron API启动过程分析
2021-07-19 03:20
首先,Neutron Server作为一种服务(neutron-server.service),可以到Neutron项目目录中的setup.cfg配置文件中找到对应的代码入口。
[entry_points]
console_scripts =
    neutron-db-manage = neutron.db.migration.cli:main
    neutron-debug = neutron.debug.shell:main
    neutron-dhcp-agent = neutron.cmd.eventlet.agents.dhcp:main
    neutron-keepalived-state-change = neutron.cmd.keepalived_state_change:main
    neutron-ipset-cleanup = neutron.cmd.ipset_cleanup:main
    neutron-l3-agent = neutron.cmd.eventlet.agents.l3:main
    neutron-linuxbridge-agent = neutron.cmd.eventlet.plugins.linuxbridge_neutron_agent:main
    neutron-linuxbridge-cleanup = neutron.cmd.linuxbridge_cleanup:main
    neutron-macvtap-agent = neutron.cmd.eventlet.plugins.macvtap_neutron_agent:main
    neutron-metadata-agent = neutron.cmd.eventlet.agents.metadata:main
    neutron-netns-cleanup = neutron.cmd.netns_cleanup:main
    neutron-ns-metadata-proxy = neutron.cmd.eventlet.agents.metadata_proxy:main
    neutron-openvswitch-agent = neutron.cmd.eventlet.plugins.ovs_neutron_agent:main
    neutron-ovs-cleanup = neutron.cmd.ovs_cleanup:main
    neutron-pd-notify = neutron.cmd.pd_notify:main
    neutron-server = neutron.cmd.eventlet.server:main
    neutron-rpc-server = neutron.cmd.eventlet.server:main_rpc_eventlet
    neutron-rootwrap = oslo_rootwrap.cmd:main
    neutron-rootwrap-daemon = oslo_rootwrap.cmd:daemon
    neutron-usage-audit = neutron.cmd.eventlet.usage_audit:main
    neutron-metering-agent = neutron.cmd.eventlet.services.metering_agent:main
    neutron-sriov-nic-agent = neutron.cmd.eventlet.plugins.sriov_nic_neutron_agent:main
    neutron-sanity-check = neutron.cmd.sanity_check:main
neutron-server代码入口:
def main():
    """Hand ``_main_neutron_server`` to ``server.boot_server`` to run."""
    server.boot_server(_main_neutron_server)
调用neutron.server.__init__中的boot_server函数:
def boot_server(server_func):
    """Initialise configuration and logging, then run ``server_func``.

    :param server_func: zero-argument callable that actually starts the
        server.

    Exits the process via ``sys.exit`` when no configuration file was
    found, or when ``server_func`` raises ``RuntimeError``.  A
    ``KeyboardInterrupt`` raised by ``server_func`` is swallowed so that
    Ctrl-C ends the server quietly.
    """
    # the configuration will be read into the cfg.CONF global data structure
    config.init(sys.argv[1:])
    config.setup_logging()
    config.set_config_defaults()
    if not cfg.CONF.config_file:
        sys.exit(_("ERROR: Unable to find configuration file via the default"
                   " search paths (~/.neutron/, ~/, /etc/neutron/, /etc/) and"
                   " the '--config-file' option!"))
    try:
        server_func()
    except KeyboardInterrupt:
        pass
    except RuntimeError as e:
        sys.exit(_("ERROR: %s") % e)
该函数在server_func外面加了一层壳:先初始化配置和日志,并检查是否能找到配置文件,然后执行server_func(也就是这里的 _main_neutron_server),并进行异常的捕获。
def _main_neutron_server():
    """Start the WSGI server selected by ``cfg.CONF.web_framework``.

    ``'legacy'`` starts the eventlet-based server; any other value starts
    the pecan-based server.
    """
    if cfg.CONF.web_framework == 'legacy':
        wsgi_eventlet.eventlet_wsgi_server()
    else:
        wsgi_pecan.pecan_wsgi_server()
根据Neutron配置文件中的web_framework进行判断,默认为legacy。执行neutron.server.wsgi_eventlet.py中的eventlet_wsgi_server。
def eventlet_wsgi_server():
    """Create the Neutron API service, then start the API and RPC workers."""
    # Initialise neutron_api.
    neutron_api = service.serve_wsgi(service.NeutronApiService)
    # Start neutron_api and the RPC workers.
    start_api_and_rpc_workers(neutron_api)
先分析一下neutron_api的启动过程,rpc的启动过程下一节分析。调用neutron.service.py中的serve_wsgi函数,相关的类代码都在一块,一起贴出来分析。
class WsgiService(object):
    """Base class for WSGI based services.

    For each api you define, you must also define these flags:
    :_listen: The address on which to listen
    :_listen_port: The port on which to listen
    """

    def __init__(self, app_name):
        # Only record the name here; wsgi_app is populated later by start().
        self.app_name = app_name
        self.wsgi_app = None

    def start(self):
        """Load and run the WSGI app named ``self.app_name``."""
        self.wsgi_app = _run_wsgi(self.app_name)

    def wait(self):
        """Delegate to ``wait()`` on the running WSGI app."""
        self.wsgi_app.wait()


class NeutronApiService(WsgiService):
    """Class for neutron-api service."""

    def __init__(self, app_name):
        profiler.setup('neutron-server', cfg.CONF.host)
        # Let the parent constructor record app_name / wsgi_app.
        super(NeutronApiService, self).__init__(app_name)

    @classmethod
    def create(cls, app_name='neutron'):
        """Set up logging and return a new instance of ``cls``.

        :param app_name: application name passed to the constructor
            (defaults to ``'neutron'``).
        :returns: the newly constructed service instance.
        """
        # Setup logging early
        config.setup_logging()
        service = cls(app_name)
        return service


def serve_wsgi(cls):
    """Create and start a WSGI service of type ``cls``.

    :param cls: service class exposing a ``create()`` classmethod; the
        caller in the article passes ``service.NeutronApiService``.
    :returns: the started service instance.

    Any exception raised while creating or starting the service is logged
    and re-raised.
    """
    try:
        service = cls.create()
        # start() is what populates service.wsgi_app.
        service.start()
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_LE('Unrecoverable error: please check log '
                              'for details.'))

    registry.notify(resources.PROCESS, events.BEFORE_SPAWN, service)
    return service
下面分析_run_wsgi函数,传入参数app_name为‘neutron‘。
def _run_wsgi(app_name):
    """Load the paste app called ``app_name`` and serve it.

    :param app_name: name of the application to load.
    :returns: the value returned by ``run_wsgi_app``, or ``None`` when no
        application could be loaded.
    """
    # Load the WSGI application.
    app = config.load_paste_app(app_name)
    if not app:
        LOG.error(_LE('No known API applications configured.'))
        return
    # Start the WSGI server for the loaded application.
    return run_wsgi_app(app)
def load_paste_app(app_name):
    """Builds and returns a WSGI app from a paste config file.

    :param app_name: Name of the application to load
    :returns: whatever ``wsgi.Loader(cfg.CONF).load_app(app_name)`` returns.
    """
    loader = wsgi.Loader(cfg.CONF)
    app = loader.load_app(app_name)
    return app
load_paste_app函数首先实例化oslo_service.wsgi.py中的Loader类,返回一个loader对象。然后再调用loader对象的load_app函数来加载wsgi_app。
class Loader(object):
    """Used to load WSGI applications from paste configurations."""

    def __init__(self, conf):
        """Initialize the loader, and attempt to find the config.

        :param conf: Application config
        :returns: None
        :raises: ConfigNotFound when the paste config file cannot be
            located.
        """
        conf.register_opts(_options.wsgi_opts)
        self.config_path = None

        # Path of the api-paste.ini file named by the api_paste_config option.
        config_path = conf.api_paste_config
        if not os.path.isabs(config_path):
            self.config_path = conf.find_file(config_path)
        elif os.path.exists(config_path):
            # Stored on the loader so that load_app() can use it.
            self.config_path = config_path

        # An absolute path that does not exist leaves config_path as None.
        if not self.config_path:
            raise ConfigNotFound(path=config_path)

    def load_app(self, name):
        """Return the paste URLMap wrapped WSGI application.

        :param name: Name of the application to load.
        :returns: Paste URLMap object wrapping the requested application.
        :raises: PasteAppNotFound
        """
        try:
            LOG.debug("Loading app %(name)s from %(path)s",
                      {'name': name, 'path': self.config_path})
            # paste.deploy's loadapp builds the WSGI app from the ini file.
            return deploy.loadapp("config:%s" % self.config_path, name=name)
        except LookupError:
            LOG.exception("Couldn't lookup app: %s", name)
            raise PasteAppNotFound(name=name, path=self.config_path)
完成加载的wsgi_app在_run_wsgi函数中作为参数传入run_wsgi_app函数中,进行wsgi_server的创建。
def run_wsgi_app(app):
    """Create a ``wsgi.Server`` named "Neutron" and start it serving ``app``.

    :param app: the loaded WSGI application.
    :returns: the started ``wsgi.Server`` instance.

    The server is started on ``cfg.CONF.bind_host`` / ``cfg.CONF.bind_port``
    with the worker count returned by ``_get_api_workers()``.
    """
    server = wsgi.Server("Neutron")
    server.start(app, cfg.CONF.bind_port, cfg.CONF.bind_host,
                 workers=_get_api_workers())
    LOG.info(_LI("Neutron service started, listening on %(host)s:%(port)s"),
             {'host': cfg.CONF.bind_host, 'port': cfg.CONF.bind_port})
    return server
run_wsgi_app函数首先实例化Server,然后调用start函数来启动wsgi_server。这里的workers指启动workers个进程来运行Neutron API。先分析_get_api_workers函数再分析Server类中的相关内容。
def _get_api_workers():
    """Return the number of API worker processes to use.

    Uses ``cfg.CONF.api_workers`` when it is set; when it is ``None`` the
    value of ``processutils.get_worker_count()`` is used instead.
    """
    workers = cfg.CONF.api_workers
    if workers is None:
        workers = processutils.get_worker_count()
    return workers
_get_api_workers先获取配置文件中的api_workers。如果没有配置,则workers默认为CPU个数。
class Server(object):

    def start(self, application, port, host='0.0.0.0', workers=0):
        """Run a WSGI server with the given application.

        :param application: the WSGI application to serve.
        :param port: port to listen on.
        :param host: address to listen on.
        :param workers: number of worker processes; a value below 1 runs
            the service in the current process.
        """
        self._host = host
        self._port = port
        backlog = CONF.backlog

        self._socket = self._get_socket(self._host,
                                        self._port,
                                        backlog=backlog)

        self._launch(application, workers)

    def _launch(self, application, workers=0):
        """Wrap ``application`` in a WorkerService and start it.

        :param application: the WSGI application to serve.
        :param workers: number of worker processes; below 1 the service is
            started in this process, otherwise it is handed to a
            ProcessLauncher.
        """
        service = WorkerService(self, application, self.disable_ssl, workers)
        if workers < 1:
            # The API service should run in the current process.
            self._server = service
            # Dump the initial option values
            cfg.CONF.log_opt_values(LOG, logging.DEBUG)
            service.start()
            systemd.notify_once()
        else:
            # dispose the whole pool before os.fork, otherwise there will
            # be shared DB connections in child processes which may cause
            # DB errors.
            api.context_manager.dispose_pool()
            # The API service runs in a number of child processes.
            # Minimize the cost of checking for child exit by extending the
            # wait interval past the default of 0.01s.
            self._server = common_service.ProcessLauncher(cfg.CONF,
                                                          wait_interval=1.0)
            self._server.launch_service(service,
                                        workers=service.worker_process_count)
首先,_launch函数会把传入的wsgi_app封装成一个WorkerService,然后对workers进行判断。当workers的值小于1时,直接在当前进程中启动WorkerService来运行Neutron API;当workers的值大于或等于1时(如果没有配置api_workers,workers默认为CPU个数,因此至少会创建一个子进程来运行Neutron API),与接下来分析的rpc服务启动部分和其他服务启动部分一样,先实例化oslo_service.service.py中的ProcessLauncher类,再调用其中的launch_service来启动WorkerService。
class ProcessLauncher(object):

    def launch_service(self, service, workers=1):
        """Launch a service with a given number of workers.

        :param service: a service to launch, must be an instance of
            :class:`oslo_service.service.ServiceBase`
        :param workers: a number of processes in which a service
            will be running
        """
        # Check that the service derives from oslo_service.service.ServiceBase.
        _check_service_base(service)
        # Record the service and its worker count in the wrapper.
        wrap = ServiceWrapper(service, workers)

        LOG.info('Starting %d workers', wrap.workers)
        # Start child processes one by one until wrap.workers are running.
        while self.running and len(wrap.children) < wrap.workers:
            self._start_child(wrap)
class WorkerService(neutron_worker.NeutronWorker):
    """Wraps a worker to be handled by ProcessLauncher"""

    def __init__(self, service, application, disable_ssl=False,
                 worker_process_count=0):
        super(WorkerService, self).__init__(worker_process_count)

        # The Server instance that owns the listening socket; in
        # Server._launch this is passed as ``self``.
        self._service = service
        # The WSGI application to serve.
        self._application = application
        self._disable_ssl = disable_ssl
        self._server = None

    def start(self):
        """Spawn the owning server's ``_run`` on a duplicate of its socket."""
        super(WorkerService, self).start()
        # When api worker is stopped it kills the eventlet wsgi server which
        # internally closes the wsgi server socket object. This server socket
        # object becomes not usable which leads to "Bad file descriptor"
        # errors on service restart.
        # Duplicate a socket object to keep a file descriptor usable.
        dup_sock = self._service._socket.dup()
        if CONF.use_ssl and not self._disable_ssl:
            dup_sock = sslutils.wrap(CONF, dup_sock)
        # Run Server._run through the server's pool (an eventlet GreenPool
        # according to the article; the pool's creation is not shown here).
        self._server = self._service.pool.spawn(self._service._run,
                                                self._application,
                                                dup_sock)
def _run(self, application, socket):
    """Start a WSGI server in a new green thread.

    :param application: the WSGI application to serve.
    :param socket: the listening socket handed to ``eventlet.wsgi.server``.
    """
    eventlet.wsgi.server(socket, application,
                         max_size=self.num_threads,
                         log=LOG,
                         keepalive=CONF.wsgi_keep_alive,
                         socket_timeout=self.client_socket_timeout)
上一篇:WinFrom开发小案例
下一篇:WPF中的TreeView入门
文章标题:Ocata Neutron代码分析(一)——Neutron API启动过程分析
文章链接:http://soscw.com/essay/106268.html