D2947: wireproto: explicit API to create outgoing streams
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Mon Mar 26 21:35:50 UTC 2018
indygreg updated this revision to Diff 7326.
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D2947?vs=7309&id=7326
REVISION DETAIL
https://phab.mercurial-scm.org/D2947
AFFECTED FILES
mercurial/wireprotoframing.py
mercurial/wireprotoserver.py
tests/test-http-api-httpv2.t
tests/test-wireproto-serverreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -375,7 +375,7 @@
"""Multiple fully serviced commands with same request ID is allowed."""
reactor = makereactor()
results = []
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
results.append(self._sendsingleframe(
reactor, ffs(b'1 1 stream-begin command-name eos command')))
result = reactor.onbytesresponseready(outstream, 1, b'response1')
@@ -530,7 +530,7 @@
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
result = reactor.onbytesresponseready(outstream, 1, b'response')
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
@@ -546,7 +546,7 @@
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
result = reactor.onbytesresponseready(outstream, 1, first + second)
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
@@ -559,7 +559,7 @@
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
result = reactor.onapplicationerror(outstream, 1, b'some message')
self.assertaction(result, 'sendframes')
self.assertframesequal(result[1]['framegen'], [
@@ -575,7 +575,7 @@
self.assertEqual(len(results), 1)
self.assertaction(results[0], 'runcommand')
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
result = reactor.onbytesresponseready(outstream, 1, b'response')
self.assertaction(result, 'noop')
result = reactor.oninputeof()
@@ -590,7 +590,7 @@
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
list(sendcommandframes(reactor, instream, 3, b'command2', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
result = reactor.onbytesresponseready(outstream, 1, b'response1')
self.assertaction(result, 'noop')
result = reactor.onbytesresponseready(outstream, 3, b'response2')
@@ -610,7 +610,7 @@
list(sendcommandframes(reactor, instream, 5, b'command3', {}))
# Register results for commands out of order.
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
reactor.onbytesresponseready(outstream, 3, b'response3')
reactor.onbytesresponseready(outstream, 1, b'response1')
reactor.onbytesresponseready(outstream, 5, b'response5')
@@ -640,7 +640,7 @@
reactor = makereactor()
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
reactor.onbytesresponseready(outstream, 1, b'response')
# We've registered the response but haven't sent it. From the
@@ -672,7 +672,7 @@
reactor = makereactor()
instream = framing.stream(1)
list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = framing.stream(2)
+ outstream = reactor.makeoutputstream()
res = reactor.onbytesresponseready(outstream, 1, b'response')
list(res[1]['framegen'])
diff --git a/tests/test-http-api-httpv2.t b/tests/test-http-api-httpv2.t
--- a/tests/test-http-api-httpv2.t
+++ b/tests/test-http-api-httpv2.t
@@ -472,7 +472,7 @@
s> \x1d\x00\x00\x01\x00\x02\x01Bcustomreadonly bytes response
s> \r\n
s> 25\r\n
- s> \x1d\x00\x00\x03\x00\x02\x01Bcustomreadonly bytes response
+ s> \x1d\x00\x00\x03\x00\x02\x00Bcustomreadonly bytes response
s> \r\n
s> 0\r\n
s> \r\n
@@ -511,7 +511,7 @@
s> \x00\x00\x00\x03\x00\x02\x01B
s> \r\n
s> 26\r\n
- s> \x1e\x00\x00\x01\x00\x02\x01Bbookmarks \n
+ s> \x1e\x00\x00\x01\x00\x02\x00Bbookmarks \n
s> namespaces \n
s> phases
s> \r\n
diff --git a/mercurial/wireprotoserver.py b/mercurial/wireprotoserver.py
--- a/mercurial/wireprotoserver.py
+++ b/mercurial/wireprotoserver.py
@@ -432,6 +432,8 @@
reactor = wireprotoframing.serverreactor(deferoutput=True)
seencommand = False
+ outstream = reactor.makeoutputstream()
+
while True:
frame = wireprotoframing.readframe(req.bodyfh)
if not frame:
@@ -444,8 +446,8 @@
continue
elif action == 'runcommand':
sentoutput = _httpv2runcommand(ui, repo, req, res, authedperm,
- reqcommand, reactor, meta,
- issubsequent=seencommand)
+ reqcommand, reactor, outstream,
+ meta, issubsequent=seencommand)
if sentoutput:
return
@@ -476,7 +478,7 @@
% action)
def _httpv2runcommand(ui, repo, req, res, authedperm, reqcommand, reactor,
- command, issubsequent):
+ outstream, command, issubsequent):
"""Dispatch a wire protocol command made from HTTPv2 requests.
The authenticated permission (``authedperm``) along with the original
@@ -546,10 +548,9 @@
res.status = b'200 OK'
res.headers[b'Content-Type'] = FRAMINGTYPE
- stream = wireprotoframing.stream(2)
if isinstance(rsp, wireprototypes.bytesresponse):
- action, meta = reactor.onbytesresponseready(stream,
+ action, meta = reactor.onbytesresponseready(outstream,
command['requestid'],
rsp.data)
else:
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -533,9 +533,11 @@
"""
self._deferoutput = deferoutput
self._state = 'idle'
+ self._nextoutgoingstreamid = 2
self._bufferedframegens = []
# stream id -> stream instance for all active streams from the client.
self._incomingstreams = {}
+ self._outgoingstreams = {}
# request id -> dict of commands that are actively being received.
self._receivingcommands = {}
# Request IDs that have been received and are actively being processed.
@@ -638,6 +640,16 @@
application=True),
}
+ def makeoutputstream(self):
+ """Create a stream to be used for sending data to the client."""
+ streamid = self._nextoutgoingstreamid
+ self._nextoutgoingstreamid += 2
+
+ s = stream(streamid)
+ self._outgoingstreams[streamid] = s
+
+ return s
+
def _makeerrorresult(self, msg):
return 'error', {
'message': msg,
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel mailing list