D4916: wireprotov2: handle sender protocol settings frames
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Tue Oct 9 00:30:49 UTC 2018
indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.
REVISION SUMMARY
We teach the server reactor to handle the optional sender protocol
settings frames, which can only be sent at the beginning of a frame
exchange.
Right now, we simply decode the data and record the sender protocol
settings on the server reactor instance; we don't yet do anything
meaningful with the data.
REPOSITORY
rHG Mercurial
REVISION DETAIL
https://phab.mercurial-scm.org/D4916
AFFECTED FILES
mercurial/wireprotoframing.py
tests/test-wireproto-serverreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -9,6 +9,9 @@
util,
wireprotoframing as framing,
)
+from mercurial.utils import (
+ cborutil,
+)
ffs = framing.makeframefromhumanstring
@@ -193,7 +196,8 @@
ffs(b'1 1 stream-begin command-data 0 ignored'))
self.assertaction(result, b'error')
self.assertEqual(result[1], {
- b'message': b'expected command request frame; got 2',
+ b'message': b'expected sender protocol settings or command request '
+ b'frame; got 2',
})
def testunexpectedcommanddatareceiving(self):
@@ -494,6 +498,105 @@
results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
self.assertaction(results[0], b'runcommand')
+ def testprotocolsettingsnoflags(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame must have '
+ b'continuation or end of stream flag set',
+ })
+
+ def testprotocolsettingsconflictflags(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame cannot have both '
+ b'continuation and end of stream flags set',
+ })
+
+ def testprotocolsettingsemptypayload(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame did not contain CBOR '
+ b'data',
+ })
+
+ def testprotocolsettingsmultipleobjects(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '
+ b'\x46foobar\x43foo'))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'sender protocol settings frame contained multiple '
+ b'CBOR values',
+ })
+
+ def testprotocolsettingscontentencodings(self):
+ reactor = makereactor()
+
+ result = self._sendsingleframe(
+ reactor,
+ ffs(b'0 1 stream-begin sender-protocol-settings eos '
+ b'cbor:{b"contentencodings": [b"a", b"b"]}'))
+ self.assertaction(result, b'wantframe')
+
+ self.assertEqual(reactor._state, b'idle')
+ self.assertEqual(reactor._sendersettings[b'contentencodings'],
+ [b'a', b'b'])
+
+ def testprotocolsettingsmultipleframes(self):
+ reactor = makereactor()
+
+ data = b''.join(cborutil.streamencode({
+ b'contentencodings': [b'value1', b'value2'],
+ }))
+
+ results = list(sendframes(reactor, [
+ ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
+ data[0:5]),
+ ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
+ ]))
+
+ self.assertEqual(len(results), 2)
+
+ self.assertaction(results[0], b'wantframe')
+ self.assertaction(results[1], b'wantframe')
+
+ self.assertEqual(reactor._state, b'idle')
+ self.assertEqual(reactor._sendersettings[b'contentencodings'],
+ [b'value1', b'value2'])
+
+ def testprotocolsettingsbadcbor(self):
+ result = self._sendsingleframe(
+ makereactor(),
+ ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
+ self.assertaction(result, b'error')
+
+ def testprotocolsettingsnoninitial(self):
+ # Cannot have protocol settings frames as non-initial frames.
+ reactor = makereactor()
+
+ stream = framing.stream(1)
+ results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
+ self.assertEqual(len(results), 1)
+ self.assertaction(results[0], b'runcommand')
+
+ result = self._sendsingleframe(
+ reactor,
+ ffs(b'0 1 0 sender-protocol-settings eos '))
+ self.assertaction(result, b'error')
+ self.assertEqual(result[1], {
+ b'message': b'expected command request frame; got 8',
+ })
+
if __name__ == '__main__':
import silenttestrunner
silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -674,6 +674,10 @@
'numbered streams; %d is not even' %
stream.streamid)
+DEFAULT_PROTOCOL_SETTINGS = {
+ 'contentencodings': [b'identity'],
+}
+
class serverreactor(object):
"""Holds state of a server handling frame-based protocol requests.
@@ -750,7 +754,7 @@
sender cannot receive until all data has been transmitted.
"""
self._deferoutput = deferoutput
- self._state = 'idle'
+ self._state = 'initial'
self._nextoutgoingstreamid = 2
self._bufferedframegens = []
# stream id -> stream instance for all active streams from the client.
@@ -763,6 +767,11 @@
# set.
self._activecommands = set()
+ self._protocolsettingsdecoder = None
+
+ # Sender protocol settings are optional. Set implied default values.
+ self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
+
def onframerecv(self, frame):
"""Process a frame that has been received off the wire.
@@ -794,6 +803,8 @@
del self._incomingstreams[frame.streamid]
handlers = {
+ 'initial': self._onframeinitial,
+ 'protocol-settings-receiving': self._onframeprotocolsettings,
'idle': self._onframeidle,
'command-receiving': self._onframecommandreceiving,
'errored': self._onframeerrored,
@@ -1062,6 +1073,85 @@
_('received command request frame with neither new nor '
'continuation flags set'))
+ def _onframeinitial(self, frame):
+ # Called when we receive a frame when in the "initial" state.
+ if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+ self._state = 'protocol-settings-receiving'
+ self._protocolsettingsdecoder = cborutil.bufferingdecoder()
+ return self._onframeprotocolsettings(frame)
+
+ elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+ self._state = 'idle'
+ return self._onframeidle(frame)
+
+ else:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('expected sender protocol settings or command request '
+ 'frame; got %d') % frame.typeid)
+
+ def _onframeprotocolsettings(self, frame):
+ assert self._state == 'protocol-settings-receiving'
+ assert self._protocolsettingsdecoder is not None
+
+ if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('expected sender protocol settings frame; got %d') %
+ frame.typeid)
+
+ more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
+ eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
+
+ if more and eos:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame cannot have both '
+ 'continuation and end of stream flags set'))
+
+ if not more and not eos:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame must have continuation or '
+ 'end of stream flag set'))
+
+ # TODO establish limits for maximum amount of data that can be
+ # buffered.
+ try:
+ self._protocolsettingsdecoder.decode(frame.payload)
+ except Exception as e:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('error decoding CBOR from sender protocol settings frame: %s')
+ % stringutil.forcebytestr(e))
+
+ if more:
+ return self._makewantframeresult()
+
+ assert eos
+
+ decoded = self._protocolsettingsdecoder.getavailable()
+ self._protocolsettingsdecoder = None
+
+ if not decoded:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame did not contain CBOR data'))
+ elif len(decoded) > 1:
+ self._state = 'errored'
+ return self._makeerrorresult(
+ _('sender protocol settings frame contained multiple CBOR '
+ 'values'))
+
+ d = decoded[0]
+
+ if b'contentencodings' in d:
+ self._sendersettings['contentencodings'] = d[b'contentencodings']
+
+ self._state = 'idle'
+
+ return self._makewantframeresult()
+
def _onframeidle(self, frame):
# The only frame type that should be received in this state is a
# command request.
To: indygreg, #hg-reviewers
Cc: mercurial-devel
More information about the Mercurial-devel mailing list