D4916: wireprotov2: handle sender protocol settings frames

indygreg (Gregory Szorc) phabricator at mercurial-scm.org
Tue Oct 9 00:30:49 UTC 2018


indygreg created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  We teach the server reactor to handle the optional sender protocol
  settings frames, which can only be sent at the beginning of frame
  exchange.
  
  Right now, we simply decode the data and record the sender protocol
  settings on the server reactor instance: we don't yet do anything
  meaningful with the data.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D4916

AFFECTED FILES
  mercurial/wireprotoframing.py
  tests/test-wireproto-serverreactor.py

CHANGE DETAILS

diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
--- a/tests/test-wireproto-serverreactor.py
+++ b/tests/test-wireproto-serverreactor.py
@@ -9,6 +9,9 @@
     util,
     wireprotoframing as framing,
 )
+from mercurial.utils import (
+    cborutil,
+)
 
 ffs = framing.makeframefromhumanstring
 
@@ -193,7 +196,8 @@
             ffs(b'1 1 stream-begin command-data 0 ignored'))
         self.assertaction(result, b'error')
         self.assertEqual(result[1], {
-            b'message': b'expected command request frame; got 2',
+            b'message': b'expected sender protocol settings or command request '
+                        b'frame; got 2',
         })
 
     def testunexpectedcommanddatareceiving(self):
@@ -494,6 +498,105 @@
         results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
         self.assertaction(results[0], b'runcommand')
 
+    def testprotocolsettingsnoflags(self):  # frame flags 0 -> rejected
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings 0 '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame must have '
+                        b'continuation or end of stream flag set',
+        })
+
+    def testprotocolsettingsconflictflags(self):  # both flags -> rejected
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame cannot have both '
+                        b'continuation and end of stream flags set',
+        })
+
+    def testprotocolsettingsemptypayload(self):  # eos, no payload -> error
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame did not contain CBOR '
+                        b'data',
+        })
+
+    def testprotocolsettingsmultipleobjects(self):  # 2 CBOR values -> error
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '
+                b'\x46foobar\x43foo'))  # CBOR bytestrings "foobar", "foo"
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'sender protocol settings frame contained multiple '
+                        b'CBOR values',
+        })
+
+    def testprotocolsettingscontentencodings(self):  # one-frame success
+        reactor = makereactor()
+
+        result = self._sendsingleframe(
+            reactor,
+            ffs(b'0 1 stream-begin sender-protocol-settings eos '
+                b'cbor:{b"contentencodings": [b"a", b"b"]}'))
+        self.assertaction(result, b'wantframe')
+
+        self.assertEqual(reactor._state, b'idle')  # ready for commands
+        self.assertEqual(reactor._sendersettings[b'contentencodings'],
+                         [b'a', b'b'])
+
+    def testprotocolsettingsmultipleframes(self):  # payload over 2 frames
+        reactor = makereactor()
+
+        data = b''.join(cborutil.streamencode({
+            b'contentencodings': [b'value1', b'value2'],
+        }))
+
+        results = list(sendframes(reactor, [
+            ffs(b'0 1 stream-begin sender-protocol-settings continuation %s' %
+                data[0:5]),  # cut mid-value: decoder must buffer
+            ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
+        ]))
+
+        self.assertEqual(len(results), 2)
+
+        self.assertaction(results[0], b'wantframe')
+        self.assertaction(results[1], b'wantframe')
+
+        self.assertEqual(reactor._state, b'idle')
+        self.assertEqual(reactor._sendersettings[b'contentencodings'],
+                         [b'value1', b'value2'])
+
+    def testprotocolsettingsbadcbor(self):  # undecodable payload -> error
+        result = self._sendsingleframe(
+            makereactor(),
+            ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'))
+        self.assertaction(result, b'error')
+
+    def testprotocolsettingsnoninitial(self):
+        # A settings frame is only accepted as the first frame; once a
+        reactor = makereactor()  # command has been seen it is an error.
+
+        stream = framing.stream(1)
+        results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
+        self.assertEqual(len(results), 1)
+        self.assertaction(results[0], b'runcommand')
+
+        result = self._sendsingleframe(
+            reactor,
+            ffs(b'0 1 0 sender-protocol-settings eos '))
+        self.assertaction(result, b'error')
+        self.assertEqual(result[1], {
+            b'message': b'expected command request frame; got 8',
+        })  # 8 is the type id of the settings frame just sent
+
 if __name__ == '__main__':
     import silenttestrunner
     silenttestrunner.main(__name__)
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
--- a/mercurial/wireprotoframing.py
+++ b/mercurial/wireprotoframing.py
@@ -674,6 +674,10 @@
                                      'numbered streams; %d is not even' %
                                      stream.streamid)
 
