Skip to content

Flask源码分析系列(2) Flask源码分析

fuzhengwei edited this page Jun 11, 2020 · 1 revision

@[TOC]

转载请注明出处即可 源码地址github flask 主要参考文档为flask 环境为MacOS, Python 3.7+, IDE Pycharm

注意:文章中的源码存在删减,主要是为了减少篇幅和去除非核心逻辑,但不会影响对执行流程的理解。

如果对Werkzeug不是很了解,请先看Flask源码分析系列(1) -Werkzeug源码分析这篇文章

一、从一个最简单的Demo开始

Flask是Python语言编写的一个优秀的开源Web框架。我们先从一个最小的Demo开始,逐步来分析Flask是如何实现相关功能的。

from flask import Flask

app = Flask(__name__)


@app.route('/')
def hello_world():
    return 'Hello, World!'


def main():
    app.run(host='0.0.0.0', port=8080, debug=True)


if __name__ == '__main__':
    main()

首先app变量或者说Flask类创建的对象,其实是一个WSGI Application,也就是说是一个符合上篇文件中描述的一个符合WSGI规则的一个函数,具体是Flask类的wsgi_app方法来实现。

# app.py 2366行, Flask类的方法
def __call__(self, environ, start_response):
    return self.wsgi_app(environ, start_response)

# app.py 2323行, Flask类的方法
def wsgi_app(self, environ, start_response):
    pass

虽然app.run方法提供了Werkzeug的serving.make_server的实现,但是你依然可以选择其他支持WSGI协议的Server来运行Flask应用,比如gunicorn等。在实践中,我们在开发环境可以选择一些基本的WSGI Server用于本地调试。而在生产环境中在使用gunicorn等来实现多进程运行。当然这都直接取决于你自己根据实际的环境进行选择。以下代码是使用tornado的httpserver的一个例子。

from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from demo import app

import sys

reload(sys)
sys.setdefaultencoding("utf-8")


def main():
    http_server = HTTPServer(WSGIContainer(app))
    http_server.listen(8080)
    IOLoop.instance().start()


app.config['SESSION_TYPE'] = 'filesystem'
app.config['APIURL'] = '/api'

if __name__ == "__main__":
    main()

如果使用gunicorn,那么可以通过以下指令来运行Flask应用。

export FLASK_ENV=development
THREAD_COUNT=8
gunicorn -k gevent -w $THREAD_COUNT -b 0.0.0.0:8080 demo:app -t 6000000

扯了一些基本应用,下面开始进入正题。

二、Route实现原理

Route的实际作用是将url path和具体要执行的函数进行映射。Flask并没有把这些能力自己实现,而是使用了Werkzeug的Map、Rule和MapAdapter来实现。

首先先看下@app.route('/')装饰器的实现。 (Python的装饰器在这里不详细解释,如果不明白请查看廖雪峰的Python教程)

def route(self, rule, **options):
    def decorator(f):
        endpoint = options.pop("endpoint", None)
        self.add_url_rule(rule, endpoint, f, **options)
        return f

    return decorator

代码很简洁,route方法的参数rule是url path,而options则对应着Werkzeug中Rule类的参数,比如endpoint,methods等。除了endpoint做了一些特殊的处理以外,其他的参数原封不动的传到了Rule的__init__。 在decorator函数中的第一行从options dict中pop出了endpoint,这里是因为在add_url_rule进行了一些其他处理(其实就是判断是否是None,然后选择是否使用函数名称而已)。 add_url_rule方法的第三个参数f,则为被装饰的函数,在Demo的例子中就是hello_world函数。 Flask默认使用的endpoint是方法的名称,但依然保留了这个参数,方便用户自定义endpoint。

然后我们来看下Flask.add_url_rule方法的实现。具体源码在app.py的1099行。由于方法略长,我们来拆分即可来分析。方法的参数列表没有什么需要过多解释的。

def add_url_rule(
    self,
    rule,
    endpoint=None,
    view_func=None,
    provide_automatic_options=None,
    **options,
):
  pass

函数的第一段,是处理endpoint,如果用户没有在route中设置endpoint参数的话,则默认使用了view_func.__name__来获取函数的名称。然后获取了methods的参数。

