Skip to main content

urllib3 源码分析 I - PoolManager

·791 words·4 mins

本文通过 urllib3 来分析 PoolManager 的设计。所使用代码版本为 1.23,commit sha 7c216f433e39e184b84cbfa49e41135a89e4baa0

urllib3 自带连接池,通过 PoolManager 进行管理。默认创建 10 个连接池

# urllib3/poolmanager.py
class PoolManager(RequestMethods):
    proxy = None

    def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
        RequestMethods.__init__(self, headers)
        self.connection_pool_kw = connection_pool_kw
        self.pools = RecentlyUsedContainer(num_pools, dispose_func=lambda p: p.close())
        # Locally set the pool classes and keys so other PoolManagers can
        # override them.
        self.pool_classes_by_scheme = pool_classes_by_scheme
        self.key_fn_by_scheme = key_fn_by_scheme.copy()

PoolManger 继承自 RequestMethods 这个 Mixin,其为 PoolManager 提供了 request 方法

poolsRecentlyUsedContainer 实例,其具备线程安全的 Dict 访问,并且能够通过 LRU 算法在数据长度达到 maxsize 时将最久未被使用的元素销毁。其中的每个元素都是一个连接池实例

我们主要来看一下 __getitem____setitem__ 的实现

# urllib3/_collections.py
class RecentlyUsedContainer(MutableMapping):

    ContainerCls = OrderedDict

    def __init__(self, maxsize=10, dispose_func=None):
        self._maxsize = maxsize
        self.dispose_func = dispose_func

        self._container = self.ContainerCls()
        self.lock = RLock()

    def __getitem__(self, key):
        # Re-insert the item, moving it to the end of the eviction line.
        with self.lock:
            item = self._container.pop(key)
            self._container[key] = item
            return item

    def __setitem__(self, key, value):
        evicted_value = _Null
        with self.lock:
            # Possibly evict the existing value of 'key'
            evicted_value = self._container.get(key, _Null)  # 需要销毁先前存在的元素
            self._container[key] = value

            # If we didn't evict an existing value, we might have to evict the
            # least recently used item from the beginning of the container.
            if len(self._container) > self._maxsize:
                _key, evicted_value = self._container.popitem(last=False)

        if self.dispose_func and evicted_value is not _Null:
            self.dispose_func(evicted_value)

大致就是内部维护了一个 OrderedDict,每次访问元素时借助 self._container[key] = value 将此元素放到最后,那么最前面的元素就是最久未被使用的。如果长度超过 maxsize 后,就把这个元素移除并通过 dispose_func 进行销毁

这个容器里的元素便是 Connection Pool。在我们调用 PoolManager.reqeust 方法时,若当前没有 Connnection Pool,则会创建。这是一种惰性初始化的方法,我们也可以直接通过 PoolManager.connection_from_host 来直接创建 Connnection Pool

PoolManager.request 方法是继承 Mixin RequestMethods 所获得的。其做了一些预处理,比如 urlencode、生成 HTTP Body 等

# urllib3/request.py
class RequestMethods(object):

    _encode_url_methods = set(['DELETE', 'GET', 'HEAD', 'OPTIONS'])

    def __init__(self, headers=None):
        self.headers = headers or {}

    def request(self, method, url, fields=None, headers=None, **urlopen_kw):
        method = method.upper()
        urlopen_kw['request_url'] = url
        if method in self._encode_url_methods:
            return self.request_encode_url(method, url, fields=fields,
                                           headers=headers,
                                           **urlopen_kw)
        # ...

    def request_encode_url(self, method, url, fields=None, headers=None, **urlopen_kw):
        if headers is None:
            headers = self.headers

        extra_kw = {'headers': headers}
        extra_kw.update(urlopen_kw)

        if fields:
            url += '?' + urlencode(fields)

        return self.urlopen(method, url, **extra_kw)

PoolManager.urllopen 实际上是对 HTTPConnectionPool.urlopen 的一层包裹

# urllib3/poolmanager.py
class PoolManager(RequestMethods):

    def urlopen(self, method, url, redirect=True, **kw):
        u = parse_url(url)
        conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)

        kw['assert_same_host'] = False
        kw['redirect'] = False

        if 'headers' not in kw:
            kw['headers'] = self.headers.copy()

        if self.proxy is not None and u.scheme == "http":
            response = conn.urlopen(method, url, **kw)
        else:
            response = conn.urlopen(method, u.request_uri, **kw)  # 调用连接池的 urlopen 方法

        redirect_location = redirect and response.get_redirect_location()
        if not redirect_location:
            return response
        # ... 递归调用 urlopen 处理 Redirect