+DEFAULT_PROTOCOL_SETTINGS = {  # defaults for settings the sender omits
+    'contentencodings': [b'identity'],
+}
+
 class serverreactor(object):
     """Holds state of a server handling frame-based protocol requests.
 
@@ -750,7 +754,7 @@
         sender cannot receive until all data has been transmitted.
         """
         self._deferoutput = deferoutput
-        self._state = 'idle'
+        self._state = 'initial'
         self._nextoutgoingstreamid = 2
         self._bufferedframegens = []
         # stream id -> stream instance for all active streams from the client.
@@ -763,6 +767,11 @@
         # set.
         self._activecommands = set()
 
+        self._protocolsettingsdecoder = None
+
+        # Sender protocol settings are optional. Set implied default values.
+        self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
+
     def onframerecv(self, frame):
         """Process a frame that has been received off the wire.
 
@@ -794,6 +803,8 @@
             del self._incomingstreams[frame.streamid]
 
         handlers = {
+            'initial': self._onframeinitial,
+            'protocol-settings-receiving': self._onframeprotocolsettings,
             'idle': self._onframeidle,
             'command-receiving': self._onframecommandreceiving,
             'errored': self._onframeerrored,
@@ -1062,6 +1073,109 @@
                 _('received command request frame with neither new nor '
                   'continuation flags set'))
 
+    def _onframeinitial(self, frame):
+        """Handle the first frame received by this reactor.
+
+        A sender protocol settings frame starts settings reception. A
+        command request frame moves the reactor to the idle state and is
+        processed by the idle handler. Any other frame type is an error.
+        """
+        if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+            self._state = 'protocol-settings-receiving'
+            self._protocolsettingsdecoder = cborutil.bufferingdecoder()
+            return self._onframeprotocolsettings(frame)
+
+        elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
+            # Settings frames are optional: keep the implied defaults.
+            self._state = 'idle'
+            return self._onframeidle(frame)
+
+        else:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected sender protocol settings or command request '
+                  'frame; got %d') % frame.typeid)
+
+    def _onframeprotocolsettings(self, frame):
+        """Handle a frame while sender protocol settings are being received.
+
+        The payload of each frame is fed to a buffering CBOR decoder. Once
+        the frame carrying the end of stream flag arrives, the payload must
+        have decoded to exactly one CBOR map, whose recognized keys are
+        recorded in ``self._sendersettings``.
+
+        Returns a "want frame" result on success and an error result (with
+        the reactor moved to the errored state) otherwise.
+        """
+        assert self._state == 'protocol-settings-receiving'
+        assert self._protocolsettingsdecoder is not None
+
+        if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('expected sender protocol settings frame; got %d') %
+                frame.typeid)
+
+        more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
+        eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
+
+        if more and eos:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame cannot have both '
+                  'continuation and end of stream flags set'))
+
+        if not more and not eos:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame must have continuation or '
+                  'end of stream flag set'))
+
+        # TODO establish limits for maximum amount of data that can be
+        # buffered.
+        try:
+            self._protocolsettingsdecoder.decode(frame.payload)
+        except Exception as e:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('error decoding CBOR from sender protocol settings frame: %s')
+                % stringutil.forcebytestr(e))
+
+        if more:
+            return self._makewantframeresult()
+
+        assert eos
+
+        decoded = self._protocolsettingsdecoder.getavailable()
+        self._protocolsettingsdecoder = None
+
+        if not decoded:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame did not contain CBOR data'))
+        elif len(decoded) > 1:
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame contained multiple CBOR '
+                  'values'))
+
+        d = decoded[0]
+
+        # The payload comes from the peer and may be any CBOR value. Reject
+        # non-maps here instead of raising from the membership test below.
+        if not isinstance(d, dict):
+            self._state = 'errored'
+            return self._makeerrorresult(
+                _('sender protocol settings frame did not contain a CBOR '
+                  'map'))
+
+        if b'contentencodings' in d:
+            self._sendersettings['contentencodings'] = d[b'contentencodings']
+
+        self._state = 'idle'
+
+        return self._makewantframeresult()
+
     def _onframeidle(self, frame):
         # The only frame type that should be received in this state is a
         # command request.



To: indygreg, #hg-reviewers
Cc: mercurial-devel


More information about the Mercurial-devel mailing list