if endpoint is None:
    endpoint = _endpoint_from_view_func(view_func)
options["endpoint"] = endpoint

函数的第二段,是对methods参数的处理,如果用户没有设置methods列表(或元组)的话,默认设置为("Get",)。并且对methods进行了是否是字符串的检查, 最后将所有的method都变成大写和去重(放入了Set中)。

methods = options.pop("methods", None)
if methods is None:
    methods = getattr(view_func, "methods", None) or ("GET",)
if isinstance(methods, str):
    raise TypeError(
        "Allowed methods must be a list of strings, for"
        ' example: @app.route(..., methods=["POST"])'
    )
methods = {item.upper() for item in methods}

函数的第三段是增加了必须要添加的methods的检查,比如在methods中如果没有OPTIONS的话,Flask也会增加默认的OPTIONS到Methods集合中。

required_methods = set(getattr(view_func, "required_methods", ()))

if provide_automatic_options is None:
    provide_automatic_options = getattr(
        view_func, "provide_automatic_options", None
    )

if provide_automatic_options is None:
    if "OPTIONS" not in methods:
        provide_automatic_options = True
        required_methods.add("OPTIONS")
    else:
        provide_automatic_options = False

methods |= required_methods

函数的第四段,主要与Werkzeug的Rule和Map类有关。其中url_rule_class = Rule,而url_map_class = Map,self.url_map = self.url_map_class()。所以这段的最后一行其实就是在Map的rules列表中添加Rule类的对象。

rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options

self.url_map.add(rule)

函数的最后一段的逻辑,如果看过上篇文章的话,也就能猜到还差一个endpoint到view_func的映射关系,在Flask中 self.view_functions = {} 也是通过一个字典来存储的。并且还进行了一个检查,防止一个endpoint映射到多个view_func中。

if view_func is not None:
    old_func = self.view_functions.get(endpoint)
    if old_func is not None and old_func != view_func:
        raise AssertionError(
            "View function mapping is overwriting an existing"
            f" endpoint function: {endpoint}"
        )
    self.view_functions[endpoint] = view_func

最后我们可以看下dispatch_request方法,在app.py的1830行。函数的最后一行是

return self.view_functions[rule.endpoint](**req.view_args)

是不是和上篇文章的一个例子很像^_^。 当然只获取到具体的view_func来执行是不够的,还需要通过finalize_request来构造response,还需要符合WSGI的规范。

三、request、Response、session等对象的实现

其实除了Response, request和session都使用了Werkzeug中的Context Locals。并且request就是Werkzeug中的Request。globals.py中的部分源码如下。

from werkzeug.local import LocalProxy
from werkzeug.local import LocalStack

# context locals
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))

在这里多说下Session,Flask的session默认是客户端session,也就是说session的数据不是存储在内存中的,而是加密后存储在了Cookie中,并且每次请求在解密后还原session。Flask使用的是AES之类的对称加密算法。所以在使用session时,尽量不要将大对象存储在session中,否则后续的每个请求都会携带这些数据。对于session的具体实现,在这里不进行详述,感兴趣的可以看下源码中的sessions.py。 对于Flask的session的实践,可以在公共缓存中存储一个实际的session对象,而在Flask的session中仅存储用户的id,进而减轻用户请求传输的Cookie的数据量。

四、一些简单的封装

(1) 登录校验与拦截

可以通过装饰器来实现,在需要登录的view_func上增加@login_required即可

def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kw):
        user_id = session.get('user_id')
        if user_id is None:
            return BaseError.not_login()

        return f(*args, **kw)

    return decorated_function

(2) 自定义异常与返回值处理

class BusinessException(Exception):
    def __init__(self, code=None, msg=None, func=None):
        self.code = code
        self.msg = msg
        self.func = func
class Error(BaseError):
    @staticmethod
    def custom_error():
        return return_data(code=REQUEST_FAIL, msg=u'自定义异常')
