D3379: wireprotov2: move response handling out of httppeer

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Sun Apr 15 00:22:37 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  And fix some bugs while we're here.
  
  The code for processing response data from the unified framing
  protocol is mostly peer agnostic. The peer-specific bits are the
  configuration of the client reactor and how I/O is performed. I
  initially implemented things in httppeer for expediency.
  
  This commit establishes a module for holding the peer API level
  code for the framing based protocol. Inside this module we have
  a class to help coordinate higher-level activities, such as managing
  response object.
  
  The client handler bits could be rolled into clientreactor. However,
  I want clientreactor to be sans I/O and I want it to only be
  concerned with protocol-level details, not higher-level concepts
  like how protocol events are converted into peer API concepts. I
  want clientreactor to receive a frame and then tell the caller what
  should probably be done about it. If we start putting things like
  future resolution into clientreactor, we'll constrain how the protocol
  can be used (e.g. by requiring futures).
  
  The new code is loosely based on what was in httppeer before. I
  changed things a bit around response handling. We now buffer the
  entire response "body" and then handle it as one atomic unit. This
  fixed a bug around decoding CBOR data that spanned multiple frames.
  I also fixed an off-by-one bug where we failed to read a single byte
  CBOR value at the end of the stream. That's why tests have changed.
  
  The new state of httppeer is much cleaner. It is largely agnostic
  about framing protocol implementation details. That's how it should
  be: the framing protocol is designed to be largely transport
  agnostic. We want peers merely putting bytes on the wire and telling
  the framing protocol where to read response data from.
  
  There's still a bit of work to be done here, especially for
  representing responses. But at least we're a step closer to having a
  higher-level peer interface that can be plugged into the SSH peer
  someday.
  
  I initially added this class to wireprotoframing. However, we'll
  eventually need version 2 specific functions to convert CBOR responses
  into data structures expected by the code calling commands. This
  needs to live somewhere. Since that code would be shared across peers,
  we need a common module. We have wireprotov1peer for the equivalent
  version 1 code. So I decided to establish wireprotov2peer.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3379

AFFECTED FILES
  mercurial/httppeer.py
  mercurial/wireprotov2peer.py
  tests/test-wireproto-command-known.t
  tests/test-wireproto-command-pushkey.t

CHANGE DETAILS

diff --git a/tests/test-wireproto-command-pushkey.t b/tests/test-wireproto-command-pushkey.t
--- a/tests/test-wireproto-command-pushkey.t
+++ b/tests/test-wireproto-command-pushkey.t
@@ -53,7 +53,7 @@
   received frame(size=*; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor) (glob)
   s>     0\r\n
   s>     \r\n
-  response: []
+  response: [True]
 
   $ sendhttpv2peer << EOF
   > command listkeys
diff --git a/tests/test-wireproto-command-known.t b/tests/test-wireproto-command-known.t
--- a/tests/test-wireproto-command-known.t
+++ b/tests/test-wireproto-command-known.t
@@ -50,7 +50,7 @@
   received frame(size=1; request=1; stream=2; streamflags=stream-begin; type=bytes-response; flags=eos|cbor)
   s>     0\r\n
   s>     \r\n
