HTTP keepalive? (Was Re: Pullable Mercurial kernel repo)
Matt Mackall
mpm at selenic.com
Thu Jun 16 05:48:47 UTC 2005
On Wed, Jun 15, 2005 at 10:27:11PM -0700, Arun Sharma wrote:
> Matt Mackall wrote:
> >There's now a public Mercurial repo synced with git on kernel.org:
> >
> >$ time hg -v pull http://www.kernel.org/hg/
> >requesting all changes
> >adding changesets
> >adding manifests
> >adding file revisions
> >122216970 bytes of data transferred
> >modified 23433 files, added 30060 changesets and 203475 new revisions
> >
> >real 4m20.664s
> >user 1m25.236s
> >sys 0m7.802s
> >
>
> I tried this from work this morning (actually I downloaded the 100+MB
> tarball you posted after hg-0.5b) and did a pull on top of it and it was
> slow as hell..
>
> On debugging a little bit, it looked like hg was repeatedly opening lots of
> connections to our http_proxy (non-authenticating).
If you weren't running a tip Mercurial, you might have run into some
bugs in finding the outstanding changesets; it would in fact go on
forever. But with those fixes in, it should make ~20 small requests
to figure out that there are 1800 new changesets, and then a single
request after that for the bulk data transfer (about 3.5M).
> But I can't reproduce the same behavior at home (without the proxy). I'm
> not sure if the attached patch will help (I'll test it tomorrow). Any other
> suggestions?
Well, hmm. I'd like keepalive. It'll cut the number of packet round
trips in half or more. And it's also good for speeding up old-http, of
course.
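Back-of-the-envelope (illustrative numbers only, assuming each of
the ~20 discovery requests otherwise pays one extra round trip to
set up a fresh TCP connection):

    # rough round-trip count for the discovery phase
    requests = 20
    without_keepalive = requests * (1 + 1)   # handshake + request each = 40
    with_keepalive = 1 + requests            # one handshake, then reuse = 21
    print without_keepalive, with_keepalive

Behind a proxy the handshake goes to the proxy instead, but the
per-request setup cost is the same story.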
But I kind of shy away from pulling in big chunks of other packages
unless it's unavoidable. Can you run the test at work with -d turned
on, up until it says "adding changesets", and figure out whether it's
making an unreasonable number of connections?
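If -d doesn't make it obvious, a quick hack (untested sketch, not
part of hg) is to wrap httplib's connect() before the pull runs and
count outgoing connections directly:

    # count outgoing HTTP connections by wrapping httplib
    import httplib
    _real_connect = httplib.HTTPConnection.connect
    _count = [0]
    def _counting_connect(self):
        _count[0] += 1
        print "connection #%d to %s:%s" % (_count[0], self.host, self.port)
        return _real_connect(self)
    httplib.HTTPConnection.connect = _counting_connect

With your proxy in the picture every connect should be to the proxy
host, so a long stream of these would confirm what you're seeing.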
>
> -Arun
> Add keepalive support.
>
> --- a/mercurial/hg.py Thu Jun 16 03:52:41 2005
> +++ b/mercurial/hg.py Thu Jun 16 04:51:32 2005
> @@ -1343,7 +1343,8 @@
> # Note: urllib2 takes proxy values from the environment and those will
> # take precedence
>
> - proxy_handler = urllib2.BaseHandler()
> + import keepalive
> + proxy_handler = keepalive.HTTPHandler()
> if host and not no_proxy:
> proxy_handler = urllib2.ProxyHandler({"http" : "http://" + host})
>
> --- /dev/null Thu Jun 16 03:52:41 2005
> +++ b/mercurial/keepalive.py Thu Jun 16 04:51:32 2005
> @@ -0,0 +1,587 @@
> +# This library is free software; you can redistribute it and/or
> +# modify it under the terms of the GNU Lesser General Public
> +# License as published by the Free Software Foundation; either
> +# version 2.1 of the License, or (at your option) any later version.
> +#
> +# This library is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> +# Lesser General Public License for more details.
> +#
> +# You should have received a copy of the GNU Lesser General Public
> +# License along with this library; if not, write to the
> +# Free Software Foundation, Inc.,
> +# 59 Temple Place, Suite 330,
> +# Boston, MA 02111-1307 USA
> +
> +# This file is part of urlgrabber, a high-level cross-protocol url-grabber
> +# Copyright 2002-2004 Michael D. Stenner, Ryan Tomayko
> +
> +"""An HTTP handler for urllib2 that supports HTTP 1.1 and keepalive.
> +
> +>>> import urllib2
> +>>> from keepalive import HTTPHandler
> +>>> keepalive_handler = HTTPHandler()
> +>>> opener = urllib2.build_opener(keepalive_handler)
> +>>> urllib2.install_opener(opener)
> +>>>
> +>>> fo = urllib2.urlopen('http://www.python.org')
> +
> +If a connection to a given host is requested, and all of the existing
> +connections are still in use, another connection will be opened. If
> +the handler tries to use an existing connection but it fails in some
> +way, it will be closed and removed from the pool.
> +
> +To remove the handler, simply re-run build_opener with no arguments, and
> +install that opener.
> +
> +You can explicitly close connections by using the close_connection()
> +method of the returned file-like object (described below) or you can
> +use the handler methods:
> +
> + close_connection(host)
> + close_all()
> + open_connections()
> +
> +NOTE: using the close_connection and close_all methods of the handler
> +should be done with care when using multiple threads.
> + * there is nothing that prevents another thread from creating new
> + connections immediately after connections are closed
> + * no checks are done to prevent in-use connections from being closed
> +
> +>>> keepalive_handler.close_all()
> +
> +EXTRA ATTRIBUTES AND METHODS
> +
> + Upon a status of 200, the object returned has a few additional
> + attributes and methods, which should not be used if you want to
> + remain consistent with the normal urllib2-returned objects:
> +
> + close_connection() - close the connection to the host
> + readlines() - you know, readlines()
> +    status - the return status (i.e. 404)
> +    reason - English translation of status (i.e. 'File not found')
> +
> + If you want the best of both worlds, use this inside an
> + AttributeError-catching try:
> +
> + >>> try: status = fo.status
> +    ... except AttributeError: status = None
> +
> + Unfortunately, these are ONLY there if status == 200, so it's not
> + easy to distinguish between non-200 responses. The reason is that
> + urllib2 tries to do clever things with error codes 301, 302, 401,
> + and 407, and it wraps the object upon return.
> +
> + For python versions earlier than 2.4, you can avoid this fancy error
> + handling by setting the module-level global HANDLE_ERRORS to zero.
> + You see, prior to 2.4, it's the HTTP Handler's job to determine what
> + to handle specially, and what to just pass up. HANDLE_ERRORS == 0
> + means "pass everything up". In python 2.4, however, this job no
> + longer belongs to the HTTP Handler and is now done by a NEW handler,
> + HTTPErrorProcessor. Here's the bottom line:
> +
> + python version < 2.4
> + HANDLE_ERRORS == 1 (default) pass up 200, treat the rest as
> + errors
> + HANDLE_ERRORS == 0 pass everything up, error processing is
> + left to the calling code
> + python version >= 2.4
> + HANDLE_ERRORS == 1 pass up 200, treat the rest as errors
> + HANDLE_ERRORS == 0 (default) pass everything up, let the
> + other handlers (specifically,
> + HTTPErrorProcessor) decide what to do
> +
> + In practice, setting the variable either way makes little difference
> + in python 2.4, so for the most consistent behavior across versions,
> + you probably just want to use the defaults, which will give you
> + exceptions on errors.
> +
> +"""
> +
> +# $Id: keepalive.py,v 1.9 2005/02/14 21:55:07 mstenner Exp $
> +
> +import urllib2
> +import httplib
> +import socket
> +import thread
> +import time
> +
> +DEBUG = 0
> +def DBPRINT(*args): print ' '.join(args)
> +
> +import sys
> +_python_version = map(int, sys.version.split()[0].split('.'))
> +if _python_version < [2, 4]: HANDLE_ERRORS = 1
> +else: HANDLE_ERRORS = 0
> +
> +class ConnectionManager:
> + """
> + The connection manager must be able to:
> +      * keep track of all existing connections
> + """
> + def __init__(self):
> + self._lock = thread.allocate_lock()
> + self._hostmap = {} # map hosts to a list of connections
> + self._connmap = {} # map connections to host
> + self._readymap = {} # map connection to ready state
> +
> + def add(self, host, connection, ready):
> + self._lock.acquire()
> + try:
> + if not self._hostmap.has_key(host): self._hostmap[host] = []
> + self._hostmap[host].append(connection)
> + self._connmap[connection] = host
> + self._readymap[connection] = ready
> + finally:
> + self._lock.release()
> +
> + def remove(self, connection):
> + self._lock.acquire()
> + try:
> + try:
> + host = self._connmap[connection]
> + except KeyError:
> + pass
> + else:
> + del self._connmap[connection]
> + del self._readymap[connection]
> + self._hostmap[host].remove(connection)
> + if not self._hostmap[host]: del self._hostmap[host]
> + finally:
> + self._lock.release()
> +
> + def set_ready(self, connection, ready):
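> +        # note: no lock here; a single dict store is atomic in CPython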
> + try: self._readymap[connection] = ready
> + except KeyError: pass
> +
> + def get_ready_conn(self, host):
> + conn = None
> + self._lock.acquire()
> + try:
> + if self._hostmap.has_key(host):
> + for c in self._hostmap[host]:
> + if self._readymap[c]:
> + self._readymap[c] = 0
> + conn = c
> + break
> + finally:
> + self._lock.release()
> + return conn
> +
> + def get_all(self, host=None):
> + if host:
> + return list(self._hostmap.get(host, []))
> + else:
> + return dict(self._hostmap)
> +
> +class HTTPHandler(urllib2.HTTPHandler):
> + def __init__(self):
> + self._cm = ConnectionManager()
> +
> + #### Connection Management
> + def open_connections(self):
> + """return a list of connected hosts and the number of connections
> + to each. [('foo.com:80', 2), ('bar.org', 1)]"""
> + return [(host, len(li)) for (host, li) in self._cm.get_all().items()]
> +
> + def close_connection(self, host):
> + """close connection(s) to <host>
> + host is the host:port spec, as in 'www.cnn.com:8080' as passed in.
> + no error occurs if there is no connection to that host."""
> + for h in self._cm.get_all(host):
> + self._cm.remove(h)
> + h.close()
> +
> + def close_all(self):
> + """close all open connections"""
> + for host, conns in self._cm.get_all().items():
> + for h in conns:
> + self._cm.remove(h)
> + h.close()
> +
> + def _request_closed(self, request, host, connection):
> +        """tells us that this request is now closed and the
> + connection is ready for another request"""
> + self._cm.set_ready(connection, 1)
> +
> + def _remove_connection(self, host, connection, close=0):
> + if close: connection.close()
> + self._cm.remove(connection)
> +
> + #### Transaction Execution
> + def http_open(self, req):
> + return self.do_open(HTTPConnection, req)
> +
> + def do_open(self, http_class, req):
> + host = req.get_host()
> + if not host:
> + raise urllib2.URLError('no host given')
> +
> + try:
> + h = self._cm.get_ready_conn(host)
> + while h:
> + r = self._reuse_connection(h, req, host)
> +
> + # if this response is non-None, then it worked and we're
> + # done. Break out, skipping the else block.
> + if r: break
> +
> + # connection is bad - possibly closed by server
> + # discard it and ask for the next free connection
> + h.close()
> + self._cm.remove(h)
> + h = self._cm.get_ready_conn(host)
> + else:
> + # no (working) free connections were found. Create a new one.
> + h = http_class(host)
> + if DEBUG: DBPRINT("creating new connection to %s (%d)" % \
> + (host, id(h)))
> + self._cm.add(host, h, 0)
> + self._start_transaction(h, req)
> + r = h.getresponse()
> + except (socket.error, httplib.HTTPException), err:
> + raise urllib2.URLError(err)
> +
> + # if not a persistent connection, don't try to reuse it
> + if r.will_close: self._cm.remove(h)
> +
> + if DEBUG: DBPRINT("STATUS: %s, %s" % (r.status, r.reason))
> + r._handler = self
> + r._host = host
> + r._url = req.get_full_url()
> + r._connection = h
> + r.code = r.status
> +
> + if r.status == 200 or not HANDLE_ERRORS:
> + return r
> + else:
> + return self.parent.error('http', req, r, r.status, r.reason, r.msg)
> +
> +
> + def _reuse_connection(self, h, req, host):
> + """start the transaction with a re-used connection
> + return a response object (r) upon success or None on failure.
> +        This does NOT close or remove bad connections in cases where
> + it returns. However, if an unexpected exception occurs, it
> + will close and remove the connection before re-raising.
> + """
> + try:
> + self._start_transaction(h, req)
> + r = h.getresponse()
> + # note: just because we got something back doesn't mean it
> + # worked. We'll check the version below, too.
> + except (socket.error, httplib.HTTPException):
> + r = None
> + except:
> + # adding this block just in case we've missed
> +            # something. We will still raise the exception, but
> +            # let's try to close the connection and remove it
> + # first. We previously got into a nasty loop
> + # where an exception was uncaught, and so the
> + # connection stayed open. On the next try, the
> + # same exception was raised, etc. The tradeoff is
> + # that it's now possible this call will raise
> + # a DIFFERENT exception
> + if DEBUG: DBPRINT("unexpected exception - " \
> + "closing connection to %s (%d)" % (host, id(h)))
> + self._cm.remove(h)
> + h.close()
> + raise
> +
> + if r is None or r.version == 9:
> + # httplib falls back to assuming HTTP 0.9 if it gets a
> + # bad header back. This is most likely to happen if
> + # the socket has been closed by the server since we
> + # last used the connection.
> + if DEBUG: DBPRINT("failed to re-use connection to %s (%d)" \
> + % (host, id(h)))
> + r = None
> + else:
> + if DEBUG: DBPRINT("re-using connection to %s (%d)" % (host, id(h)))
> +
> + return r
> +
> + def _start_transaction(self, h, req):
> + try:
> + if req.has_data():
> + data = req.get_data()
> + h.putrequest('POST', req.get_selector())
> + if not req.headers.has_key('Content-type'):
> + h.putheader('Content-type',
> + 'application/x-www-form-urlencoded')
> + if not req.headers.has_key('Content-length'):
> + h.putheader('Content-length', '%d' % len(data))
> + else:
> + h.putrequest('GET', req.get_selector())
> + except (socket.error, httplib.HTTPException), err:
> + raise urllib2.URLError(err)
> +
> + for args in self.parent.addheaders:
> + h.putheader(*args)
> + for k, v in req.headers.items():
> + h.putheader(k, v)
> + h.endheaders()
> + if req.has_data():
> + h.send(data)
> +
> +class HTTPResponse(httplib.HTTPResponse):
> + # we need to subclass HTTPResponse in order to
> + # 1) add readline() and readlines() methods
> + # 2) add close_connection() methods
> + # 3) add info() and geturl() methods
> +
> + # in order to add readline(), read must be modified to deal with a
> + # buffer. example: readline must read a buffer and then spit back
> + # one line at a time. The only real alternative is to read one
> + # BYTE at a time (ick). Once something has been read, it can't be
> + # put back (ok, maybe it can, but that's even uglier than this),
> + # so if you THEN do a normal read, you must first take stuff from
> + # the buffer.
> +
> +    # the read method wraps the original to accommodate buffering,
> + # although read() never adds to the buffer.
> + # Both readline and readlines have been stolen with almost no
> + # modification from socket.py
> +
> +
> + def __init__(self, sock, debuglevel=0, strict=0, method=None):
> +        if method: # the httplib in python 2.3 uses the method arg
> +            # pass method by keyword so it doesn't land in 2.3's strict slot
> +            httplib.HTTPResponse.__init__(self, sock, debuglevel, method=method)
> + else: # 2.2 doesn't
> + httplib.HTTPResponse.__init__(self, sock, debuglevel)
> + self.fileno = sock.fileno
> + self.code = None
> + self._rbuf = ''
> + self._rbufsize = 8096
> + self._handler = None # inserted by the handler later
> + self._host = None # (same)
> + self._url = None # (same)
> + self._connection = None # (same)
> +
> + _raw_read = httplib.HTTPResponse.read
> +
> + def close(self):
> + if self.fp:
> + self.fp.close()
> + self.fp = None
> + if self._handler:
> + self._handler._request_closed(self, self._host,
> + self._connection)
> +
> + def close_connection(self):
> + self._handler._remove_connection(self._host, self._connection, close=1)
> + self.close()
> +
> + def info(self):
> + return self.msg
> +
> + def geturl(self):
> + return self._url
> +
> + def read(self, amt=None):
> + # the _rbuf test is only in this first if for speed. It's not
> + # logically necessary
> +        if self._rbuf and amt is not None:
> + L = len(self._rbuf)
> + if amt > L:
> + amt -= L
> + else:
> + s = self._rbuf[:amt]
> + self._rbuf = self._rbuf[amt:]
> + return s
> +
> + s = self._rbuf + self._raw_read(amt)
> + self._rbuf = ''
> + return s
> +
> + def readline(self, limit=-1):
> + data = ""
> + i = self._rbuf.find('\n')
> + while i < 0 and not (0 < limit <= len(self._rbuf)):
> + new = self._raw_read(self._rbufsize)
> + if not new: break
> + i = new.find('\n')
> + if i >= 0: i = i + len(self._rbuf)
> + self._rbuf = self._rbuf + new
> + if i < 0: i = len(self._rbuf)
> + else: i = i+1
> + if 0 <= limit < len(self._rbuf): i = limit
> + data, self._rbuf = self._rbuf[:i], self._rbuf[i:]
> + return data
> +
> +    def readlines(self, sizehint=0):
> +        total = 0
> +        lines = []
> +        while 1:
> +            line = self.readline()
> +            if not line: break
> +            lines.append(line)
> +            total += len(line)
> +            if sizehint and total >= sizehint:
> +                break
> +        return lines
> +
> +
> +class HTTPConnection(httplib.HTTPConnection):
> + # use the modified response class
> + response_class = HTTPResponse
> +
> +#########################################################################
> +##### TEST FUNCTIONS
> +#########################################################################
> +
> +def error_handler(url):
> + global HANDLE_ERRORS
> + orig = HANDLE_ERRORS
> + keepalive_handler = HTTPHandler()
> + opener = urllib2.build_opener(keepalive_handler)
> + urllib2.install_opener(opener)
> + pos = {0: 'off', 1: 'on'}
> + for i in (0, 1):
> + print " fancy error handling %s (HANDLE_ERRORS = %i)" % (pos[i], i)
> + HANDLE_ERRORS = i
> + try:
> + fo = urllib2.urlopen(url)
> + foo = fo.read()
> + fo.close()
> + try: status, reason = fo.status, fo.reason
> + except AttributeError: status, reason = None, None
> + except IOError, e:
> + print " EXCEPTION: %s" % e
> + raise
> + else:
> + print " status = %s, reason = %s" % (status, reason)
> + HANDLE_ERRORS = orig
> + hosts = keepalive_handler.open_connections()
> + print "open connections:", hosts
> + keepalive_handler.close_all()
> +
> +def continuity(url):
> + import md5
> + format = '%25s: %s'
> +
> + # first fetch the file with the normal http handler
> + opener = urllib2.build_opener()
> + urllib2.install_opener(opener)
> + fo = urllib2.urlopen(url)
> + foo = fo.read()
> + fo.close()
> + m = md5.new(foo)
> + print format % ('normal urllib', m.hexdigest())
> +
> + # now install the keepalive handler and try again
> + opener = urllib2.build_opener(HTTPHandler())
> + urllib2.install_opener(opener)
> +
> + fo = urllib2.urlopen(url)
> + foo = fo.read()
> + fo.close()
> + m = md5.new(foo)
> + print format % ('keepalive read', m.hexdigest())
> +
> + fo = urllib2.urlopen(url)
> + foo = ''
> + while 1:
> + f = fo.readline()
> + if f: foo = foo + f
> + else: break
> + fo.close()
> + m = md5.new(foo)
> + print format % ('keepalive readline', m.hexdigest())
> +
> +def comp(N, url):
> + print ' making %i connections to:\n %s' % (N, url)
> +
> + sys.stdout.write(' first using the normal urllib handlers')
> + # first use normal opener
> + opener = urllib2.build_opener()
> + urllib2.install_opener(opener)
> + t1 = fetch(N, url)
> + print ' TIME: %.3f s' % t1
> +
> + sys.stdout.write(' now using the keepalive handler ')
> + # now install the keepalive handler and try again
> + opener = urllib2.build_opener(HTTPHandler())
> + urllib2.install_opener(opener)
> + t2 = fetch(N, url)
> + print ' TIME: %.3f s' % t2
> + print ' improvement factor: %.2f' % (t1/t2, )
> +
> +def fetch(N, url, delay=0):
> + lens = []
> + starttime = time.time()
> + for i in range(N):
> + if delay and i > 0: time.sleep(delay)
> + fo = urllib2.urlopen(url)
> + foo = fo.read()
> + fo.close()
> + lens.append(len(foo))
> + diff = time.time() - starttime
> +
> +    j = 0
> +    for i in lens[1:]:
> +        j = j + 1
> +        if i != lens[0]:
> +            print "WARNING: inconsistent length on read %i: %i" % (j, i)
> +
> + return diff
> +
> +def test_timeout(url):
> + global DEBUG, DBPRINT
> + dbp = DBPRINT
> + def DBPRINT(*args): print ' ' + ' '.join(args)
> + DEBUG=1
> + print " fetching the file to establish a connection"
> + fo = urllib2.urlopen(url)
> + data1 = fo.read()
> + fo.close()
> +
> + i = 20
> + print " waiting %i seconds for the server to close the connection" % i
> + while i > 0:
> + sys.stdout.write('\r %2i' % i)
> + sys.stdout.flush()
> + time.sleep(1)
> + i -= 1
> + sys.stderr.write('\r')
> +
> + print " fetching the file a second time"
> + fo = urllib2.urlopen(url)
> + data2 = fo.read()
> + fo.close()
> +
> + if data1 == data2:
> + print ' data are identical'
> + else:
> + print ' ERROR: DATA DIFFER'
> +
> + DEBUG=0
> + DBPRINT = dbp
> +
> +
> +def test(url, N=10):
> +    print "checking error handler (do this on a non-200)"
> + try: error_handler(url)
> + except IOError, e:
> + print "exiting - exception will prevent further tests"
> + sys.exit()
> + print
> + print "performing continuity test (making sure stuff isn't corrupted)"
> + continuity(url)
> + print
> + print "performing speed comparison"
> + comp(N, url)
> + print
> + print "performing dropped-connection check"
> + test_timeout(url)
> +
> +if __name__ == '__main__':
> + import time
> + import sys
> + try:
> + N = int(sys.argv[1])
> + url = sys.argv[2]
> + except:
> + print "%s <integer> <url>" % sys.argv[0]
> + else:
> + test(url, N)
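One thing I notice in the hg.py hunk, by the way: when a proxy host
is set, the ProxyHandler assignment replaces the keepalive handler
outright, so behind your work proxy the keepalive code would never
run. If I'm reading it right, you'd want to hand both handlers to
build_opener, something like (untested sketch, host/no_proxy as in
the surrounding hg.py code):

    import urllib2, keepalive
    handlers = [keepalive.HTTPHandler()]
    if host and not no_proxy:
        handlers.append(urllib2.ProxyHandler({"http": "http://" + host}))
    urllib2.install_opener(urllib2.build_opener(*handlers))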
--
Mathematics is the supreme nostalgia of our time.