def request_handler(**data_dict):
    def decorator(func):
        @wraps(func)
        def handle_request_data(*args, **kw):
            try:
                check_rule = build_check_rule(str(request.url_rule), get_rule_version(),
                                              list(request.url_rule.methods & set(METHODS)))
                check_func = check_param.get_check_rules().get(check_rule)
                if check_func:
                    check_func(*args, **kw)
            except BusinessException as e:
                if e.func is not None:
                    return e.func()
                elif e.code is not None and e.msg is not None:
                    logger.error('BusinessException, code: %s, msg: %s' % (e.code, e.msg))
                    return return_data(code=e.code, msg=e.msg)
                else:
                    return request_fail()
            except Exception:
                return request_fail()

            try:
                return func(*args, **kw)
            except BusinessException as e:
                if e.func is not None:
                    return e.func()
                elif e.code is not None and e.msg is not None:
                    logger.error('BusinessException, code: %s, msg: %s' % (e.code, e.msg))
                    return return_data(code=e.code, msg=e.msg)
                else:
                    return request_fail()
            except Exception:
                return request_fail()

        return handle_request_data

    return decorator

在具体业务逻辑编写时,则无需在每个view_func中对异常进行处理,只需要raise具体的业务异常即可。

@app.route('/main.json', version=['<=1.3'])
@request_handler()
def main_json():
     raise BusinessException(func=Error.custom_error)

request_handler中的对参数检查的相关函数,是因为笔者之前所写的业务逻辑,需要大量的参数校验,并且还存在着一定的校验逻辑复用,所以将参数校验和具体的业务逻辑进行了分离。具体使用时,类似于下面的形式来使用。check_outer和route的路径相同即可进行一一对应。

@check_outer.check('/main.json', version=versions)
def main_json(*args, **kw):
   raise BusinessException(func=Error.custom_error)

至于具体的实现,笔者简单抄了下Blueprint的源码。

class CheckParam(object):
    def __init__(self):
        self.check_rules = dict()

    def register_check_param(self, check_param=None, url_prefix=''):
        if not isinstance(check_param, SubCheckParam):
            raise RuntimeError('check_param is not a SubCheckParam object. type: %s' % type(check_param))

        check_rules = check_param.get_check_rules()

        for check_rule in check_rules:
            url = check_rule.url
            version = check_rule.version
            methods = check_rule.methods
            f = check_rule.f

            self.check_rules[
                str({'url': url_prefix + url, 'version': sorted(version), 'methods': sorted(methods)})] = f

    def get_check_rules(self):
        return self.check_rules


class CheckRule(object):
    def __init__(self, url, version, methods, f):
        self.url = url
        self.version = version
        self.methods = methods
        self.f = f


class SubCheckParam(object):
    def __init__(self):
        self.check_rules = []

    def check(self, url=None, version=None, methods=None):
        methods = methods if methods is not None else DEFAULT_METHODS

        def decorator(f):
            if not url:
                raise ValueError('A non-empty url is required.')
            if not methods:
                raise ValueError('A non-empty method is required.')

            self.__add_check_rule(url, version, methods, f)
            return f

        return decorator

    def __add_check_rule(self, url, version, methods, f):
        if version and isinstance(version, list):
            version = sorted(version)
        else:
            version = []

        self.check_rules.append(CheckRule(url=url, version=version, methods=methods, f=f))

    def get_check_rules(self):
        return self.check_rules


def build_check_rule(url=None, version=None, methods=None):
    if not url:
        raise ValueError('A non-empty url is required.')
    if not methods:
        raise ValueError('A non-empty method is required.')

    if version and isinstance(version, list):
        version = sorted(version)
    else:
        version = []

    return str({'url': url, 'version': version, 'methods': sorted(methods)})

具体的使用,和前面说的一样,只要url path一致即可。下面的SelfFlask和SelfBlueprint是因为为了支持版本号路由而继承了Flask和Blueprint来进行了扩展。

app = SelfFlask(__name__)
app.config.from_object(configs)

check_inner = SubCheckParam()
check_outer = SubCheckParam()
check_manager = SubCheckParam()
check_owner = SubCheckParam()
check_member = SubCheckParam()
check_third = SubCheckParam()

