D12370: wireprotov2: delete remnants of wireprotov2
indygreg (Gregory Szorc)
phabricator at mercurial-scm.org
Thu Mar 10 01:08:50 UTC 2022
indygreg created this revision.
Herald added a reviewer: hg-reviewers.
Herald added a subscriber: mercurial-patches.
REVISION SUMMARY
These were missed when wireprotov2 was deleted a few months ago.
REPOSITORY
rHG Mercurial
BRANCH
default
REVISION DETAIL
https://phab.mercurial-scm.org/D12370
AFFECTED FILES
mercurial/debugcommands.py
mercurial/wireprotoframing.py
tests/test-check-pytype.t
tests/test-wireproto-clientreactor.py
tests/test-wireproto-framing.py
tests/test-wireproto-serverreactor.py
CHANGE DETAILS
diff --git a/tests/test-wireproto-serverreactor.py b/tests/test-wireproto-serverreactor.py
deleted file mode 100644
--- a/tests/test-wireproto-serverreactor.py
+++ /dev/null
@@ -1,845 +0,0 @@
-import unittest
-
-from mercurial import (
- ui as uimod,
- util,
- wireprotoframing as framing,
-)
-from mercurial.utils import cborutil
-
-ffs = framing.makeframefromhumanstring
-
-OK = b''.join(cborutil.streamencode({b'status': b'ok'}))
-
-
-def makereactor(deferoutput=False):
- ui = uimod.ui()
- return framing.serverreactor(ui, deferoutput=deferoutput)
-
-
-def sendframes(reactor, gen):
- """Send a generator of frame bytearray to a reactor.
-
- Emits a generator of results from ``onframerecv()`` calls.
- """
- for frame in gen:
- header = framing.parseheader(frame)
- payload = frame[framing.FRAME_HEADER_SIZE :]
- assert len(payload) == header.length
-
- yield reactor.onframerecv(
- framing.frame(
- header.requestid,
- header.streamid,
- header.streamflags,
- header.typeid,
- header.flags,
- payload,
- )
- )
-
-
-def sendcommandframes(reactor, stream, rid, cmd, args, datafh=None):
- """Generate frames to run a command and send them to a reactor."""
- return sendframes(
- reactor, framing.createcommandframes(stream, rid, cmd, args, datafh)
- )
-
-
-class ServerReactorTests(unittest.TestCase):
- def _sendsingleframe(self, reactor, f):
- results = list(sendframes(reactor, [f]))
- self.assertEqual(len(results), 1)
-
- return results[0]
-
- def assertaction(self, res, expected):
- self.assertIsInstance(res, tuple)
- self.assertEqual(len(res), 2)
- self.assertIsInstance(res[1], dict)
- self.assertEqual(res[0], expected)
-
- def assertframesequal(self, frames, framestrings):
- expected = [ffs(s) for s in framestrings]
- self.assertEqual(list(frames), expected)
-
- def test1framecommand(self):
- """Receiving a command in a single frame yields request to run it."""
- reactor = makereactor()
- stream = framing.stream(1)
- results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
- self.assertEqual(len(results), 1)
- self.assertaction(results[0], b'runcommand')
- self.assertEqual(
- results[0][1],
- {
- b'requestid': 1,
- b'command': b'mycommand',
- b'args': {},
- b'redirect': None,
- b'data': None,
- },
- )
-
- result = reactor.oninputeof()
- self.assertaction(result, b'noop')
-
- def test1argument(self):
- reactor = makereactor()
- stream = framing.stream(1)
- results = list(
- sendcommandframes(
- reactor, stream, 41, b'mycommand', {b'foo': b'bar'}
- )
- )
- self.assertEqual(len(results), 1)
- self.assertaction(results[0], b'runcommand')
- self.assertEqual(
- results[0][1],
- {
- b'requestid': 41,
- b'command': b'mycommand',
- b'args': {b'foo': b'bar'},
- b'redirect': None,
- b'data': None,
- },
- )
-
- def testmultiarguments(self):
- reactor = makereactor()
- stream = framing.stream(1)
- results = list(
- sendcommandframes(
- reactor,
- stream,
- 1,
- b'mycommand',
- {b'foo': b'bar', b'biz': b'baz'},
- )
- )
- self.assertEqual(len(results), 1)
- self.assertaction(results[0], b'runcommand')
- self.assertEqual(
- results[0][1],
- {
- b'requestid': 1,
- b'command': b'mycommand',
- b'args': {b'foo': b'bar', b'biz': b'baz'},
- b'redirect': None,
- b'data': None,
- },
- )
-
- def testsimplecommanddata(self):
- reactor = makereactor()
- stream = framing.stream(1)
- results = list(
- sendcommandframes(
- reactor, stream, 1, b'mycommand', {}, util.bytesio(b'data!')
- )
- )
- self.assertEqual(len(results), 2)
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'runcommand')
- self.assertEqual(
- results[1][1],
- {
- b'requestid': 1,
- b'command': b'mycommand',
- b'args': {},
- b'redirect': None,
- b'data': b'data!',
- },
- )
-
- def testmultipledataframes(self):
- frames = [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'mycommand'}"
- ),
- ffs(b'1 1 0 command-data continuation data1'),
- ffs(b'1 1 0 command-data continuation data2'),
- ffs(b'1 1 0 command-data eos data3'),
- ]
-
- reactor = makereactor()
- results = list(sendframes(reactor, frames))
- self.assertEqual(len(results), 4)
- for i in range(3):
- self.assertaction(results[i], b'wantframe')
- self.assertaction(results[3], b'runcommand')
- self.assertEqual(
- results[3][1],
- {
- b'requestid': 1,
- b'command': b'mycommand',
- b'args': {},
- b'redirect': None,
- b'data': b'data1data2data3',
- },
- )
-
- def testargumentanddata(self):
- frames = [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command', b'args': {b'key': b'val',"
- b"b'foo': b'bar'}}"
- ),
- ffs(b'1 1 0 command-data continuation value1'),
- ffs(b'1 1 0 command-data eos value2'),
- ]
-
- reactor = makereactor()
- results = list(sendframes(reactor, frames))
-
- self.assertaction(results[-1], b'runcommand')
- self.assertEqual(
- results[-1][1],
- {
- b'requestid': 1,
- b'command': b'command',
- b'args': {
- b'key': b'val',
- b'foo': b'bar',
- },
- b'redirect': None,
- b'data': b'value1value2',
- },
- )
-
- def testnewandcontinuation(self):
- result = self._sendsingleframe(
- makereactor(),
- ffs(b'1 1 stream-begin command-request new|continuation '),
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'received command request frame with both new and '
- b'continuation flags set',
- },
- )
-
- def testneithernewnorcontinuation(self):
- result = self._sendsingleframe(
- makereactor(), ffs(b'1 1 stream-begin command-request 0 ')
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'received command request frame with neither new nor '
- b'continuation flags set',
- },
- )
-
- def testunexpectedcommanddata(self):
- """Command data frame when not running a command is an error."""
- result = self._sendsingleframe(
- makereactor(), ffs(b'1 1 stream-begin command-data 0 ignored')
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'expected sender protocol settings or command request '
- b'frame; got 2',
- },
- )
-
- def testunexpectedcommanddatareceiving(self):
- """Same as above except the command is receiving."""
- results = list(
- sendframes(
- makereactor(),
- [
- ffs(
- b'1 1 stream-begin command-request new|more '
- b"cbor:{b'name': b'ignored'}"
- ),
- ffs(b'1 1 0 command-data eos ignored'),
- ],
- )
- )
-
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'error')
- self.assertEqual(
- results[1][1],
- {
- b'message': b'received command data frame for request that is not '
- b'expecting data: 1',
- },
- )
-
- def testconflictingrequestidallowed(self):
- """Multiple fully serviced commands with same request ID is allowed."""
- reactor = makereactor()
- results = []
- outstream = reactor.makeoutputstream()
- results.append(
- self._sendsingleframe(
- reactor,
- ffs(
- b'1 1 stream-begin command-request new '
- b"cbor:{b'name': b'command'}"
- ),
- )
- )
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response1']
- )
- self.assertaction(result, b'sendframes')
- list(result[1][b'framegen'])
- results.append(
- self._sendsingleframe(
- reactor,
- ffs(
- b'1 1 stream-begin command-request new '
- b"cbor:{b'name': b'command'}"
- ),
- )
- )
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response2']
- )
- self.assertaction(result, b'sendframes')
- list(result[1][b'framegen'])
- results.append(
- self._sendsingleframe(
- reactor,
- ffs(
- b'1 1 stream-begin command-request new '
- b"cbor:{b'name': b'command'}"
- ),
- )
- )
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response3']
- )
- self.assertaction(result, b'sendframes')
- list(result[1][b'framegen'])
-
- for i in range(3):
- self.assertaction(results[i], b'runcommand')
- self.assertEqual(
- results[i][1],
- {
- b'requestid': 1,
- b'command': b'command',
- b'args': {},
- b'redirect': None,
- b'data': None,
- },
- )
-
- def testconflictingrequestid(self):
- """Request ID for new command matching in-flight command is illegal."""
- results = list(
- sendframes(
- makereactor(),
- [
- ffs(
- b'1 1 stream-begin command-request new|more '
- b"cbor:{b'name': b'command'}"
- ),
- ffs(
- b'1 1 0 command-request new '
- b"cbor:{b'name': b'command1'}"
- ),
- ],
- )
- )
-
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'error')
- self.assertEqual(
- results[1][1],
- {
- b'message': b'request with ID 1 already received',
- },
- )
-
- def testinterleavedcommands(self):
- cbor1 = b''.join(
- cborutil.streamencode(
- {
- b'name': b'command1',
- b'args': {
- b'foo': b'bar',
- b'key1': b'val',
- },
- }
- )
- )
- cbor3 = b''.join(
- cborutil.streamencode(
- {
- b'name': b'command3',
- b'args': {
- b'biz': b'baz',
- b'key': b'val',
- },
- }
- )
- )
-
- results = list(
- sendframes(
- makereactor(),
- [
- ffs(
- b'1 1 stream-begin command-request new|more %s'
- % cbor1[0:6]
- ),
- ffs(b'3 1 0 command-request new|more %s' % cbor3[0:10]),
- ffs(
- b'1 1 0 command-request continuation|more %s'
- % cbor1[6:9]
- ),
- ffs(
- b'3 1 0 command-request continuation|more %s'
- % cbor3[10:13]
- ),
- ffs(b'3 1 0 command-request continuation %s' % cbor3[13:]),
- ffs(b'1 1 0 command-request continuation %s' % cbor1[9:]),
- ],
- )
- )
-
- self.assertEqual(
- [t[0] for t in results],
- [
- b'wantframe',
- b'wantframe',
- b'wantframe',
- b'wantframe',
- b'runcommand',
- b'runcommand',
- ],
- )
-
- self.assertEqual(
- results[4][1],
- {
- b'requestid': 3,
- b'command': b'command3',
- b'args': {b'biz': b'baz', b'key': b'val'},
- b'redirect': None,
- b'data': None,
- },
- )
- self.assertEqual(
- results[5][1],
- {
- b'requestid': 1,
- b'command': b'command1',
- b'args': {b'foo': b'bar', b'key1': b'val'},
- b'redirect': None,
- b'data': None,
- },
- )
-
- def testmissingcommanddataframe(self):
- # The reactor doesn't currently handle partially received commands.
- # So this test is failing to do anything with request 1.
- frames = [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command1'}"
- ),
- ffs(b'3 1 0 command-request new ' b"cbor:{b'name': b'command2'}"),
- ]
- results = list(sendframes(makereactor(), frames))
- self.assertEqual(len(results), 2)
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'runcommand')
-
- def testmissingcommanddataframeflags(self):
- frames = [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command1'}"
- ),
- ffs(b'1 1 0 command-data 0 data'),
- ]
- results = list(sendframes(makereactor(), frames))
- self.assertEqual(len(results), 2)
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'error')
- self.assertEqual(
- results[1][1],
- {
- b'message': b'command data frame without flags',
- },
- )
-
- def testframefornonreceivingrequest(self):
- """Receiving a frame for a command that is not receiving is illegal."""
- results = list(
- sendframes(
- makereactor(),
- [
- ffs(
- b'1 1 stream-begin command-request new '
- b"cbor:{b'name': b'command1'}"
- ),
- ffs(
- b'3 1 0 command-request new|have-data '
- b"cbor:{b'name': b'command3'}"
- ),
- ffs(b'5 1 0 command-data eos ignored'),
- ],
- )
- )
- self.assertaction(results[2], b'error')
- self.assertEqual(
- results[2][1],
- {
- b'message': b'received frame for request that is not receiving: 5',
- },
- )
-
- def testsimpleresponse(self):
- """Bytes response to command sends result frames."""
- reactor = makereactor()
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
-
- outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response']
- )
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b'1 2 stream-begin stream-settings eos cbor:b"identity"',
- b'1 2 encoded command-response continuation %s' % OK,
- b'1 2 encoded command-response continuation cbor:b"response"',
- b'1 2 0 command-response eos ',
- ],
- )
-
- def testmultiframeresponse(self):
- """Bytes response spanning multiple frames is handled."""
- first = b'x' * framing.DEFAULT_MAX_FRAME_SIZE
- second = b'y' * 100
-
- reactor = makereactor()
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
-
- outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [first + second]
- )
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b'1 2 stream-begin stream-settings eos cbor:b"identity"',
- b'1 2 encoded command-response continuation %s' % OK,
- b'1 2 encoded command-response continuation Y\x80d',
- b'1 2 encoded command-response continuation %s' % first,
- b'1 2 encoded command-response continuation %s' % second,
- b'1 2 0 command-response eos ',
- ],
- )
-
- def testservererror(self):
- reactor = makereactor()
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'mycommand', {}))
-
- outstream = reactor.makeoutputstream()
- result = reactor.onservererror(outstream, 1, b'some message')
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b"1 2 stream-begin error-response 0 "
- b"cbor:{b'type': b'server', "
- b"b'message': [{b'msg': b'some message'}]}",
- ],
- )
-
- def test1commanddeferresponse(self):
- """Responses when in deferred output mode are delayed until EOF."""
- reactor = makereactor(deferoutput=True)
- instream = framing.stream(1)
- results = list(
- sendcommandframes(reactor, instream, 1, b'mycommand', {})
- )
- self.assertEqual(len(results), 1)
- self.assertaction(results[0], b'runcommand')
-
- outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response']
- )
- self.assertaction(result, b'noop')
- result = reactor.oninputeof()
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b'1 2 stream-begin stream-settings eos cbor:b"identity"',
- b'1 2 encoded command-response continuation %s' % OK,
- b'1 2 encoded command-response continuation cbor:b"response"',
- b'1 2 0 command-response eos ',
- ],
- )
-
- def testmultiplecommanddeferresponse(self):
- reactor = makereactor(deferoutput=True)
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- list(sendcommandframes(reactor, instream, 3, b'command2', {}))
-
- outstream = reactor.makeoutputstream()
- result = reactor.oncommandresponsereadyobjects(
- outstream, 1, [b'response1']
- )
- self.assertaction(result, b'noop')
- result = reactor.oncommandresponsereadyobjects(
- outstream, 3, [b'response2']
- )
- self.assertaction(result, b'noop')
- result = reactor.oninputeof()
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b'1 2 stream-begin stream-settings eos cbor:b"identity"',
- b'1 2 encoded command-response continuation %s' % OK,
- b'1 2 encoded command-response continuation cbor:b"response1"',
- b'1 2 0 command-response eos ',
- b'3 2 encoded command-response continuation %s' % OK,
- b'3 2 encoded command-response continuation cbor:b"response2"',
- b'3 2 0 command-response eos ',
- ],
- )
-
- def testrequestidtracking(self):
- reactor = makereactor(deferoutput=True)
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- list(sendcommandframes(reactor, instream, 3, b'command2', {}))
- list(sendcommandframes(reactor, instream, 5, b'command3', {}))
-
- # Register results for commands out of order.
- outstream = reactor.makeoutputstream()
- reactor.oncommandresponsereadyobjects(outstream, 3, [b'response3'])
- reactor.oncommandresponsereadyobjects(outstream, 1, [b'response1'])
- reactor.oncommandresponsereadyobjects(outstream, 5, [b'response5'])
-
- result = reactor.oninputeof()
- self.assertaction(result, b'sendframes')
- self.assertframesequal(
- result[1][b'framegen'],
- [
- b'3 2 stream-begin stream-settings eos cbor:b"identity"',
- b'3 2 encoded command-response continuation %s' % OK,
- b'3 2 encoded command-response continuation cbor:b"response3"',
- b'3 2 0 command-response eos ',
- b'1 2 encoded command-response continuation %s' % OK,
- b'1 2 encoded command-response continuation cbor:b"response1"',
- b'1 2 0 command-response eos ',
- b'5 2 encoded command-response continuation %s' % OK,
- b'5 2 encoded command-response continuation cbor:b"response5"',
- b'5 2 0 command-response eos ',
- ],
- )
-
- def testduplicaterequestonactivecommand(self):
- """Receiving a request ID that matches a request that isn't finished."""
- reactor = makereactor()
- stream = framing.stream(1)
- list(sendcommandframes(reactor, stream, 1, b'command1', {}))
- results = list(sendcommandframes(reactor, stream, 1, b'command1', {}))
-
- self.assertaction(results[0], b'error')
- self.assertEqual(
- results[0][1],
- {
- b'message': b'request with ID 1 is already active',
- },
- )
-
- def testduplicaterequestonactivecommandnosend(self):
- """Same as above but we've registered a response but haven't sent it."""
- reactor = makereactor()
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = reactor.makeoutputstream()
- reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
-
- # We've registered the response but haven't sent it. From the
- # perspective of the reactor, the command is still active.
-
- results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- self.assertaction(results[0], b'error')
- self.assertEqual(
- results[0][1],
- {
- b'message': b'request with ID 1 is already active',
- },
- )
-
- def testduplicaterequestaftersend(self):
- """We can use a duplicate request ID after we've sent the response."""
- reactor = makereactor()
- instream = framing.stream(1)
- list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- outstream = reactor.makeoutputstream()
- res = reactor.oncommandresponsereadyobjects(outstream, 1, [b'response'])
- list(res[1][b'framegen'])
-
- results = list(sendcommandframes(reactor, instream, 1, b'command1', {}))
- self.assertaction(results[0], b'runcommand')
-
- def testprotocolsettingsnoflags(self):
- result = self._sendsingleframe(
- makereactor(), ffs(b'0 1 stream-begin sender-protocol-settings 0 ')
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'sender protocol settings frame must have '
- b'continuation or end of stream flag set',
- },
- )
-
- def testprotocolsettingsconflictflags(self):
- result = self._sendsingleframe(
- makereactor(),
- ffs(b'0 1 stream-begin sender-protocol-settings continuation|eos '),
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'sender protocol settings frame cannot have both '
- b'continuation and end of stream flags set',
- },
- )
-
- def testprotocolsettingsemptypayload(self):
- result = self._sendsingleframe(
- makereactor(),
- ffs(b'0 1 stream-begin sender-protocol-settings eos '),
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'sender protocol settings frame did not contain CBOR '
- b'data',
- },
- )
-
- def testprotocolsettingsmultipleobjects(self):
- result = self._sendsingleframe(
- makereactor(),
- ffs(
- b'0 1 stream-begin sender-protocol-settings eos '
- b'\x46foobar\x43foo'
- ),
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'sender protocol settings frame contained multiple '
- b'CBOR values',
- },
- )
-
- def testprotocolsettingscontentencodings(self):
- reactor = makereactor()
-
- result = self._sendsingleframe(
- reactor,
- ffs(
- b'0 1 stream-begin sender-protocol-settings eos '
- b'cbor:{b"contentencodings": [b"a", b"b"]}'
- ),
- )
- self.assertaction(result, b'wantframe')
-
- self.assertEqual(reactor._state, b'idle')
- self.assertEqual(
- reactor._sendersettings[b'contentencodings'], [b'a', b'b']
- )
-
- def testprotocolsettingsmultipleframes(self):
- reactor = makereactor()
-
- data = b''.join(
- cborutil.streamencode(
- {
- b'contentencodings': [b'value1', b'value2'],
- }
- )
- )
-
- results = list(
- sendframes(
- reactor,
- [
- ffs(
- b'0 1 stream-begin sender-protocol-settings continuation %s'
- % data[0:5]
- ),
- ffs(b'0 1 0 sender-protocol-settings eos %s' % data[5:]),
- ],
- )
- )
-
- self.assertEqual(len(results), 2)
-
- self.assertaction(results[0], b'wantframe')
- self.assertaction(results[1], b'wantframe')
-
- self.assertEqual(reactor._state, b'idle')
- self.assertEqual(
- reactor._sendersettings[b'contentencodings'], [b'value1', b'value2']
- )
-
- def testprotocolsettingsbadcbor(self):
- result = self._sendsingleframe(
- makereactor(),
- ffs(b'0 1 stream-begin sender-protocol-settings eos badvalue'),
- )
- self.assertaction(result, b'error')
-
- def testprotocolsettingsnoninitial(self):
- # Cannot have protocol settings frames as non-initial frames.
- reactor = makereactor()
-
- stream = framing.stream(1)
- results = list(sendcommandframes(reactor, stream, 1, b'mycommand', {}))
- self.assertEqual(len(results), 1)
- self.assertaction(results[0], b'runcommand')
-
- result = self._sendsingleframe(
- reactor, ffs(b'0 1 0 sender-protocol-settings eos ')
- )
- self.assertaction(result, b'error')
- self.assertEqual(
- result[1],
- {
- b'message': b'expected command request frame; got 8',
- },
- )
-
-
-if __name__ == '__main__':
- import silenttestrunner
-
- silenttestrunner.main(__name__)
diff --git a/tests/test-wireproto-framing.py b/tests/test-wireproto-framing.py
deleted file mode 100644
--- a/tests/test-wireproto-framing.py
+++ /dev/null
@@ -1,305 +0,0 @@
-import unittest
-
-from mercurial import (
- util,
- wireprotoframing as framing,
-)
-
-ffs = framing.makeframefromhumanstring
-
-
-class FrameHumanStringTests(unittest.TestCase):
- def testbasic(self):
- self.assertEqual(
- ffs(b'1 1 0 1 0 '), b'\x00\x00\x00\x01\x00\x01\x00\x10'
- )
-
- self.assertEqual(
- ffs(b'2 4 0 1 0 '), b'\x00\x00\x00\x02\x00\x04\x00\x10'
- )
-
- self.assertEqual(
- ffs(b'2 4 0 1 0 foo'), b'\x03\x00\x00\x02\x00\x04\x00\x10foo'
- )
-
- def testcborint(self):
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:15'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x0f'
- )
-
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:42'), b'\x02\x00\x00\x01\x00\x01\x00\x10\x18*'
- )
-
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:1048576'),
- b'\x05\x00\x00\x01\x00\x01\x00\x10\x1a' b'\x00\x10\x00\x00',
- )
-
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:0'), b'\x01\x00\x00\x01\x00\x01\x00\x10\x00'
- )
-
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:-1'), b'\x01\x00\x00\x01\x00\x01\x00\x10 '
- )
-
- self.assertEqual(
- ffs(b'1 1 0 1 0 cbor:-342542'),
- b'\x05\x00\x00\x01\x00\x01\x00\x10:\x00\x05:\r',
- )
-
- def testcborstrings(self):
- self.assertEqual(
- ffs(b"1 1 0 1 0 cbor:b'foo'"),
- b'\x04\x00\x00\x01\x00\x01\x00\x10Cfoo',
- )
-
- def testcborlists(self):
- self.assertEqual(
- ffs(b"1 1 0 1 0 cbor:[None, True, False, 42, b'foo']"),
- b'\n\x00\x00\x01\x00\x01\x00\x10\x85\xf6\xf5\xf4' b'\x18*Cfoo',
- )
-
- def testcbordicts(self):
- self.assertEqual(
- ffs(b"1 1 0 1 0 " b"cbor:{b'foo': b'val1', b'bar': b'val2'}"),
- b'\x13\x00\x00\x01\x00\x01\x00\x10\xa2' b'CbarDval2CfooDval1',
- )
-
-
-class FrameTests(unittest.TestCase):
- def testdataexactframesize(self):
- data = util.bytesio(b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
-
- stream = framing.stream(1)
- frames = list(
- framing.createcommandframes(stream, 1, b'command', {}, data)
- )
- self.assertEqual(
- frames,
- [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command'}"
- ),
- ffs(b'1 1 0 command-data continuation %s' % data.getvalue()),
- ffs(b'1 1 0 command-data eos '),
- ],
- )
-
- def testdatamultipleframes(self):
- data = util.bytesio(b'x' * (framing.DEFAULT_MAX_FRAME_SIZE + 1))
-
- stream = framing.stream(1)
- frames = list(
- framing.createcommandframes(stream, 1, b'command', {}, data)
- )
- self.assertEqual(
- frames,
- [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command'}"
- ),
- ffs(
- b'1 1 0 command-data continuation %s'
- % (b'x' * framing.DEFAULT_MAX_FRAME_SIZE)
- ),
- ffs(b'1 1 0 command-data eos x'),
- ],
- )
-
- def testargsanddata(self):
- data = util.bytesio(b'x' * 100)
-
- stream = framing.stream(1)
- frames = list(
- framing.createcommandframes(
- stream,
- 1,
- b'command',
- {
- b'key1': b'key1value',
- b'key2': b'key2value',
- b'key3': b'key3value',
- },
- data,
- )
- )
-
- self.assertEqual(
- frames,
- [
- ffs(
- b'1 1 stream-begin command-request new|have-data '
- b"cbor:{b'name': b'command', b'args': {b'key1': b'key1value', "
- b"b'key2': b'key2value', b'key3': b'key3value'}}"
- ),
- ffs(b'1 1 0 command-data eos %s' % data.getvalue()),
- ],
- )
-
- if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
- # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
- # the regex version.
- assertRaisesRegex = ( # camelcase-required
- unittest.TestCase.assertRaisesRegexp
- )
-
- def testtextoutputformattingstringtype(self):
- """Formatting string must be bytes."""
- with self.assertRaisesRegex(ValueError, 'must use bytes formatting '):
- list(
- framing.createtextoutputframe(
- None, 1, [(b'foo'.decode('ascii'), [], [])]
- )
- )
-
- def testtextoutputargumentbytes(self):
- with self.assertRaisesRegex(ValueError, 'must use bytes for argument'):
- list(
- framing.createtextoutputframe(
- None, 1, [(b'foo', [b'foo'.decode('ascii')], [])]
- )
- )
-
- def testtextoutputlabelbytes(self):
- with self.assertRaisesRegex(ValueError, 'must use bytes for labels'):
- list(
- framing.createtextoutputframe(
- None, 1, [(b'foo', [], [b'foo'.decode('ascii')])]
- )
- )
-
- def testtextoutput1simpleatom(self):
- stream = framing.stream(1)
- val = list(framing.createtextoutputframe(stream, 1, [(b'foo', [], [])]))
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo'}]"
- ),
- ],
- )
-
- def testtextoutput2simpleatoms(self):
- stream = framing.stream(1)
- val = list(
- framing.createtextoutputframe(
- stream,
- 1,
- [
- (b'foo', [], []),
- (b'bar', [], []),
- ],
- )
- )
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo'}, {b'msg': b'bar'}]"
- )
- ],
- )
-
- def testtextoutput1arg(self):
- stream = framing.stream(1)
- val = list(
- framing.createtextoutputframe(
- stream,
- 1,
- [
- (b'foo %s', [b'val1'], []),
- ],
- )
- )
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s', b'args': [b'val1']}]"
- )
- ],
- )
-
- def testtextoutput2arg(self):
- stream = framing.stream(1)
- val = list(
- framing.createtextoutputframe(
- stream,
- 1,
- [
- (b'foo %s %s', [b'val', b'value'], []),
- ],
- )
- )
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s %s', b'args': [b'val', b'value']}]"
- )
- ],
- )
-
- def testtextoutput1label(self):
- stream = framing.stream(1)
- val = list(
- framing.createtextoutputframe(
- stream,
- 1,
- [
- (b'foo', [], [b'label']),
- ],
- )
- )
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo', b'labels': [b'label']}]"
- )
- ],
- )
-
- def testargandlabel(self):
- stream = framing.stream(1)
- val = list(
- framing.createtextoutputframe(
- stream,
- 1,
- [
- (b'foo %s', [b'arg'], [b'label']),
- ],
- )
- )
-
- self.assertEqual(
- val,
- [
- ffs(
- b'1 1 stream-begin text-output 0 '
- b"cbor:[{b'msg': b'foo %s', b'args': [b'arg'], "
- b"b'labels': [b'label']}]"
- )
- ],
- )
-
-
-if __name__ == '__main__':
- import silenttestrunner
-
- silenttestrunner.main(__name__)
diff --git a/tests/test-wireproto-clientreactor.py b/tests/test-wireproto-clientreactor.py
deleted file mode 100644
--- a/tests/test-wireproto-clientreactor.py
+++ /dev/null
@@ -1,765 +0,0 @@
-import sys
-import unittest
-import zlib
-
-from mercurial import (
- error,
- ui as uimod,
- wireprotoframing as framing,
-)
-from mercurial.utils import cborutil
-
-try:
- from mercurial import zstd
-
- zstd.__version__
-except ImportError:
- zstd = None
-
-ffs = framing.makeframefromhumanstring
-
-globalui = uimod.ui()
-
-
-def sendframe(reactor, frame):
- """Send a frame bytearray to a reactor."""
- header = framing.parseheader(frame)
- payload = frame[framing.FRAME_HEADER_SIZE :]
- assert len(payload) == header.length
-
- return reactor.onframerecv(
- framing.frame(
- header.requestid,
- header.streamid,
- header.streamflags,
- header.typeid,
- header.flags,
- payload,
- )
- )
-
-
-class SingleSendTests(unittest.TestCase):
- """A reactor that can only send once rejects subsequent sends."""
-
- if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
- # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
- # the regex version.
- assertRaisesRegex = ( # camelcase-required
- unittest.TestCase.assertRaisesRegexp
- )
-
- def testbasic(self):
- reactor = framing.clientreactor(
- globalui, hasmultiplesend=False, buffersends=True
- )
-
- request, action, meta = reactor.callcommand(b'foo', {})
- self.assertEqual(request.state, b'pending')
- self.assertEqual(action, b'noop')
-
- action, meta = reactor.flushcommands()
- self.assertEqual(action, b'sendframes')
-
- for frame in meta[b'framegen']:
- self.assertEqual(request.state, b'sending')
-
- self.assertEqual(request.state, b'sent')
-
- with self.assertRaisesRegex(
- error.ProgrammingError, 'cannot issue new commands'
- ):
- reactor.callcommand(b'foo', {})
-
- with self.assertRaisesRegex(
- error.ProgrammingError, 'cannot issue new commands'
- ):
- reactor.callcommand(b'foo', {})
-
-
-class NoBufferTests(unittest.TestCase):
- """A reactor without send buffering sends requests immediately."""
-
- def testbasic(self):
- reactor = framing.clientreactor(
- globalui, hasmultiplesend=True, buffersends=False
- )
-
- request, action, meta = reactor.callcommand(b'command1', {})
- self.assertEqual(request.requestid, 1)
- self.assertEqual(action, b'sendframes')
-
- self.assertEqual(request.state, b'pending')
-
- for frame in meta[b'framegen']:
- self.assertEqual(request.state, b'sending')
-
- self.assertEqual(request.state, b'sent')
-
- action, meta = reactor.flushcommands()
- self.assertEqual(action, b'noop')
-
- # And we can send another command.
- request, action, meta = reactor.callcommand(b'command2', {})
- self.assertEqual(request.requestid, 3)
- self.assertEqual(action, b'sendframes')
-
- for frame in meta[b'framegen']:
- self.assertEqual(request.state, b'sending')
-
- self.assertEqual(request.state, b'sent')
-
-
-class BadFrameRecvTests(unittest.TestCase):
- if not getattr(unittest.TestCase, 'assertRaisesRegex', False):
- # Python 3.7 deprecates the regex*p* version, but 2.7 lacks
- # the regex version.
- assertRaisesRegex = ( # camelcase-required
- unittest.TestCase.assertRaisesRegexp
- )
-
- def testoddstream(self):
- reactor = framing.clientreactor(globalui)
-
- action, meta = sendframe(reactor, ffs(b'1 1 0 1 0 foo'))
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta[b'message'], b'received frame with odd numbered stream ID: 1'
- )
-
- def testunknownstream(self):
- reactor = framing.clientreactor(globalui)
-
- action, meta = sendframe(reactor, ffs(b'1 0 0 1 0 foo'))
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta[b'message'],
- b'received frame on unknown stream without beginning '
- b'of stream flag set',
- )
-
- def testunhandledframetype(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for frame in meta[b'framegen']:
- pass
-
- with self.assertRaisesRegex(
- error.ProgrammingError, 'unhandled frame type'
- ):
- sendframe(reactor, ffs(b'1 0 stream-begin text-output 0 foo'))
-
-
-class StreamTests(unittest.TestCase):
- def testmultipleresponseframes(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
-
- self.assertEqual(action, b'sendframes')
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 0 stream-begin command-response 0 foo' % request.requestid
- ),
- )
- self.assertEqual(action, b'responsedata')
-
- action, meta = sendframe(
- reactor, ffs(b'%d 0 0 command-response eos bar' % request.requestid)
- )
- self.assertEqual(action, b'responsedata')
-
-
-class RedirectTests(unittest.TestCase):
- def testredirect(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- redirect = {
- b'targets': [b'a', b'b'],
- b'hashes': [b'sha256'],
- }
-
- request, action, meta = reactor.callcommand(
- b'foo', {}, redirect=redirect
- )
-
- self.assertEqual(action, b'sendframes')
-
- frames = list(meta[b'framegen'])
- self.assertEqual(len(frames), 1)
-
- self.assertEqual(
- frames[0],
- ffs(
- b'1 1 stream-begin command-request new '
- b"cbor:{b'name': b'foo', "
- b"b'redirect': {b'targets': [b'a', b'b'], "
- b"b'hashes': [b'sha256']}}"
- ),
- )
-
-
-class StreamSettingsTests(unittest.TestCase):
- def testnoflags(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 stream-begin stream-settings 0 ')
- )
-
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta,
- {
- b'message': b'stream encoding settings frame must have '
- b'continuation or end of stream flag set',
- },
- )
-
- def testconflictflags(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 stream-begin stream-settings continuation|eos ')
- )
-
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta,
- {
- b'message': b'stream encoding settings frame cannot have both '
- b'continuation and end of stream flags set',
- },
- )
-
- def testemptypayload(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 stream-begin stream-settings eos ')
- )
-
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta,
- {
- b'message': b'stream encoding settings frame did not contain '
- b'CBOR data'
- },
- )
-
- def testbadcbor(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 stream-begin stream-settings eos badvalue')
- )
-
- self.assertEqual(action, b'error')
-
- def testsingleobject(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(b'1 2 stream-begin stream-settings eos cbor:b"identity"'),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- def testmultipleobjects(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- data = b''.join(
- [
- b''.join(cborutil.streamencode(b'identity')),
- b''.join(cborutil.streamencode({b'foo', b'bar'})),
- ]
- )
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 stream-begin stream-settings eos %s' % data)
- )
-
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta,
- {
- b'message': b'error setting stream decoder: identity decoder '
- b'received unexpected additional values',
- },
- )
-
- def testmultipleframes(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- data = b''.join(cborutil.streamencode(b'identity'))
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'1 2 stream-begin stream-settings continuation %s' % data[0:3]
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- action, meta = sendframe(
- reactor, ffs(b'1 2 0 stream-settings eos %s' % data[3:])
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- def testinvalidencoder(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(b'1 2 stream-begin stream-settings eos cbor:b"badvalue"'),
- )
-
- self.assertEqual(action, b'error')
- self.assertEqual(
- meta,
- {
- b'message': b'error setting stream decoder: unknown stream '
- b'decoder: badvalue',
- },
- )
-
- def testzlibencoding(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
- % request.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- result = {
- b'status': b'ok',
- }
- encoded = b''.join(cborutil.streamencode(result))
-
- compressed = zlib.compress(encoded)
- self.assertEqual(zlib.decompress(compressed), encoded)
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request.requestid, compressed)
- ),
- )
-
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], encoded)
-
- def testzlibencodingsinglebyteframes(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
- % request.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- result = {
- b'status': b'ok',
- }
- encoded = b''.join(cborutil.streamencode(result))
-
- compressed = zlib.compress(encoded)
- self.assertEqual(zlib.decompress(compressed), encoded)
-
- chunks = []
-
- for i in range(len(compressed)):
- char = compressed[i : i + 1]
- if char == b'\\':
- char = b'\\\\'
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request.requestid, char)
- ),
- )
-
- self.assertEqual(action, b'responsedata')
- chunks.append(meta[b'data'])
- self.assertTrue(meta[b'expectmore'])
- self.assertFalse(meta[b'eos'])
-
- # zlib will have the full data decoded at this point, even though
- # we haven't flushed.
- self.assertEqual(b''.join(chunks), encoded)
-
- # End the stream for good measure.
- action, meta = sendframe(
- reactor,
- ffs(b'%d 2 stream-end command-response eos ' % request.requestid),
- )
-
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
- self.assertFalse(meta[b'expectmore'])
- self.assertTrue(meta[b'eos'])
-
- def testzlibmultipleresponses(self):
- # We feed in zlib compressed data on the same stream but belonging to
- # 2 different requests. This tests our flushing behavior.
- reactor = framing.clientreactor(
- globalui, buffersends=False, hasmultiplesend=True
- )
-
- request1, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- request2, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- outstream = framing.outputstream(2)
- outstream.setencoder(globalui, b'zlib')
-
- response1 = b''.join(
- cborutil.streamencode(
- {
- b'status': b'ok',
- b'extra': b'response1' * 10,
- }
- )
- )
-
- response2 = b''.join(
- cborutil.streamencode(
- {
- b'status': b'error',
- b'extra': b'response2' * 10,
- }
- )
- )
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zlib"'
- % request1.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- # Feeding partial data in won't get anything useful out.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request1.requestid, outstream.encode(response1))
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
-
- # But flushing data at both ends will get our original data.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request1.requestid, outstream.flush())
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], response1)
-
- # We should be able to reuse the compressor/decompressor for the
- # 2nd response.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request2.requestid, outstream.encode(response2))
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request2.requestid, outstream.flush())
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], response2)
-
- @unittest.skipUnless(zstd, 'zstd not available')
- def testzstd8mbencoding(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
- % request.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- result = {
- b'status': b'ok',
- }
- encoded = b''.join(cborutil.streamencode(result))
-
- encoder = framing.zstd8mbencoder(globalui)
- compressed = encoder.encode(encoded) + encoder.finish()
- self.assertEqual(
- zstd.ZstdDecompressor().decompress(
- compressed, max_output_size=len(encoded)
- ),
- encoded,
- )
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request.requestid, compressed)
- ),
- )
-
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], encoded)
-
- @unittest.skipUnless(zstd, 'zstd not available')
- def testzstd8mbencodingsinglebyteframes(self):
- reactor = framing.clientreactor(globalui, buffersends=False)
-
- request, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
- % request.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- result = {
- b'status': b'ok',
- }
- encoded = b''.join(cborutil.streamencode(result))
-
- compressed = zstd.ZstdCompressor().compress(encoded)
- self.assertEqual(
- zstd.ZstdDecompressor().decompress(compressed), encoded
- )
-
- chunks = []
-
- for i in range(len(compressed)):
- char = compressed[i : i + 1]
- if char == b'\\':
- char = b'\\\\'
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request.requestid, char)
- ),
- )
-
- self.assertEqual(action, b'responsedata')
- chunks.append(meta[b'data'])
- self.assertTrue(meta[b'expectmore'])
- self.assertFalse(meta[b'eos'])
-
- # zstd decompressor will flush at frame boundaries.
- self.assertEqual(b''.join(chunks), encoded)
-
- # End the stream for good measure.
- action, meta = sendframe(
- reactor,
- ffs(b'%d 2 stream-end command-response eos ' % request.requestid),
- )
-
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
- self.assertFalse(meta[b'expectmore'])
- self.assertTrue(meta[b'eos'])
-
- @unittest.skipUnless(zstd, 'zstd not available')
- def testzstd8mbmultipleresponses(self):
- # We feed in zstd compressed data on the same stream but belonging to
- # 2 different requests. This tests our flushing behavior.
- reactor = framing.clientreactor(
- globalui, buffersends=False, hasmultiplesend=True
- )
-
- request1, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- request2, action, meta = reactor.callcommand(b'foo', {})
- for f in meta[b'framegen']:
- pass
-
- outstream = framing.outputstream(2)
- outstream.setencoder(globalui, b'zstd-8mb')
-
- response1 = b''.join(
- cborutil.streamencode(
- {
- b'status': b'ok',
- b'extra': b'response1' * 10,
- }
- )
- )
-
- response2 = b''.join(
- cborutil.streamencode(
- {
- b'status': b'error',
- b'extra': b'response2' * 10,
- }
- )
- )
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 stream-begin stream-settings eos cbor:b"zstd-8mb"'
- % request1.requestid
- ),
- )
-
- self.assertEqual(action, b'noop')
- self.assertEqual(meta, {})
-
- # Feeding partial data in won't get anything useful out.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request1.requestid, outstream.encode(response1))
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
-
- # But flushing data at both ends will get our original data.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request1.requestid, outstream.flush())
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], response1)
-
- # We should be able to reuse the compressor/decompressor for the
- # 2nd response.
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response continuation %s'
- % (request2.requestid, outstream.encode(response2))
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], b'')
-
- action, meta = sendframe(
- reactor,
- ffs(
- b'%d 2 encoded command-response eos %s'
- % (request2.requestid, outstream.flush())
- ),
- )
- self.assertEqual(action, b'responsedata')
- self.assertEqual(meta[b'data'], response2)
-
-
-if __name__ == '__main__':
- if (3, 6, 0) <= sys.version_info < (3, 6, 4):
- # Python 3.6.0 through 3.6.3 inclusive shipped with
- # https://bugs.python.org/issue31825 and we can't run these
- # tests on those specific versions of Python. Sigh.
- sys.exit(80)
- import silenttestrunner
-
- silenttestrunner.main(__name__)
diff --git a/tests/test-check-pytype.t b/tests/test-check-pytype.t
--- a/tests/test-check-pytype.t
+++ b/tests/test-check-pytype.t
@@ -32,7 +32,6 @@
mercurial/unionrepo.py # ui, svfs, unfiltered [attribute-error]
mercurial/utils/memorytop.py # not 3.6 compatible
mercurial/win32.py # [not-callable]
-mercurial/wireprotoframing.py # [unsupported-operands], [attribute-error], [import-error]
mercurial/wireprotov1peer.py # [attribute-error]
mercurial/wireprotov1server.py # BUG?: BundleValueError handler accesses subclass's attrs
@@ -64,7 +63,6 @@
> -x mercurial/unionrepo.py \
> -x mercurial/utils/memorytop.py \
> -x mercurial/win32.py \
- > -x mercurial/wireprotoframing.py \
> -x mercurial/wireprotov1peer.py \
> -x mercurial/wireprotov1server.py \
> > $TESTTMP/pytype-output.txt || cat $TESTTMP/pytype-output.txt
diff --git a/mercurial/wireprotoframing.py b/mercurial/wireprotoframing.py
deleted file mode 100644
--- a/mercurial/wireprotoframing.py
+++ /dev/null
@@ -1,2089 +0,0 @@
-# wireprotoframing.py - unified framing protocol for wire protocol
-#
-# Copyright 2018 Gregory Szorc <gregory.szorc at gmail.com>
-#
-# This software may be used and distributed according to the terms of the
-# GNU General Public License version 2 or any later version.
-
-# This file contains functionality to support the unified frame-based wire
-# protocol. For details about the protocol, see
-# `hg help internals.wireprotocol`.
-
-
-import collections
-import struct
-
-from .i18n import _
-from .pycompat import getattr
-from .thirdparty import attr
-from . import (
- encoding,
- error,
- pycompat,
- util,
- wireprototypes,
-)
-from .utils import (
- cborutil,
- stringutil,
-)
-
-FRAME_HEADER_SIZE = 8
-DEFAULT_MAX_FRAME_SIZE = 32768
-
-STREAM_FLAG_BEGIN_STREAM = 0x01
-STREAM_FLAG_END_STREAM = 0x02
-STREAM_FLAG_ENCODING_APPLIED = 0x04
-
-STREAM_FLAGS = {
- b'stream-begin': STREAM_FLAG_BEGIN_STREAM,
- b'stream-end': STREAM_FLAG_END_STREAM,
- b'encoded': STREAM_FLAG_ENCODING_APPLIED,
-}
-
-FRAME_TYPE_COMMAND_REQUEST = 0x01
-FRAME_TYPE_COMMAND_DATA = 0x02
-FRAME_TYPE_COMMAND_RESPONSE = 0x03
-FRAME_TYPE_ERROR_RESPONSE = 0x05
-FRAME_TYPE_TEXT_OUTPUT = 0x06
-FRAME_TYPE_PROGRESS = 0x07
-FRAME_TYPE_SENDER_PROTOCOL_SETTINGS = 0x08
-FRAME_TYPE_STREAM_SETTINGS = 0x09
-
-FRAME_TYPES = {
- b'command-request': FRAME_TYPE_COMMAND_REQUEST,
- b'command-data': FRAME_TYPE_COMMAND_DATA,
- b'command-response': FRAME_TYPE_COMMAND_RESPONSE,
- b'error-response': FRAME_TYPE_ERROR_RESPONSE,
- b'text-output': FRAME_TYPE_TEXT_OUTPUT,
- b'progress': FRAME_TYPE_PROGRESS,
- b'sender-protocol-settings': FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
- b'stream-settings': FRAME_TYPE_STREAM_SETTINGS,
-}
-
-FLAG_COMMAND_REQUEST_NEW = 0x01
-FLAG_COMMAND_REQUEST_CONTINUATION = 0x02
-FLAG_COMMAND_REQUEST_MORE_FRAMES = 0x04
-FLAG_COMMAND_REQUEST_EXPECT_DATA = 0x08
-
-FLAGS_COMMAND_REQUEST = {
- b'new': FLAG_COMMAND_REQUEST_NEW,
- b'continuation': FLAG_COMMAND_REQUEST_CONTINUATION,
- b'more': FLAG_COMMAND_REQUEST_MORE_FRAMES,
- b'have-data': FLAG_COMMAND_REQUEST_EXPECT_DATA,
-}
-
-FLAG_COMMAND_DATA_CONTINUATION = 0x01
-FLAG_COMMAND_DATA_EOS = 0x02
-
-FLAGS_COMMAND_DATA = {
- b'continuation': FLAG_COMMAND_DATA_CONTINUATION,
- b'eos': FLAG_COMMAND_DATA_EOS,
-}
-
-FLAG_COMMAND_RESPONSE_CONTINUATION = 0x01
-FLAG_COMMAND_RESPONSE_EOS = 0x02
-
-FLAGS_COMMAND_RESPONSE = {
- b'continuation': FLAG_COMMAND_RESPONSE_CONTINUATION,
- b'eos': FLAG_COMMAND_RESPONSE_EOS,
-}
-
-FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION = 0x01
-FLAG_SENDER_PROTOCOL_SETTINGS_EOS = 0x02
-
-FLAGS_SENDER_PROTOCOL_SETTINGS = {
- b'continuation': FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION,
- b'eos': FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
-}
-
-FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION = 0x01
-FLAG_STREAM_ENCODING_SETTINGS_EOS = 0x02
-
-FLAGS_STREAM_ENCODING_SETTINGS = {
- b'continuation': FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION,
- b'eos': FLAG_STREAM_ENCODING_SETTINGS_EOS,
-}
-
-# Maps frame types to their available flags.
-FRAME_TYPE_FLAGS = {
- FRAME_TYPE_COMMAND_REQUEST: FLAGS_COMMAND_REQUEST,
- FRAME_TYPE_COMMAND_DATA: FLAGS_COMMAND_DATA,
- FRAME_TYPE_COMMAND_RESPONSE: FLAGS_COMMAND_RESPONSE,
- FRAME_TYPE_ERROR_RESPONSE: {},
- FRAME_TYPE_TEXT_OUTPUT: {},
- FRAME_TYPE_PROGRESS: {},
- FRAME_TYPE_SENDER_PROTOCOL_SETTINGS: FLAGS_SENDER_PROTOCOL_SETTINGS,
- FRAME_TYPE_STREAM_SETTINGS: FLAGS_STREAM_ENCODING_SETTINGS,
-}
-
-ARGUMENT_RECORD_HEADER = struct.Struct('<HH')
-
-
-def humanflags(mapping, value):
- """Convert a numeric flags value to a human value, using a mapping table."""
- namemap = {v: k for k, v in mapping.items()}
- flags = []
- val = 1
- while value >= val:
- if value & val:
- flags.append(namemap.get(val, b'<unknown 0x%02x>' % val))
- val <<= 1
-
- return b'|'.join(flags)
-
-
- at attr.s(slots=True)
-class frameheader:
- """Represents the data in a frame header."""
-
- length = attr.ib()
- requestid = attr.ib()
- streamid = attr.ib()
- streamflags = attr.ib()
- typeid = attr.ib()
- flags = attr.ib()
-
-
- at attr.s(slots=True, repr=False)
-class frame:
- """Represents a parsed frame."""
-
- requestid = attr.ib()
- streamid = attr.ib()
- streamflags = attr.ib()
- typeid = attr.ib()
- flags = attr.ib()
- payload = attr.ib()
-
- @encoding.strmethod
- def __repr__(self):
- typename = b'<unknown 0x%02x>' % self.typeid
- for name, value in FRAME_TYPES.items():
- if value == self.typeid:
- typename = name
- break
-
- return (
- b'frame(size=%d; request=%d; stream=%d; streamflags=%s; '
- b'type=%s; flags=%s)'
- % (
- len(self.payload),
- self.requestid,
- self.streamid,
- humanflags(STREAM_FLAGS, self.streamflags),
- typename,
- humanflags(FRAME_TYPE_FLAGS.get(self.typeid, {}), self.flags),
- )
- )
-
-
-def makeframe(requestid, streamid, streamflags, typeid, flags, payload):
- """Assemble a frame into a byte array."""
- # TODO assert size of payload.
- frame = bytearray(FRAME_HEADER_SIZE + len(payload))
-
- # 24 bits length
- # 16 bits request id
- # 8 bits stream id
- # 8 bits stream flags
- # 4 bits type
- # 4 bits flags
-
- l = struct.pack('<I', len(payload))
- frame[0:3] = l[0:3]
- struct.pack_into('<HBB', frame, 3, requestid, streamid, streamflags)
- frame[7] = (typeid << 4) | flags
- frame[8:] = payload
-
- return frame
-
-
-def makeframefromhumanstring(s):
- """Create a frame from a human readable string
-
- Strings have the form:
-
- <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
-
- This can be used by user-facing applications and tests for creating
- frames easily without having to type out a bunch of constants.
-
- Request ID and stream IDs are integers.
-
- Stream flags, frame type, and flags can be specified by integer or
- named constant.
-
- Flags can be delimited by `|` to bitwise OR them together.
-
- If the payload begins with ``cbor:``, the following string will be
- evaluated as Python literal and the resulting object will be fed into
- a CBOR encoder. Otherwise, the payload is interpreted as a Python
- byte string literal.
- """
- fields = s.split(b' ', 5)
- requestid, streamid, streamflags, frametype, frameflags, payload = fields
-
- requestid = int(requestid)
- streamid = int(streamid)
-
- finalstreamflags = 0
- for flag in streamflags.split(b'|'):
- if flag in STREAM_FLAGS:
- finalstreamflags |= STREAM_FLAGS[flag]
- else:
- finalstreamflags |= int(flag)
-
- if frametype in FRAME_TYPES:
- frametype = FRAME_TYPES[frametype]
- else:
- frametype = int(frametype)
-
- finalflags = 0
- validflags = FRAME_TYPE_FLAGS[frametype]
- for flag in frameflags.split(b'|'):
- if flag in validflags:
- finalflags |= validflags[flag]
- else:
- finalflags |= int(flag)
-
- if payload.startswith(b'cbor:'):
- payload = b''.join(
- cborutil.streamencode(stringutil.evalpythonliteral(payload[5:]))
- )
-
- else:
- payload = stringutil.unescapestr(payload)
-
- return makeframe(
- requestid=requestid,
- streamid=streamid,
- streamflags=finalstreamflags,
- typeid=frametype,
- flags=finalflags,
- payload=payload,
- )
-
-
-def parseheader(data):
- """Parse a unified framing protocol frame header from a buffer.
-
- The header is expected to be in the buffer at offset 0 and the
- buffer is expected to be large enough to hold a full header.
- """
- # 24 bits payload length (little endian)
- # 16 bits request ID
- # 8 bits stream ID
- # 8 bits stream flags
- # 4 bits frame type
- # 4 bits frame flags
- # ... payload
- framelength = data[0] + 256 * data[1] + 16384 * data[2]
- requestid, streamid, streamflags = struct.unpack_from('<HBB', data, 3)
- typeflags = data[7]
-
- frametype = (typeflags & 0xF0) >> 4
- frameflags = typeflags & 0x0F
-
- return frameheader(
- framelength, requestid, streamid, streamflags, frametype, frameflags
- )
-
-
-def readframe(fh):
- """Read a unified framing protocol frame from a file object.
-
- Returns a 3-tuple of (type, flags, payload) for the decoded frame or
- None if no frame is available. May raise if a malformed frame is
- seen.
- """
- header = bytearray(FRAME_HEADER_SIZE)
-
- readcount = fh.readinto(header)
-
- if readcount == 0:
- return None
-
- if readcount != FRAME_HEADER_SIZE:
- raise error.Abort(
- _(b'received incomplete frame: got %d bytes: %s')
- % (readcount, header)
- )
-
- h = parseheader(header)
-
- payload = fh.read(h.length)
- if len(payload) != h.length:
- raise error.Abort(
- _(b'frame length error: expected %d; got %d')
- % (h.length, len(payload))
- )
-
- return frame(
- h.requestid, h.streamid, h.streamflags, h.typeid, h.flags, payload
- )
-
-
-def createcommandframes(
- stream,
- requestid,
- cmd,
- args,
- datafh=None,
- maxframesize=DEFAULT_MAX_FRAME_SIZE,
- redirect=None,
-):
- """Create frames necessary to transmit a request to run a command.
-
- This is a generator of bytearrays. Each item represents a frame
- ready to be sent over the wire to a peer.
- """
- data = {b'name': cmd}
- if args:
- data[b'args'] = args
-
- if redirect:
- data[b'redirect'] = redirect
-
- data = b''.join(cborutil.streamencode(data))
-
- offset = 0
-
- while True:
- flags = 0
-
- # Must set new or continuation flag.
- if not offset:
- flags |= FLAG_COMMAND_REQUEST_NEW
- else:
- flags |= FLAG_COMMAND_REQUEST_CONTINUATION
-
- # Data frames is set on all frames.
- if datafh:
- flags |= FLAG_COMMAND_REQUEST_EXPECT_DATA
-
- payload = data[offset : offset + maxframesize]
- offset += len(payload)
-
- if len(payload) == maxframesize and offset < len(data):
- flags |= FLAG_COMMAND_REQUEST_MORE_FRAMES
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_REQUEST,
- flags=flags,
- payload=payload,
- )
-
- if not (flags & FLAG_COMMAND_REQUEST_MORE_FRAMES):
- break
-
- if datafh:
- while True:
- data = datafh.read(DEFAULT_MAX_FRAME_SIZE)
-
- done = False
- if len(data) == DEFAULT_MAX_FRAME_SIZE:
- flags = FLAG_COMMAND_DATA_CONTINUATION
- else:
- flags = FLAG_COMMAND_DATA_EOS
- assert datafh.read(1) == b''
- done = True
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_DATA,
- flags=flags,
- payload=data,
- )
-
- if done:
- break
-
-
-def createcommandresponseokframe(stream, requestid):
- overall = b''.join(cborutil.streamencode({b'status': b'ok'}))
-
- if stream.streamsettingssent:
- overall = stream.encode(overall)
- encoded = True
-
- if not overall:
- return None
- else:
- encoded = False
-
- return stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=overall,
- encoded=encoded,
- )
-
-
-def createcommandresponseeosframes(
- stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE
-):
- """Create an empty payload frame representing command end-of-stream."""
- payload = stream.flush()
-
- offset = 0
- while True:
- chunk = payload[offset : offset + maxframesize]
- offset += len(chunk)
-
- done = offset == len(payload)
-
- if done:
- flags = FLAG_COMMAND_RESPONSE_EOS
- else:
- flags = FLAG_COMMAND_RESPONSE_CONTINUATION
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=flags,
- payload=chunk,
- encoded=payload != b'',
- )
-
- if done:
- break
-
-
-def createalternatelocationresponseframe(stream, requestid, location):
- data = {
- b'status': b'redirect',
- b'location': {
- b'url': location.url,
- b'mediatype': location.mediatype,
- },
- }
-
- for a in (
- 'size',
- 'fullhashes',
- 'fullhashseed',
- 'serverdercerts',
- 'servercadercerts',
- ):
- value = getattr(location, a)
- if value is not None:
- data[b'location'][pycompat.bytestr(a)] = value
-
- payload = b''.join(cborutil.streamencode(data))
-
- if stream.streamsettingssent:
- payload = stream.encode(payload)
- encoded = True
- else:
- encoded = False
-
- return stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=payload,
- encoded=encoded,
- )
-
-
-def createcommanderrorresponse(stream, requestid, message, args=None):
- # TODO should this be using a list of {'msg': ..., 'args': {}} so atom
- # formatting works consistently?
- m = {
- b'status': b'error',
- b'error': {
- b'message': message,
- },
- }
-
- if args:
- m[b'error'][b'args'] = args
-
- overall = b''.join(cborutil.streamencode(m))
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_EOS,
- payload=overall,
- )
-
-
-def createerrorframe(stream, requestid, msg, errtype):
- # TODO properly handle frame size limits.
- assert len(msg) <= DEFAULT_MAX_FRAME_SIZE
-
- payload = b''.join(
- cborutil.streamencode(
- {
- b'type': errtype,
- b'message': [{b'msg': msg}],
- }
- )
- )
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_ERROR_RESPONSE,
- flags=0,
- payload=payload,
- )
-
-
-def createtextoutputframe(
- stream, requestid, atoms, maxframesize=DEFAULT_MAX_FRAME_SIZE
-):
- """Create a text output frame to render text to people.
-
- ``atoms`` is a 3-tuple of (formatting string, args, labels).
-
- The formatting string contains ``%s`` tokens to be replaced by the
- corresponding indexed entry in ``args``. ``labels`` is an iterable of
- formatters to be applied at rendering time. In terms of the ``ui``
- class, each atom corresponds to a ``ui.write()``.
- """
- atomdicts = []
-
- for (formatting, args, labels) in atoms:
- # TODO look for localstr, other types here?
-
- if not isinstance(formatting, bytes):
- raise ValueError(b'must use bytes formatting strings')
- for arg in args:
- if not isinstance(arg, bytes):
- raise ValueError(b'must use bytes for arguments')
- for label in labels:
- if not isinstance(label, bytes):
- raise ValueError(b'must use bytes for labels')
-
- # Formatting string must be ASCII.
- formatting = formatting.decode('ascii', 'replace').encode('ascii')
-
- # Arguments must be UTF-8.
- args = [a.decode('utf-8', 'replace').encode('utf-8') for a in args]
-
- # Labels must be ASCII.
- labels = [l.decode('ascii', 'strict').encode('ascii') for l in labels]
-
- atom = {b'msg': formatting}
- if args:
- atom[b'args'] = args
- if labels:
- atom[b'labels'] = labels
-
- atomdicts.append(atom)
-
- payload = b''.join(cborutil.streamencode(atomdicts))
-
- if len(payload) > maxframesize:
- raise ValueError(b'cannot encode data in a single frame')
-
- yield stream.makeframe(
- requestid=requestid,
- typeid=FRAME_TYPE_TEXT_OUTPUT,
- flags=0,
- payload=payload,
- )
-
-
-class bufferingcommandresponseemitter:
- """Helper object to emit command response frames intelligently.
-
- Raw command response data is likely emitted in chunks much smaller
- than what can fit in a single frame. This class exists to buffer
- chunks until enough data is available to fit in a single frame.
-
- TODO we'll need something like this when compression is supported.
- So it might make sense to implement this functionality at the stream
- level.
- """
-
- def __init__(self, stream, requestid, maxframesize=DEFAULT_MAX_FRAME_SIZE):
- self._stream = stream
- self._requestid = requestid
- self._maxsize = maxframesize
- self._chunks = []
- self._chunkssize = 0
-
- def send(self, data):
- """Send new data for emission.
-
- Is a generator of new frames that were derived from the new input.
-
- If the special input ``None`` is received, flushes all buffered
- data to frames.
- """
-
- if data is None:
- for frame in self._flush():
- yield frame
- return
-
- data = self._stream.encode(data)
-
- # There is a ton of potential to do more complicated things here.
- # Our immediate goal is to coalesce small chunks into big frames,
- # not achieve the fewest number of frames possible. So we go with
- # a simple implementation:
- #
- # * If a chunk is too large for a frame, we flush and emit frames
- # for the new chunk.
- # * If a chunk can be buffered without total buffered size limits
- # being exceeded, we do that.
- # * If a chunk causes us to go over our buffering limit, we flush
- # and then buffer the new chunk.
-
- if not data:
- return
-
- if len(data) > self._maxsize:
- for frame in self._flush():
- yield frame
-
- # Now emit frames for the big chunk.
- offset = 0
- while True:
- chunk = data[offset : offset + self._maxsize]
- offset += len(chunk)
-
- yield self._stream.makeframe(
- self._requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=chunk,
- encoded=True,
- )
-
- if offset == len(data):
- return
-
- # If we don't have enough to constitute a full frame, buffer and
- # return.
- if len(data) + self._chunkssize < self._maxsize:
- self._chunks.append(data)
- self._chunkssize += len(data)
- return
-
- # Else flush what we have and buffer the new chunk. We could do
- # something more intelligent here, like break the chunk. Let's
- # keep things simple for now.
- for frame in self._flush():
- yield frame
-
- self._chunks.append(data)
- self._chunkssize = len(data)
-
- def _flush(self):
- payload = b''.join(self._chunks)
- assert len(payload) <= self._maxsize
-
- self._chunks[:] = []
- self._chunkssize = 0
-
- if not payload:
- return
-
- yield self._stream.makeframe(
- self._requestid,
- typeid=FRAME_TYPE_COMMAND_RESPONSE,
- flags=FLAG_COMMAND_RESPONSE_CONTINUATION,
- payload=payload,
- encoded=True,
- )
-
-
-# TODO consider defining encoders/decoders using the util.compressionengine
-# mechanism.
-
-
-class identityencoder:
- """Encoder for the "identity" stream encoding profile."""
-
- def __init__(self, ui):
- pass
-
- def encode(self, data):
- return data
-
- def flush(self):
- return b''
-
- def finish(self):
- return b''
-
-
-class identitydecoder:
- """Decoder for the "identity" stream encoding profile."""
-
- def __init__(self, ui, extraobjs):
- if extraobjs:
- raise error.Abort(
- _(b'identity decoder received unexpected additional values')
- )
-
- def decode(self, data):
- return data
-
-
-class zlibencoder:
- def __init__(self, ui):
- import zlib
-
- self._zlib = zlib
- self._compressor = zlib.compressobj()
-
- def encode(self, data):
- return self._compressor.compress(data)
-
- def flush(self):
- # Z_SYNC_FLUSH doesn't reset compression context, which is
- # what we want.
- return self._compressor.flush(self._zlib.Z_SYNC_FLUSH)
-
- def finish(self):
- res = self._compressor.flush(self._zlib.Z_FINISH)
- self._compressor = None
- return res
-
-
-class zlibdecoder:
- def __init__(self, ui, extraobjs):
- import zlib
-
- if extraobjs:
- raise error.Abort(
- _(b'zlib decoder received unexpected additional values')
- )
-
- self._decompressor = zlib.decompressobj()
-
- def decode(self, data):
- return self._decompressor.decompress(data)
-
-
-class zstdbaseencoder:
- def __init__(self, level):
- from . import zstd
-
- self._zstd = zstd
- cctx = zstd.ZstdCompressor(level=level)
- self._compressor = cctx.compressobj()
-
- def encode(self, data):
- return self._compressor.compress(data)
-
- def flush(self):
- # COMPRESSOBJ_FLUSH_BLOCK flushes all data previously fed into the
- # compressor and allows a decompressor to access all encoded data
- # up to this point.
- return self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_BLOCK)
-
- def finish(self):
- res = self._compressor.flush(self._zstd.COMPRESSOBJ_FLUSH_FINISH)
- self._compressor = None
- return res
-
-
-class zstd8mbencoder(zstdbaseencoder):
- def __init__(self, ui):
- super(zstd8mbencoder, self).__init__(3)
-
-
-class zstdbasedecoder:
- def __init__(self, maxwindowsize):
- from . import zstd
-
- dctx = zstd.ZstdDecompressor(max_window_size=maxwindowsize)
- self._decompressor = dctx.decompressobj()
-
- def decode(self, data):
- return self._decompressor.decompress(data)
-
-
-class zstd8mbdecoder(zstdbasedecoder):
- def __init__(self, ui, extraobjs):
- if extraobjs:
- raise error.Abort(
- _(b'zstd8mb decoder received unexpected additional values')
- )
-
- super(zstd8mbdecoder, self).__init__(maxwindowsize=8 * 1048576)
-
-
-# We lazily populate this to avoid excessive module imports when importing
-# this module.
-STREAM_ENCODERS = {}
-STREAM_ENCODERS_ORDER = []
-
-
-def populatestreamencoders():
- if STREAM_ENCODERS:
- return
-
- try:
- from . import zstd
-
- zstd.__version__
- except ImportError:
- zstd = None
-
- # zstandard is fastest and is preferred.
- if zstd:
- STREAM_ENCODERS[b'zstd-8mb'] = (zstd8mbencoder, zstd8mbdecoder)
- STREAM_ENCODERS_ORDER.append(b'zstd-8mb')
-
- STREAM_ENCODERS[b'zlib'] = (zlibencoder, zlibdecoder)
- STREAM_ENCODERS_ORDER.append(b'zlib')
-
- STREAM_ENCODERS[b'identity'] = (identityencoder, identitydecoder)
- STREAM_ENCODERS_ORDER.append(b'identity')
-
-
-class stream:
- """Represents a logical unidirectional series of frames."""
-
- def __init__(self, streamid, active=False):
- self.streamid = streamid
- self._active = active
-
- def makeframe(self, requestid, typeid, flags, payload):
- """Create a frame to be sent out over this stream.
-
- Only returns the frame instance. Does not actually send it.
- """
- streamflags = 0
- if not self._active:
- streamflags |= STREAM_FLAG_BEGIN_STREAM
- self._active = True
-
- return makeframe(
- requestid, self.streamid, streamflags, typeid, flags, payload
- )
-
-
-class inputstream(stream):
- """Represents a stream used for receiving data."""
-
- def __init__(self, streamid, active=False):
- super(inputstream, self).__init__(streamid, active=active)
- self._decoder = None
-
- def setdecoder(self, ui, name, extraobjs):
- """Set the decoder for this stream.
-
- Receives the stream profile name and any additional CBOR objects
- decoded from the stream encoding settings frame payloads.
- """
- if name not in STREAM_ENCODERS:
- raise error.Abort(_(b'unknown stream decoder: %s') % name)
-
- self._decoder = STREAM_ENCODERS[name][1](ui, extraobjs)
-
- def decode(self, data):
- # Default is identity decoder. We don't bother instantiating one
- # because it is trivial.
- if not self._decoder:
- return data
-
- return self._decoder.decode(data)
-
- def flush(self):
- if not self._decoder:
- return b''
-
- return self._decoder.flush()
-
-
-class outputstream(stream):
- """Represents a stream used for sending data."""
-
- def __init__(self, streamid, active=False):
- super(outputstream, self).__init__(streamid, active=active)
- self.streamsettingssent = False
- self._encoder = None
- self._encodername = None
-
- def setencoder(self, ui, name):
- """Set the encoder for this stream.
-
- Receives the stream profile name.
- """
- if name not in STREAM_ENCODERS:
- raise error.Abort(_(b'unknown stream encoder: %s') % name)
-
- self._encoder = STREAM_ENCODERS[name][0](ui)
- self._encodername = name
-
- def encode(self, data):
- if not self._encoder:
- return data
-
- return self._encoder.encode(data)
-
- def flush(self):
- if not self._encoder:
- return b''
-
- return self._encoder.flush()
-
- def finish(self):
- if not self._encoder:
- return b''
-
- self._encoder.finish()
-
- def makeframe(self, requestid, typeid, flags, payload, encoded=False):
- """Create a frame to be sent out over this stream.
-
- Only returns the frame instance. Does not actually send it.
- """
- streamflags = 0
- if not self._active:
- streamflags |= STREAM_FLAG_BEGIN_STREAM
- self._active = True
-
- if encoded:
- if not self.streamsettingssent:
- raise error.ProgrammingError(
- b'attempting to send encoded frame without sending stream '
- b'settings'
- )
-
- streamflags |= STREAM_FLAG_ENCODING_APPLIED
-
- if (
- typeid == FRAME_TYPE_STREAM_SETTINGS
- and flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
- ):
- self.streamsettingssent = True
-
- return makeframe(
- requestid, self.streamid, streamflags, typeid, flags, payload
- )
-
- def makestreamsettingsframe(self, requestid):
- """Create a stream settings frame for this stream.
-
- Returns frame data or None if no stream settings frame is needed or has
- already been sent.
- """
- if not self._encoder or self.streamsettingssent:
- return None
-
- payload = b''.join(cborutil.streamencode(self._encodername))
- return self.makeframe(
- requestid,
- FRAME_TYPE_STREAM_SETTINGS,
- FLAG_STREAM_ENCODING_SETTINGS_EOS,
- payload,
- )
-
-
-def ensureserverstream(stream):
- if stream.streamid % 2:
- raise error.ProgrammingError(
- b'server should only write to even '
- b'numbered streams; %d is not even' % stream.streamid
- )
-
-
-DEFAULT_PROTOCOL_SETTINGS = {
- b'contentencodings': [b'identity'],
-}
-
-
-class serverreactor:
- """Holds state of a server handling frame-based protocol requests.
-
- This class is the "brain" of the unified frame-based protocol server
- component. While the protocol is stateless from the perspective of
- requests/commands, something needs to track which frames have been
- received, what frames to expect, etc. This class is that thing.
-
- Instances are modeled as a state machine of sorts. Instances are also
- reactionary to external events. The point of this class is to encapsulate
- the state of the connection and the exchange of frames, not to perform
- work. Instead, callers tell this class when something occurs, like a
- frame arriving. If that activity is worthy of a follow-up action (say
- *run a command*), the return value of that handler will say so.
-
- I/O and CPU intensive operations are purposefully delegated outside of
- this class.
-
- Consumers are expected to tell instances when events occur. They do so by
- calling the various ``on*`` methods. These methods return a 2-tuple
- describing any follow-up action(s) to take. The first element is the
- name of an action to perform. The second is a data structure (usually
- a dict) specific to that action that contains more information. e.g.
- if the server wants to send frames back to the client, the data structure
- will contain a reference to those frames.
-
- Valid actions that consumers can be instructed to take are:
-
- sendframes
- Indicates that frames should be sent to the client. The ``framegen``
- key contains a generator of frames that should be sent. The server
- assumes that all frames are sent to the client.
-
- error
- Indicates that an error occurred. Consumer should probably abort.
-
- runcommand
- Indicates that the consumer should run a wire protocol command. Details
- of the command to run are given in the data structure.
-
- wantframe
- Indicates that nothing of interest happened and the server is waiting on
- more frames from the client before anything interesting can be done.
-
- noop
- Indicates no additional action is required.
-
- Known Issues
- ------------
-
- There are no limits to the number of partially received commands or their
- size. A malicious client could stream command request data and exhaust the
- server's memory.
-
- Partially received commands are not acted upon when end of input is
- reached. Should the server error if it receives a partial request?
- Should the client send a message to abort a partially transmitted request
- to facilitate graceful shutdown?
-
- Active requests that haven't been responded to aren't tracked. This means
- that if we receive a command and instruct its dispatch, another command
- with its request ID can come in over the wire and there will be a race
- between who responds to what.
- """
-
- def __init__(self, ui, deferoutput=False):
- """Construct a new server reactor.
-
- ``deferoutput`` can be used to indicate that no output frames should be
- instructed to be sent until input has been exhausted. In this mode,
- events that would normally generate output frames (such as a command
- response being ready) will instead defer instructing the consumer to
- send those frames. This is useful for half-duplex transports where the
- sender cannot receive until all data has been transmitted.
- """
- self._ui = ui
- self._deferoutput = deferoutput
- self._state = b'initial'
- self._nextoutgoingstreamid = 2
- self._bufferedframegens = []
- # stream id -> stream instance for all active streams from the client.
- self._incomingstreams = {}
- self._outgoingstreams = {}
- # request id -> dict of commands that are actively being received.
- self._receivingcommands = {}
- # Request IDs that have been received and are actively being processed.
- # Once all output for a request has been sent, it is removed from this
- # set.
- self._activecommands = set()
-
- self._protocolsettingsdecoder = None
-
- # Sender protocol settings are optional. Set implied default values.
- self._sendersettings = dict(DEFAULT_PROTOCOL_SETTINGS)
-
- populatestreamencoders()
-
- def onframerecv(self, frame):
- """Process a frame that has been received off the wire.
-
- Returns a dict with an ``action`` key that details what action,
- if any, the consumer should take next.
- """
- if not frame.streamid % 2:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'received frame with even numbered stream ID: %d')
- % frame.streamid
- )
-
- if frame.streamid not in self._incomingstreams:
- if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'received frame on unknown inactive stream without '
- b'beginning of stream flag set'
- )
- )
-
- self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
-
- if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
- # TODO handle decoding frames
- self._state = b'errored'
- raise error.ProgrammingError(
- b'support for decoding stream payloads not yet implemented'
- )
-
- if frame.streamflags & STREAM_FLAG_END_STREAM:
- del self._incomingstreams[frame.streamid]
-
- handlers = {
- b'initial': self._onframeinitial,
- b'protocol-settings-receiving': self._onframeprotocolsettings,
- b'idle': self._onframeidle,
- b'command-receiving': self._onframecommandreceiving,
- b'errored': self._onframeerrored,
- }
-
- meth = handlers.get(self._state)
- if not meth:
- raise error.ProgrammingError(b'unhandled state: %s' % self._state)
-
- return meth(frame)
-
- def oncommandresponsereadyobjects(self, stream, requestid, objs):
- """Signal that objects are ready to be sent to the client.
-
- ``objs`` is an iterable of objects (typically a generator) that will
- be encoded via CBOR and added to frames, which will be sent to the
- client.
- """
- ensureserverstream(stream)
-
- # A more robust solution would be to check for objs.{next,__next__}.
- if isinstance(objs, list):
- objs = iter(objs)
-
- # We need to take care over exception handling. Uncaught exceptions
- # when generating frames could lead to premature end of the frame
- # stream and the possibility of the server or client process getting
- # in a bad state.
- #
- # Keep in mind that if ``objs`` is a generator, advancing it could
- # raise exceptions that originated in e.g. wire protocol command
- # functions. That is why we differentiate between exceptions raised
- # when iterating versus other exceptions that occur.
- #
- # In all cases, when the function finishes, the request is fully
- # handled and no new frames for it should be seen.
-
- def sendframes():
- emitted = False
- alternatelocationsent = False
- emitter = bufferingcommandresponseemitter(stream, requestid)
- while True:
- try:
- o = next(objs)
- except StopIteration:
- for frame in emitter.send(None):
- yield frame
-
- if emitted:
- for frame in createcommandresponseeosframes(
- stream, requestid
- ):
- yield frame
- break
-
- except error.WireprotoCommandError as e:
- for frame in createcommanderrorresponse(
- stream, requestid, e.message, e.messageargs
- ):
- yield frame
- break
-
- except Exception as e:
- for frame in createerrorframe(
- stream,
- requestid,
- b'%s' % stringutil.forcebytestr(e),
- errtype=b'server',
- ):
-
- yield frame
-
- break
-
- try:
- # Alternate location responses can only be the first and
- # only object in the output stream.
- if isinstance(o, wireprototypes.alternatelocationresponse):
- if emitted:
- raise error.ProgrammingError(
- b'alternatelocationresponse seen after initial '
- b'output object'
- )
-
- frame = stream.makestreamsettingsframe(requestid)
- if frame:
- yield frame
-
- yield createalternatelocationresponseframe(
- stream, requestid, o
- )
-
- alternatelocationsent = True
- emitted = True
- continue
-
- if alternatelocationsent:
- raise error.ProgrammingError(
- b'object follows alternatelocationresponse'
- )
-
- if not emitted:
- # Frame is optional.
- frame = stream.makestreamsettingsframe(requestid)
- if frame:
- yield frame
-
- # May be None if empty frame (due to encoding).
- frame = createcommandresponseokframe(stream, requestid)
- if frame:
- yield frame
-
- emitted = True
-
- # Objects emitted by command functions can be serializable
- # data structures or special types.
- # TODO consider extracting the content normalization to a
- # standalone function, as it may be useful for e.g. cachers.
-
- # A pre-encoded object is sent directly to the emitter.
- if isinstance(o, wireprototypes.encodedresponse):
- for frame in emitter.send(o.data):
- yield frame
-
- elif isinstance(
- o, wireprototypes.indefinitebytestringresponse
- ):
- for chunk in cborutil.streamencodebytestringfromiter(
- o.chunks
- ):
-
- for frame in emitter.send(chunk):
- yield frame
-
- # A regular object is CBOR encoded.
- else:
- for chunk in cborutil.streamencode(o):
- for frame in emitter.send(chunk):
- yield frame
-
- except Exception as e:
- for frame in createerrorframe(
- stream, requestid, b'%s' % e, errtype=b'server'
- ):
- yield frame
-
- break
-
- self._activecommands.remove(requestid)
-
- return self._handlesendframes(sendframes())
-
- def oninputeof(self):
- """Signals that end of input has been received.
-
- No more frames will be received. All pending activity should be
- completed.
- """
- # TODO should we do anything about in-flight commands?
-
- if not self._deferoutput or not self._bufferedframegens:
- return b'noop', {}
-
- # If we buffered all our responses, emit those.
- def makegen():
- for gen in self._bufferedframegens:
- for frame in gen:
- yield frame
-
- return b'sendframes', {
- b'framegen': makegen(),
- }
-
- def _handlesendframes(self, framegen):
- if self._deferoutput:
- self._bufferedframegens.append(framegen)
- return b'noop', {}
- else:
- return b'sendframes', {
- b'framegen': framegen,
- }
-
- def onservererror(self, stream, requestid, msg):
- ensureserverstream(stream)
-
- def sendframes():
- for frame in createerrorframe(
- stream, requestid, msg, errtype=b'server'
- ):
- yield frame
-
- self._activecommands.remove(requestid)
-
- return self._handlesendframes(sendframes())
-
- def oncommanderror(self, stream, requestid, message, args=None):
- """Called when a command encountered an error before sending output."""
- ensureserverstream(stream)
-
- def sendframes():
- for frame in createcommanderrorresponse(
- stream, requestid, message, args
- ):
- yield frame
-
- self._activecommands.remove(requestid)
-
- return self._handlesendframes(sendframes())
-
- def makeoutputstream(self):
- """Create a stream to be used for sending data to the client.
-
- If this is called before protocol settings frames are received, we
- don't know what stream encodings are supported by the client and
- we will default to identity.
- """
- streamid = self._nextoutgoingstreamid
- self._nextoutgoingstreamid += 2
-
- s = outputstream(streamid)
- self._outgoingstreams[streamid] = s
-
- # Always use the *server's* preferred encoder over the client's,
- # as servers have more to lose from sub-optimal encoders being used.
- for name in STREAM_ENCODERS_ORDER:
- if name in self._sendersettings[b'contentencodings']:
- s.setencoder(self._ui, name)
- break
-
- return s
-
- def _makeerrorresult(self, msg):
- return b'error', {
- b'message': msg,
- }
-
- def _makeruncommandresult(self, requestid):
- entry = self._receivingcommands[requestid]
-
- if not entry[b'requestdone']:
- self._state = b'errored'
- raise error.ProgrammingError(
- b'should not be called without requestdone set'
- )
-
- del self._receivingcommands[requestid]
-
- if self._receivingcommands:
- self._state = b'command-receiving'
- else:
- self._state = b'idle'
-
- # Decode the payloads as CBOR.
- entry[b'payload'].seek(0)
- request = cborutil.decodeall(entry[b'payload'].getvalue())[0]
-
- if b'name' not in request:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'command request missing "name" field')
- )
-
- if b'args' not in request:
- request[b'args'] = {}
-
- assert requestid not in self._activecommands
- self._activecommands.add(requestid)
-
- return (
- b'runcommand',
- {
- b'requestid': requestid,
- b'command': request[b'name'],
- b'args': request[b'args'],
- b'redirect': request.get(b'redirect'),
- b'data': entry[b'data'].getvalue() if entry[b'data'] else None,
- },
- )
-
- def _makewantframeresult(self):
- return b'wantframe', {
- b'state': self._state,
- }
-
- def _validatecommandrequestframe(self, frame):
- new = frame.flags & FLAG_COMMAND_REQUEST_NEW
- continuation = frame.flags & FLAG_COMMAND_REQUEST_CONTINUATION
-
- if new and continuation:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'received command request frame with both new and '
- b'continuation flags set'
- )
- )
-
- if not new and not continuation:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'received command request frame with neither new nor '
- b'continuation flags set'
- )
- )
-
- def _onframeinitial(self, frame):
- # Called when we receive a frame when in the "initial" state.
- if frame.typeid == FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
- self._state = b'protocol-settings-receiving'
- self._protocolsettingsdecoder = cborutil.bufferingdecoder()
- return self._onframeprotocolsettings(frame)
-
- elif frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
- self._state = b'idle'
- return self._onframeidle(frame)
-
- else:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'expected sender protocol settings or command request '
- b'frame; got %d'
- )
- % frame.typeid
- )
-
- def _onframeprotocolsettings(self, frame):
- assert self._state == b'protocol-settings-receiving'
- assert self._protocolsettingsdecoder is not None
-
- if frame.typeid != FRAME_TYPE_SENDER_PROTOCOL_SETTINGS:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'expected sender protocol settings frame; got %d')
- % frame.typeid
- )
-
- more = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_CONTINUATION
- eos = frame.flags & FLAG_SENDER_PROTOCOL_SETTINGS_EOS
-
- if more and eos:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'sender protocol settings frame cannot have both '
- b'continuation and end of stream flags set'
- )
- )
-
- if not more and not eos:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'sender protocol settings frame must have continuation or '
- b'end of stream flag set'
- )
- )
-
- # TODO establish limits for maximum amount of data that can be
- # buffered.
- try:
- self._protocolsettingsdecoder.decode(frame.payload)
- except Exception as e:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'error decoding CBOR from sender protocol settings frame: %s'
- )
- % stringutil.forcebytestr(e)
- )
-
- if more:
- return self._makewantframeresult()
-
- assert eos
-
- decoded = self._protocolsettingsdecoder.getavailable()
- self._protocolsettingsdecoder = None
-
- if not decoded:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'sender protocol settings frame did not contain CBOR data')
- )
- elif len(decoded) > 1:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'sender protocol settings frame contained multiple CBOR '
- b'values'
- )
- )
-
- d = decoded[0]
-
- if b'contentencodings' in d:
- self._sendersettings[b'contentencodings'] = d[b'contentencodings']
-
- self._state = b'idle'
-
- return self._makewantframeresult()
-
- def _onframeidle(self, frame):
- # The only frame type that should be received in this state is a
- # command request.
- if frame.typeid != FRAME_TYPE_COMMAND_REQUEST:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'expected command request frame; got %d') % frame.typeid
- )
-
- res = self._validatecommandrequestframe(frame)
- if res:
- return res
-
- if frame.requestid in self._receivingcommands:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'request with ID %d already received') % frame.requestid
- )
-
- if frame.requestid in self._activecommands:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'request with ID %d is already active') % frame.requestid
- )
-
- new = frame.flags & FLAG_COMMAND_REQUEST_NEW
- moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
- expectingdata = frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA
-
- if not new:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'received command request frame without new flag set')
- )
-
- payload = util.bytesio()
- payload.write(frame.payload)
-
- self._receivingcommands[frame.requestid] = {
- b'payload': payload,
- b'data': None,
- b'requestdone': not moreframes,
- b'expectingdata': bool(expectingdata),
- }
-
- # This is the final frame for this request. Dispatch it.
- if not moreframes and not expectingdata:
- return self._makeruncommandresult(frame.requestid)
-
- assert moreframes or expectingdata
- self._state = b'command-receiving'
- return self._makewantframeresult()
-
- def _onframecommandreceiving(self, frame):
- if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
- # Process new command requests as such.
- if frame.flags & FLAG_COMMAND_REQUEST_NEW:
- return self._onframeidle(frame)
-
- res = self._validatecommandrequestframe(frame)
- if res:
- return res
-
- # All other frames should be related to a command that is currently
- # receiving but is not active.
- if frame.requestid in self._activecommands:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'received frame for request that is still active: %d')
- % frame.requestid
- )
-
- if frame.requestid not in self._receivingcommands:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'received frame for request that is not receiving: %d')
- % frame.requestid
- )
-
- entry = self._receivingcommands[frame.requestid]
-
- if frame.typeid == FRAME_TYPE_COMMAND_REQUEST:
- moreframes = frame.flags & FLAG_COMMAND_REQUEST_MORE_FRAMES
- expectingdata = bool(frame.flags & FLAG_COMMAND_REQUEST_EXPECT_DATA)
-
- if entry[b'requestdone']:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'received command request frame when request frames '
- b'were supposedly done'
- )
- )
-
- if expectingdata != entry[b'expectingdata']:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'mismatch between expect data flag and previous frame')
- )
-
- entry[b'payload'].write(frame.payload)
-
- if not moreframes:
- entry[b'requestdone'] = True
-
- if not moreframes and not expectingdata:
- return self._makeruncommandresult(frame.requestid)
-
- return self._makewantframeresult()
-
- elif frame.typeid == FRAME_TYPE_COMMAND_DATA:
- if not entry[b'expectingdata']:
- self._state = b'errored'
- return self._makeerrorresult(
- _(
- b'received command data frame for request that is not '
- b'expecting data: %d'
- )
- % frame.requestid
- )
-
- if entry[b'data'] is None:
- entry[b'data'] = util.bytesio()
-
- return self._handlecommanddataframe(frame, entry)
- else:
- self._state = b'errored'
- return self._makeerrorresult(
- _(b'received unexpected frame type: %d') % frame.typeid
- )
-
- def _handlecommanddataframe(self, frame, entry):
- assert frame.typeid == FRAME_TYPE_COMMAND_DATA
-
- # TODO support streaming data instead of buffering it.
- entry[b'data'].write(frame.payload)
-
- if frame.flags & FLAG_COMMAND_DATA_CONTINUATION:
- return self._makewantframeresult()
- elif frame.flags & FLAG_COMMAND_DATA_EOS:
- entry[b'data'].seek(0)
- return self._makeruncommandresult(frame.requestid)
- else:
- self._state = b'errored'
- return self._makeerrorresult(_(b'command data frame without flags'))
-
- def _onframeerrored(self, frame):
- return self._makeerrorresult(_(b'server already errored'))
-
-
-class commandrequest:
- """Represents a request to run a command."""
-
- def __init__(self, requestid, name, args, datafh=None, redirect=None):
- self.requestid = requestid
- self.name = name
- self.args = args
- self.datafh = datafh
- self.redirect = redirect
- self.state = b'pending'
-
-
-class clientreactor:
- """Holds state of a client issuing frame-based protocol requests.
-
- This is like ``serverreactor`` but for client-side state.
-
- Each instance is bound to the lifetime of a connection. For persistent
- connection transports using e.g. TCP sockets and speaking the raw
- framing protocol, there will be a single instance for the lifetime of
- the TCP socket. For transports where there are multiple discrete
- interactions (say tunneled within in HTTP request), there will be a
- separate instance for each distinct interaction.
-
- Consumers are expected to tell instances when events occur by calling
- various methods. These methods return a 2-tuple describing any follow-up
- action(s) to take. The first element is the name of an action to
- perform. The second is a data structure (usually a dict) specific to
- that action that contains more information. e.g. if the reactor wants
- to send frames to the server, the data structure will contain a reference
- to those frames.
-
- Valid actions that consumers can be instructed to take are:
-
- noop
- Indicates no additional action is required.
-
- sendframes
- Indicates that frames should be sent to the server. The ``framegen``
- key contains a generator of frames that should be sent. The reactor
- assumes that all frames in this generator are sent to the server.
-
- error
- Indicates that an error occurred. The ``message`` key contains an
- error message describing the failure.
-
- responsedata
- Indicates a response to a previously-issued command was received.
-
- The ``request`` key contains the ``commandrequest`` instance that
- represents the request this data is for.
-
- The ``data`` key contains the decoded data from the server.
-
- ``expectmore`` and ``eos`` evaluate to True when more response data
- is expected to follow or we're at the end of the response stream,
- respectively.
- """
-
- def __init__(
- self,
- ui,
- hasmultiplesend=False,
- buffersends=True,
- clientcontentencoders=None,
- ):
- """Create a new instance.
-
- ``hasmultiplesend`` indicates whether multiple sends are supported
- by the transport. When True, it is possible to send commands immediately
- instead of buffering until the caller signals an intent to finish a
- send operation.
-
- ``buffercommands`` indicates whether sends should be buffered until the
- last request has been issued.
-
- ``clientcontentencoders`` is an iterable of content encoders the client
- will advertise to the server and that the server can use for encoding
- data. If not defined, the client will not advertise content encoders
- to the server.
- """
- self._ui = ui
- self._hasmultiplesend = hasmultiplesend
- self._buffersends = buffersends
- self._clientcontentencoders = clientcontentencoders
-
- self._canissuecommands = True
- self._cansend = True
- self._protocolsettingssent = False
-
- self._nextrequestid = 1
- # We only support a single outgoing stream for now.
- self._outgoingstream = outputstream(1)
- self._pendingrequests = collections.deque()
- self._activerequests = {}
- self._incomingstreams = {}
- self._streamsettingsdecoders = {}
-
- populatestreamencoders()
-
- def callcommand(self, name, args, datafh=None, redirect=None):
- """Request that a command be executed.
-
- Receives the command name, a dict of arguments to pass to the command,
- and an optional file object containing the raw data for the command.
-
- Returns a 3-tuple of (request, action, action data).
- """
- if not self._canissuecommands:
- raise error.ProgrammingError(b'cannot issue new commands')
-
- requestid = self._nextrequestid
- self._nextrequestid += 2
-
- request = commandrequest(
- requestid, name, args, datafh=datafh, redirect=redirect
- )
-
- if self._buffersends:
- self._pendingrequests.append(request)
- return request, b'noop', {}
- else:
- if not self._cansend:
- raise error.ProgrammingError(
- b'sends cannot be performed on this instance'
- )
-
- if not self._hasmultiplesend:
- self._cansend = False
- self._canissuecommands = False
-
- return (
- request,
- b'sendframes',
- {
- b'framegen': self._makecommandframes(request),
- },
- )
-
- def flushcommands(self):
- """Request that all queued commands be sent.
-
- If any commands are buffered, this will instruct the caller to send
- them over the wire. If no commands are buffered it instructs the client
- to no-op.
-
- If instances aren't configured for multiple sends, no new command
- requests are allowed after this is called.
- """
- if not self._pendingrequests:
- return b'noop', {}
-
- if not self._cansend:
- raise error.ProgrammingError(
- b'sends cannot be performed on this instance'
- )
-
- # If the instance only allows sending once, mark that we have fired
- # our one shot.
- if not self._hasmultiplesend:
- self._canissuecommands = False
- self._cansend = False
-
- def makeframes():
- while self._pendingrequests:
- request = self._pendingrequests.popleft()
- for frame in self._makecommandframes(request):
- yield frame
-
- return b'sendframes', {
- b'framegen': makeframes(),
- }
-
- def _makecommandframes(self, request):
- """Emit frames to issue a command request.
-
- As a side-effect, update request accounting to reflect its changed
- state.
- """
- self._activerequests[request.requestid] = request
- request.state = b'sending'
-
- if not self._protocolsettingssent and self._clientcontentencoders:
- self._protocolsettingssent = True
-
- payload = b''.join(
- cborutil.streamencode(
- {
- b'contentencodings': self._clientcontentencoders,
- }
- )
- )
-
- yield self._outgoingstream.makeframe(
- requestid=request.requestid,
- typeid=FRAME_TYPE_SENDER_PROTOCOL_SETTINGS,
- flags=FLAG_SENDER_PROTOCOL_SETTINGS_EOS,
- payload=payload,
- )
-
- res = createcommandframes(
- self._outgoingstream,
- request.requestid,
- request.name,
- request.args,
- datafh=request.datafh,
- redirect=request.redirect,
- )
-
- for frame in res:
- yield frame
-
- request.state = b'sent'
-
- def onframerecv(self, frame):
- """Process a frame that has been received off the wire.
-
- Returns a 2-tuple of (action, meta) describing further action the
- caller needs to take as a result of receiving this frame.
- """
- if frame.streamid % 2:
- return (
- b'error',
- {
- b'message': (
- _(b'received frame with odd numbered stream ID: %d')
- % frame.streamid
- ),
- },
- )
-
- if frame.streamid not in self._incomingstreams:
- if not frame.streamflags & STREAM_FLAG_BEGIN_STREAM:
- return (
- b'error',
- {
- b'message': _(
- b'received frame on unknown stream '
- b'without beginning of stream flag set'
- ),
- },
- )
-
- self._incomingstreams[frame.streamid] = inputstream(frame.streamid)
-
- stream = self._incomingstreams[frame.streamid]
-
- # If the payload is encoded, ask the stream to decode it. We
- # merely substitute the decoded result into the frame payload as
- # if it had been transferred all along.
- if frame.streamflags & STREAM_FLAG_ENCODING_APPLIED:
- frame.payload = stream.decode(frame.payload)
-
- if frame.streamflags & STREAM_FLAG_END_STREAM:
- del self._incomingstreams[frame.streamid]
-
- if frame.typeid == FRAME_TYPE_STREAM_SETTINGS:
- return self._onstreamsettingsframe(frame)
-
- if frame.requestid not in self._activerequests:
- return (
- b'error',
- {
- b'message': (
- _(b'received frame for inactive request ID: %d')
- % frame.requestid
- ),
- },
- )
-
- request = self._activerequests[frame.requestid]
- request.state = b'receiving'
-
- handlers = {
- FRAME_TYPE_COMMAND_RESPONSE: self._oncommandresponseframe,
- FRAME_TYPE_ERROR_RESPONSE: self._onerrorresponseframe,
- }
-
- meth = handlers.get(frame.typeid)
- if not meth:
- raise error.ProgrammingError(
- b'unhandled frame type: %d' % frame.typeid
- )
-
- return meth(request, frame)
-
- def _onstreamsettingsframe(self, frame):
- assert frame.typeid == FRAME_TYPE_STREAM_SETTINGS
-
- more = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_CONTINUATION
- eos = frame.flags & FLAG_STREAM_ENCODING_SETTINGS_EOS
-
- if more and eos:
- return (
- b'error',
- {
- b'message': (
- _(
- b'stream encoding settings frame cannot have both '
- b'continuation and end of stream flags set'
- )
- ),
- },
- )
-
- if not more and not eos:
- return (
- b'error',
- {
- b'message': _(
- b'stream encoding settings frame must have '
- b'continuation or end of stream flag set'
- ),
- },
- )
-
- if frame.streamid not in self._streamsettingsdecoders:
- decoder = cborutil.bufferingdecoder()
- self._streamsettingsdecoders[frame.streamid] = decoder
-
- decoder = self._streamsettingsdecoders[frame.streamid]
-
- try:
- decoder.decode(frame.payload)
- except Exception as e:
- return (
- b'error',
- {
- b'message': (
- _(
- b'error decoding CBOR from stream encoding '
- b'settings frame: %s'
- )
- % stringutil.forcebytestr(e)
- ),
- },
- )
-
- if more:
- return b'noop', {}
-
- assert eos
-
- decoded = decoder.getavailable()
- del self._streamsettingsdecoders[frame.streamid]
-
- if not decoded:
- return (
- b'error',
- {
- b'message': _(
- b'stream encoding settings frame did not contain '
- b'CBOR data'
- ),
- },
- )
-
- try:
- self._incomingstreams[frame.streamid].setdecoder(
- self._ui, decoded[0], decoded[1:]
- )
- except Exception as e:
- return (
- b'error',
- {
- b'message': (
- _(b'error setting stream decoder: %s')
- % stringutil.forcebytestr(e)
- ),
- },
- )
-
- return b'noop', {}
-
- def _oncommandresponseframe(self, request, frame):
- if frame.flags & FLAG_COMMAND_RESPONSE_EOS:
- request.state = b'received'
- del self._activerequests[request.requestid]
-
- return (
- b'responsedata',
- {
- b'request': request,
- b'expectmore': frame.flags & FLAG_COMMAND_RESPONSE_CONTINUATION,
- b'eos': frame.flags & FLAG_COMMAND_RESPONSE_EOS,
- b'data': frame.payload,
- },
- )
-
- def _onerrorresponseframe(self, request, frame):
- request.state = b'errored'
- del self._activerequests[request.requestid]
-
- # The payload should be a CBOR map.
- m = cborutil.decodeall(frame.payload)[0]
-
- return (
- b'error',
- {
- b'request': request,
- b'type': m[b'type'],
- b'message': m[b'message'],
- },
- )
diff --git a/mercurial/debugcommands.py b/mercurial/debugcommands.py
--- a/mercurial/debugcommands.py
+++ b/mercurial/debugcommands.py
@@ -88,7 +88,6 @@
url as urlmod,
util,
vfs as vfsmod,
- wireprotoframing,
wireprotoserver,
)
from .interfaces import repository
@@ -4455,12 +4454,6 @@
The content of the file defined as the value to this argument will be
transferred verbatim as the HTTP request body.
- ``frame <type> <flags> <payload>``
- Send a unified protocol frame as part of the request body.
-
- All frames will be collected and sent as the body to the HTTP
- request.
-
close
-----
@@ -4500,34 +4493,6 @@
---------
``read()`` N bytes from the server's stderr pipe, if available.
-
- Specifying Unified Frame-Based Protocol Frames
- ----------------------------------------------
-
- It is possible to emit a *Unified Frame-Based Protocol* by using special
- syntax.
-
- A frame is composed as a type, flags, and payload. These can be parsed
- from a string of the form:
-
- <request-id> <stream-id> <stream-flags> <type> <flags> <payload>
-
- ``request-id`` and ``stream-id`` are integers defining the request and
- stream identifiers.
-
- ``type`` can be an integer value for the frame type or the string name
- of the type. The strings are defined in ``wireprotoframing.py``. e.g.
- ``command-name``.
-
- ``stream-flags`` and ``flags`` are a ``|`` delimited list of flag
- components. Each component (and there can be just one) can be an integer
- or a flag name for stream flags or frame flags, respectively. Values are
- resolved to integers and then bitwise OR'd together.
-
- ``payload`` represents the raw frame payload. If it begins with
- ``cbor:``, the following string is evaluated as Python code and the
- resulting object is fed into a CBOR encoder. Otherwise it is interpreted
- as a Python byte string literal.
"""
opts = pycompat.byteskwargs(opts)
@@ -4785,7 +4750,7 @@
method, httppath = request[1:]
headers = {}
body = None
- frames = []
+
for line in lines:
line = line.lstrip()
m = re.match(b'^([a-zA-Z0-9_-]+): (.*)$', line)
@@ -4799,12 +4764,6 @@
if line.startswith(b'BODYFILE '):
with open(line.split(b' ', 1), b'rb') as fh:
body = fh.read()
- elif line.startswith(b'frame '):
- frame = wireprotoframing.makeframefromhumanstring(
- line[len(b'frame ') :]
- )
-
- frames.append(frame)
else:
raise error.Abort(
_(b'unknown argument to httprequest: %s') % line
@@ -4812,9 +4771,6 @@
url = path + httppath
- if frames:
- body = b''.join(bytes(f) for f in frames)
-
req = urlmod.urlreq.request(pycompat.strurl(url), body, headers)
# urllib.Request insists on using has_data() as a proxy for
To: indygreg, #hg-reviewers
Cc: mercurial-patches, mercurial-devel
More information about the Mercurial-devel
mailing list