D4474: wireprotov2peer: stream decoded responses
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Wed Sep 5 16:23:14 UTC 2018
indygreg updated this revision to Diff 10792.
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D4474?vs=10771&id=10792
REVISION DETAIL
https://phab.mercurial-scm.org/D4474
AFFECTED FILES
mercurial/debugcommands.py
mercurial/wireprotov2peer.py
tests/test-http-api-httpv2.t
tests/test-wireproto-command-capabilities.t
CHANGE DETAILS
diff --git a/tests/test-wireproto-command-capabilities.t b/tests/test-wireproto-command-capabilities.t
--- a/tests/test-wireproto-command-capabilities.t
+++ b/tests/test-wireproto-command-capabilities.t
@@ -349,10 +349,7 @@
s> 0\r\n
s> \r\n
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- response: [
- {
- b'status': b'ok'
- },
+ response: gen[
{
b'commands': {
b'branchmap': {
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -225,10 +225,7 @@
s> 0\r\n
s> \r\n
received frame(size=0; request=1; stream=2; streamflags=; type=command-response; flags=eos)
- response: [
- {
- b'status': b'ok'
- },
+ response: gen[
b'customreadonly bytes response'
]
diff --git a/mercurial/wireprotov2peer.py b/mercurial/wireprotov2peer.py
--- a/mercurial/wireprotov2peer.py
+++ b/mercurial/wireprotov2peer.py
@@ -7,11 +7,12 @@
from __future__ import absolute_import
+import threading
+
from .i18n import _
from . import (
encoding,
error,
- util,
wireprotoframing,
)
from .utils import (
@@ -34,20 +35,101 @@
return b''.join(chunks)
class commandresponse(object):
- """Represents the response to a command request."""
+ """Represents the response to a command request.
+
+ Instances track the state of the command and hold its results.
+
+ An external entity is required to update the state of the object when
+ events occur.
+ """
def __init__(self, requestid, command):
self.requestid = requestid
self.command = command
- self.b = util.bytesio()
+ # Whether all remote input related to this command has been
+ # received.
+ self._inputcomplete = False
+
+ # We have a lock that is acquired when important object state is
+ # mutated. This is to prevent race conditions between 1 thread
+ # sending us new data and another consuming it.
+ self._lock = threading.RLock()
+
+ # An event is set when state of the object changes. This event
+ # is waited on by the generator emitting objects.
+ self._serviceable = threading.Event()
+
+ self._pendingevents = []
+ self._decoder = cborutil.bufferingdecoder()
+ self._seeninitial = False
+
+ def _oninputcomplete(self):
+ with self._lock:
+ self._inputcomplete = True
+ self._serviceable.set()
+
+ def _onresponsedata(self, data):
+ available, readcount, wanted = self._decoder.decode(data)
+
+ if not available:
+ return
+
+ with self._lock:
+ for o in self._decoder.getavailable():
+ if not self._seeninitial:
+ self._handleinitial(o)
+ continue
+
+ self._pendingevents.append(o)
+
+ self._serviceable.set()
- def cborobjects(self):
- """Obtain decoded CBOR objects from this response."""
- self.b.seek(0)
+ def _handleinitial(self, o):
+ self._seeninitial = True
+ if o[b'status'] == 'ok':
+ return
+
+ atoms = [{'msg': o[b'error'][b'message']}]
+ if b'args' in o[b'error']:
+ atoms[0]['args'] = o[b'error'][b'args']
+
+ raise error.RepoError(formatrichmessage(atoms))
+
+ def objects(self):
+ """Obtain decoded objects from this response.
+
+ This is a generator of data structures that were decoded from the
+ command response.
+
+ Obtaining the next member of the generator may block due to waiting
+ on external data to become available.
- for v in cborutil.decodeall(self.b.getvalue()):
- yield v
+ If the server encountered an error in the middle of serving the data
+ or if another error occurred, an exception may be raised when
+ advancing the generator.
+ """
+ while True:
+ # TODO this can loop forever if self._inputcomplete is never
+ # set. We likely want to tie the lifetime of this object/state
+ # to that of the background thread receiving frames and updating
+ # our state.
+ self._serviceable.wait(1.0)
+
+ with self._lock:
+ self._serviceable.clear()
+
+ # Make copies because objects could be mutated during
+ # iteration.
+ stop = self._inputcomplete
+ pending = list(self._pendingevents)
+ self._pendingevents[:] = []
+
+ for o in pending:
+ yield o
+
+ if stop:
+ break
class clienthandler(object):
"""Object to handle higher-level client activities.
@@ -80,6 +162,8 @@
rid = request.requestid
self._requests[rid] = request
self._futures[rid] = f
+ # TODO we need some kind of lifetime on response instances otherwise
+ # objects() may deadlock.
self._responses[rid] = commandresponse(rid, command)
return iter(())
@@ -119,8 +203,12 @@
if action == 'error':
e = error.RepoError(meta['message'])
+ if frame.requestid in self._responses:
+ self._responses[frame.requestid]._oninputcomplete()
+
if frame.requestid in self._futures:
self._futures[frame.requestid].set_exception(e)
+ del self._futures[frame.requestid]
else:
raise e
@@ -141,39 +229,32 @@
self._processresponsedata(frame, meta, response)
except BaseException as e:
self._futures[frame.requestid].set_exception(e)
+ del self._futures[frame.requestid]
+ response._oninputcomplete()
else:
raise error.ProgrammingError(
'unhandled action from clientreactor: %s' % action)
def _processresponsedata(self, frame, meta, response):
- # This buffers all data until end of stream is received. This
- # is bad for performance.
- # TODO make response data streamable
- response.b.write(meta['data'])
+ # This can raise. The caller can handle it.
+ response._onresponsedata(meta['data'])
if meta['eos']:
- # If the command has a decoder, resolve the future to the
- # decoded value. Otherwise resolve to the rich response object.
- decoder = COMMAND_DECODERS.get(response.command)
-
- # TODO consider always resolving the overall status map.
- if decoder:
- objs = response.cborobjects()
-
- overall = next(objs)
+ response._oninputcomplete()
+ del self._requests[frame.requestid]
- if overall['status'] == 'ok':
- self._futures[frame.requestid].set_result(decoder(objs))
- else:
- atoms = [{'msg': overall['error']['message']}]
- if 'args' in overall['error']:
- atoms[0]['args'] = overall['error']['args']
- e = error.RepoError(formatrichmessage(atoms))
- self._futures[frame.requestid].set_exception(e)
- else:
- self._futures[frame.requestid].set_result(response)
+ # If the command has a decoder, we wait until all input has been
+ # received before resolving the future. Otherwise we resolve the
+ # future immediately.
+ if frame.requestid not in self._futures:
+ return
- del self._requests[frame.requestid]
+ if response.command not in COMMAND_DECODERS:
+ self._futures[frame.requestid].set_result(response.objects())
+ del self._futures[frame.requestid]
+ elif response._inputcomplete:
+ decoded = COMMAND_DECODERS[response.command](response.objects())
+ self._futures[frame.requestid].set_result(decoded)
del self._futures[frame.requestid]
def decodebranchmap(objs):
diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py
--- a/mercurial/debugcommands.py
+++ b/mercurial/debugcommands.py
@@ -3240,7 +3240,7 @@
res = e.callcommand(command, args).result()
if isinstance(res, wireprotov2peer.commandresponse):
- val = list(res.cborobjects())
+ val = res.objects()
ui.status(_('response: %s\n') %
stringutil.pprint(val, bprefix=True, indent=2))
else:
To: indygreg, #hg-reviewers
Cc: mjpieters, mercurial-devel
More information about the Mercurial-devel mailing list