connection_from_host 获取了对应的连接池

# urllib3/poolmanager.py
class PoolManager(RequestMethods):

    def connection_from_host(self, host, port=None, scheme='http', pool_kwargs=None):
        if not host:
            raise LocationValueError("No host specified.")

        # 将 pool_kwargs 和初始化 PoolManager 时传入的参数进行 merge
        request_context = self._merge_pool_kwargs(pool_kwargs)
        request_context['scheme'] = scheme or 'http'
        if not port:
            # port_by_scheme = {'http': 80, 'https': 443}
            port = port_by_scheme.get(request_context['scheme'].lower(), 80)
        request_context['port'] = port
        request_context['host'] = host

        return self.connection_from_context(request_context)

    def connection_from_context(self, request_context):
        scheme = request_context['scheme'].lower()
        pool_key_constructor = self.key_fn_by_scheme[scheme]
        pool_key = pool_key_constructor(request_context)
        return self.connection_from_pool_key(pool_key, request_context=request_context)

    def connection_from_pool_key(self, pool_key, request_context=None):
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool

这部分的逻辑是根据请求信息(Request Context)生成一个 key,然后从 pools 中取对应的连接池实例。如果没有对应的连接池,那么便创建一个。这个 key 是一个名为 PoolKeynamedtuple 实例,包含了以下的字段。如果请求中没有字段所需要的信息,那么便为 None,可以参考 _default_key_normalizer 的处理

_key_fields = (
    'key_scheme',  # str
    'key_host',  # str
    'key_port',  # int
    'key_timeout',  # int or float or Timeout
    'key_retries',  # int or Retry
    'key_strict',  # bool
    'key_block',  # bool
    'key_source_address',  # str
    'key_key_file',  # str
    'key_cert_file',  # str
    'key_cert_reqs',  # str
    'key_ca_certs',  # str
    'key_ssl_version',  # str
    'key_ca_cert_dir',  # str
    'key_ssl_context',  # instance of ssl.SSLContext or urllib3.util.ssl_.SSLContext
    'key_maxsize',  # int
    'key_headers',  # dict
    'key__proxy',  # parsed proxy url
    'key__proxy_headers',  # dict
    'key_socket_options',  # list of (level (int), optname (int), value (int or str)) tuples
    'key__socks_options',  # dict
    'key_assert_hostname',  # bool or string
    'key_assert_fingerprint',  # str
)

PoolKey = collections.namedtuple('PoolKey', _key_fields)

但是我们需要注意在 PoolManager.urlopen 中,我们仅向 connection_from_host 传入了 schemehostport 三个参数

conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme)

所以相当于我们以 schemehostport 作为 key

# urllib3/poolmanager.py

pool_classes_by_scheme = {
    'http': HTTPConnectionPool,
    'https': HTTPSConnectionPool,
}

class PoolManager(RequestMethods):

    def _new_pool(self, scheme, host, port, request_context=None):
        pool_cls = self.pool_classes_by_scheme[scheme]
        if request_context is None:
            request_context = self.connection_pool_kw.copy()

        # Although the context has everything necessary to create the pool,
        # this function has historically only used the scheme, host, and port
        # in the positional args. When an API change is acceptable these can
        # be removed.
        for key in ('scheme', 'host', 'port'):
            request_context.pop(key, None)

        if scheme == 'http':
            for kw in SSL_KEYWORDS:
                request_context.pop(kw, None)

        return pool_cls(host, port, **request_context)

_new_pool 就是在创建连接池(ConnectionPool)实例,这个放到下一篇文章中进行分析

PoolManager 和 ConnectionPool 的关系可以表示如下

---------------------------------------------------
|                   PoolManger                    |
| ------------------  ------------------          |
| | ConnectionPool |  | ConnectionPool |          |
| |                |  |                |          |
| | --------       |  | --------       |  ...     |
| | | Conn |  ...  |  | | Conn |  ...  |          |
| | --------       |  | --------       |          |
| ------------------  ------------------          |
---------------------------------------------------