-  response: []
+  response: [b'']
 
 Single known node works
 
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
new file mode 100644
--- /dev/null
+++ b/mercurial/wireprotov2peer.py
@@ -0,0 +1,135 @@
+# wireprotov2peer.py - client side code for wire protocol version 2
+#
+# Copyright 2018 Gregory Szorc <gregory.szorc at gmail.com>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+from __future__ import absolute_import
+
+from .i18n import _
+from .thirdparty import (
+    cbor,
+)
+from . import (
+    error,
+    util,
+    wireprotoframing,
+)
+
+class clienthandler(object):
+    """Object to handle higher-level client activities.
+
+    The ``clientreactor`` is used to hold low-level state about the frame-based
+    protocol, such as which requests and streams are active. This type is used
+    for higher-level operations, such as reading frames from a socket, exposing
+    and managing a higher-level primitive for representing command responses,
+    etc. This class is what peers should probably use to bridge wire activity
+    with the higher-level peer API.
+    """
+
+    def __init__(self, ui, clientreactor):
+        self._ui = ui
+        self._reactor = clientreactor
+        self._requests = {}
+        self._futures = {}
+        self._responses = {}
+
+    def callcommand(self, command, args, f):
+        """Register a request to call a command.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        request, action, meta = self._reactor.callcommand(command, args)
+
+        if action != 'noop':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        rid = request.requestid
+        self._requests[rid] = request
+        self._futures[rid] = f
+        self._responses[rid] = {
+            'cbor': False,
+            'b': util.bytesio(),
+        }
+
+        return iter(())
+
+    def flushcommands(self):
+        """Flush all queued commands.
+
+        Returns an iterable of frames that should be sent over the wire.
+        """
+        action, meta = self._reactor.flushcommands()
+
+        if action != 'sendframes':
+            raise error.ProgrammingError('%s not yet supported' % action)
+
+        return meta['framegen']
+
+    def readframe(self, fh):
+        """Attempt to read and process a frame.
+
+        Returns None if no frame was read. Presumably this means EOF.
+        """
+        frame = wireprotoframing.readframe(fh)
+        if frame is None:
+            # TODO tell reactor?
+            return
+
+        self._ui.note(_('received %r\n') % frame)
+        self._processframe(frame)
+
+        return True
+
+    def _processframe(self, frame):
+        """Process a single read frame."""
+
+        action, meta = self._reactor.onframerecv(frame)
+
+        if action == 'error':
+            e = error.RepoError(meta['message'])
+
+            if frame.requestid in self._futures:
+                self._futures[frame.requestid].set_exception(e)
+            else:
+                raise e
+
+        if frame.requestid not in self._requests:
+            raise error.ProgrammingError(
+                'received frame for unknown request; this is either a bug in '
+                'the clientreactor not screening for this or this instance was '
+                'never told about this request: %r' % frame)
+
+        response = self._responses[frame.requestid]
+
+        if action == 'responsedata':
+            response['b'].write(meta['data'])
+
+            if meta['cbor']:
+                response['cbor'] = True
+
+            if meta['eos']:
+                if meta['cbor']:
+                    # If CBOR, decode every object.
+                    b = response['b']
+
+                    size = b.tell()
+                    b.seek(0)
+
+                    decoder = cbor.CBORDecoder(b)
+
+                    result = []
+                    while b.tell() < size:
+                        result.append(decoder.decode())
+                else:
+                    result = [response['b'].getvalue()]
+
+                self._futures[frame.requestid].set_result(result)
+
+                del self._requests[frame.requestid]
+                del self._futures[frame.requestid]
+
+        else:
+            raise error.ProgrammingError(
+                'unhandled action from clientreactor: %s' % action)
diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -13,7 +13,6 @@
 import os
 import socket
 import struct
-import sys
 import tempfile
 import weakref
 
@@ -36,6 +35,7 @@
     wireprotoframing,
     wireprototypes,
     wireprotov1peer,
+    wireprotov2peer,
     wireprotov2server,
 )
 
@@ -522,27 +522,20 @@
     reactor = wireprotoframing.clientreactor(hasmultiplesend=False,
                                              buffersends=True)
 
+    handler = wireprotov2peer.clienthandler(ui, reactor)
+
     url = '%s/%s' % (apiurl, permission)
 
     if len(requests) > 1:
         url += '/multirequest'
     else:
         url += '/%s' % requests[0][0]
 
-    # Request ID to (request, future)
-    requestmap = {}
-
     for command, args, f in requests:
-        request, action, meta = reactor.callcommand(command, args)
-        assert action == 'noop'
-
-        requestmap[request.requestid] = (request, f)
-
-    action, meta = reactor.flushcommands()
-    assert action == 'sendframes'
+        assert not list(handler.callcommand(command, args, f))
 
     # TODO stream this.
-    body = b''.join(map(bytes, meta['framegen']))
+    body = b''.join(map(bytes, handler.flushcommands()))
 
     # TODO modify user-agent to reflect v2
     headers = {
@@ -564,7 +557,7 @@
         ui.traceback()
         raise IOError(None, e)
 
-    return reactor, requestmap, res
+    return handler, res
 
 class queuedcommandfuture(pycompat.futures.Future):
     """Wraps result() on command futures to trigger submission on call."""
@@ -684,17 +677,15 @@
             'pull': 'ro',
         }[permissions.pop()]
 
-        reactor, requests, resp = sendv2request(
+        handler, resp = sendv2request(
             self._ui, self._opener, self._requestbuilder, self._apiurl,
             permission, calls)
 
         # TODO we probably want to validate the HTTP code, media type, etc.
 
         self._responseexecutor = pycompat.futures.ThreadPoolExecutor(1)
         self._responsef = self._responseexecutor.submit(self._handleresponse,
-                                                        reactor,
-                                                        requests,
-                                                        resp)
+                                                        handler, resp)
 
     def close(self):
         if self._closed:
@@ -723,62 +714,11 @@
 
             self._futures = None
 
-    def _handleresponse(self, reactor, requests, resp):
+    def _handleresponse(self, handler, resp):
         # Called in a thread to read the response.
 
-        results = {k: [] for k in requests}
-
-        while True:
-            frame = wireprotoframing.readframe(resp)
-            if frame is None:
-                break
-
-            self._ui.note(_('received %r\n') % frame)
-
-            # Guard against receiving a frame with a request ID that we
-            # didn't issue. This should never happen.
-            request, f = requests.get(frame.requestid, [None, None])
-
-            action, meta = reactor.onframerecv(frame)
-
-            if action == 'responsedata':
-                assert request.requestid == meta['request'].requestid
-
-                result = results[request.requestid]
-
-                if meta['cbor']:
-                    payload = util.bytesio(meta['data'])
-
-                    decoder = cbor.CBORDecoder(payload)
-                    while payload.tell() + 1 < len(meta['data']):
-                        try:
-                            result.append(decoder.decode())
-                        except Exception:
-                            pycompat.future_set_exception_info(
-                                f, sys.exc_info()[1:])
-                            continue
-                else:
-                    result.append(meta['data'])
-
-                if meta['eos']:
-                    f.set_result(result)
-                    del results[request.requestid]
-
-            elif action == 'error':
-                e = error.RepoError(meta['message'])
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
-
-            else:
-                e = error.ProgrammingError('unhandled action: %s' % action)
-
-                if f:
-                    f.set_exception(e)
-                else:
-                    raise e
+        while handler.readframe(resp):
+            pass
 
 # TODO implement interface for version 2 peers
 @zi.implementer(repository.ipeerconnection, repository.ipeercapabilities,



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list