inner = SelfBlueprint('inner', __name__)
outer = SelfBlueprint('outer', __name__)
manager = SelfBlueprint('manager', __name__)
owner = SelfBlueprint('owner', __name__)
member = SelfBlueprint('member', __name__)
third = SelfBlueprint('third', __name__)

from backend.versions import *

app.register_blueprint(inner, url_prefix='/inner')
app.register_blueprint(outer, url_prefix='/outer')
app.register_blueprint(manager, url_prefix='/manager')
app.register_blueprint(owner, url_prefix='/owner')
app.register_blueprint(member, url_prefix='/member')
app.register_blueprint(third, url_prefix='/third')

check_param.register_check_param(check_inner, url_prefix='/inner')
check_param.register_check_param(check_outer, url_prefix='/outer')
check_param.register_check_param(check_manager, url_prefix='/manager')
check_param.register_check_param(check_owner, url_prefix='/owner')
check_param.register_check_param(check_member, url_prefix='/member')
check_param.register_check_param(check_third, url_prefix='/third')

比如,对endpoint进行了修改,来支持版本号路由。

class SelfBlueprint(Blueprint):
    def route(self, rule, **options):
        """Like :meth:`Flask.route` but for a blueprint.  The endpoint for the
        :func:`url_for` function is prefixed with the name of the blueprint.
        """

        # set default methods
        methods = options.get('methods')
        if methods is None:
            options['methods'] = DEFAULT_METHODS

        def decorator(f):
            endpoint = options.pop("endpoint", f.__name__ + str(options.get('version')).replace('.', '_'))
            self.add_url_rule(rule, endpoint, f, **options)
            return f

        return decorator

五、结束语

在这里就把Flask主要的部分实现分析完成了,但是还有一些如Blueprint、Jinjia2等还没有说,如果读者感兴趣请自行查看源码。但是在生产环境还是建议不要使用模板引擎来渲染页面逻辑。最好还是做到前后端分离。

📝 首页

🌏 知识星球码农会锁

实战项目:「DDD+RPC分布式抽奖系统」、专属小册、问题解答、简历指导、架构图稿、视频课程

🐲 头条

⛳ 目录

  1. 源码 - :octocat: 公众号:bugstack虫洞栈 文章所涉及到的全部开源代码
  2. Java
  3. Spring
  4. 面向对象
  5. 中间件
  6. Netty 4.x
  7. 字节码编程
  8. 💯实战项目
  9. 部署 Dev-Ops
  10. 📚PDF 下载
  11. 关于

💋 精选

🐾 友链

建立本开源项目的初衷是基于个人学习与工作中对 Java 相关技术栈的总结记录,在这里也希望能帮助一些在学习 Java 过程中遇到问题的小伙伴,如果您需要转载本仓库的一些文章到自己的博客,请按照以下格式注明出处,谢谢合作。

作者:小傅哥
链接:https://bugstack.cn
来源:bugstack虫洞栈

2021年10月24日,小傅哥 的文章全部开源到代码库 CodeGuide 中,与同好同行,一起进步,共同维护。

这里我提供 3 种方式:

  1. 提出 Issue :在 Issue 中指出你觉得需要改进/完善的地方(能够独立解决的话,可以在提出 Issue 后再提交 PR )。
  2. 处理 Issue : 帮忙处理一些待处理的 Issue
  3. 提交 PR: 对于错别字/笔误这类问题可以直接提交PR,无需提交Issue 确认。

详细参考:CodeGuide 贡献指南 - 非常感谢你的支持,这里会留下你的足迹

  • 加群交流 本群的宗旨是给大家提供一个良好的技术学习交流平台,所以杜绝一切广告!由于微信群人满 100 之后无法加入,请扫描下方二维码先添加作者 “小傅哥” 微信(fustack),备注:加群。
微信:fustack

  • 公众号(bugstack虫洞栈) - 沉淀、分享、成长,专注于原创专题案例,以最易学习编程的方式分享知识,让自己和他人都能有所收获。
公众号:bugstack虫洞栈

感谢以下人员对本仓库做出的贡献或者对小傅哥的赞赏,当然不仅仅只有这些贡献者,这里就不一一列举了。如果你希望被添加到这个名单中,并且提交过 Issue 或者 PR,请与我联系。

Clone this wiki locally