[Updated] D8445: rust-chg: reimplement run_command operation as async function
yuja (Yuya Nishihara)
phabricator at mercurial-scm.org
Thu Apr 23 18:03:19 UTC 2020
Herald added a subscriber: mercurial-patches.
Closed by commit rHG94cace4b80ea: rust-chg: reimplement run_command operation as async function (authored by yuja).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D8445?vs=21122&id=21189
CHANGES SINCE LAST ACTION
https://phab.mercurial-scm.org/D8445/new/
REVISION DETAIL
https://phab.mercurial-scm.org/D8445
AFFECTED FILES
rust/chg/src/lib.rs
rust/chg/src/runcommand.rs
CHANGE DETAILS
diff --git a/rust/chg/src/runcommand.rs b/rust/chg/src/runcommand.rs
--- a/rust/chg/src/runcommand.rs
+++ b/rust/chg/src/runcommand.rs
@@ -6,164 +6,56 @@
//! Functions to run Mercurial command in cHg-aware command server.
use bytes::Bytes;
-use futures::future::IntoFuture;
-use futures::{Async, Future, Poll};
use std::io;
-use std::mem;
use std::os::unix::io::AsRawFd;
use tokio_hglib::codec::ChannelMessage;
-use tokio_hglib::protocol::MessageLoop;
-use tokio_hglib::{Client, Connection};
+use tokio_hglib::{Connection, Protocol};
-use crate::attachio::AttachIo;
+use crate::attachio;
use crate::message::{self, CommandType};
use crate::uihandler::SystemHandler;
-enum AsyncS<R, S> {
- Ready(R),
- NotReady(S),
- PollAgain(S),
-}
-
-enum CommandState<C, H>
-where
- C: Connection,
- H: SystemHandler,
-{
- Running(MessageLoop<C>, H),
- SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
- AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
- WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
- Finished,
-}
-
-type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
-
-/// Future resolves to `(exit_code, client)`.
-#[must_use = "futures do nothing unless polled"]
-pub struct ChgRunCommand<C, H>
-where
- C: Connection,
- H: SystemHandler,
-{
- state: CommandState<C, H>,
-}
-
-impl<C, H> ChgRunCommand<C, H>
-where
- C: Connection + AsRawFd,
- H: SystemHandler,
-{
- pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
- let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
- ChgRunCommand {
- state: CommandState::Running(msg_loop, handler),
- }
- }
-}
-
-impl<C, H> Future for ChgRunCommand<C, H>
-where
- C: Connection + AsRawFd,
- H: SystemHandler,
-{
- type Item = (Client<C>, H, i32);
- type Error = io::Error;
-
- fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
- loop {
- let state = mem::replace(&mut self.state, CommandState::Finished);
- match state.poll()? {
- AsyncS::Ready((client, handler, code)) => {
- return Ok(Async::Ready((client, handler, code)));
- }
- AsyncS::NotReady(newstate) => {
- self.state = newstate;
- return Ok(Async::NotReady);
- }
- AsyncS::PollAgain(newstate) => {
- self.state = newstate;
- }
- }
- }
- }
-}
-
-impl<C, H> CommandState<C, H>
-where
- C: Connection + AsRawFd,
- H: SystemHandler,
-{
- fn poll(self) -> CommandPoll<C, H> {
- match self {
- CommandState::Running(mut msg_loop, handler) => {
- if let Async::Ready((client, msg)) = msg_loop.poll()? {
- process_message(client, handler, msg)
- } else {
- Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
- }
- }
- CommandState::SpawningPager(client, mut fut) => {
- if let Async::Ready((handler, pin)) = fut.poll()? {
- let fut = AttachIo::with_client(client, io::stdin(), pin, None);
- Ok(AsyncS::PollAgain(CommandState::AttachingPager(
- fut, handler,
- )))
- } else {
- Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
- }
- }
- CommandState::AttachingPager(mut fut, handler) => {
- if let Async::Ready(client) = fut.poll()? {
- let msg_loop = MessageLoop::start(client, b""); // terminator
- Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
- } else {
- Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
- }
- }
- CommandState::WaitingSystem(client, mut fut) => {
- if let Async::Ready((handler, code)) = fut.poll()? {
- let data = message::pack_result_code(code);
- let msg_loop = MessageLoop::resume_with_data(client, data);
- Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
- } else {
- Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
- }
- }
- CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
- }
- }
-}
-
-fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
-where
- C: Connection,
- H: SystemHandler,
-{
- {
- match msg {
+/// Runs the given Mercurial command in cHg-aware command server, and
+/// fetches the result code.
+///
+/// This is a subset of tokio-hglib's `run_command()` with the additional
+/// SystemRequest support.
+pub async fn run_command(
+ proto: &mut Protocol<impl Connection + AsRawFd>,
+ handler: &mut impl SystemHandler,
+ packed_args: impl Into<Bytes>,
+) -> io::Result<i32> {
+ proto
+ .send_command_with_args("runcommand", packed_args)
+ .await?;
+ loop {
+ match proto.fetch_response().await? {
ChannelMessage::Data(b'r', data) => {
- let code = message::parse_result_code(data)?;
- Ok(AsyncS::Ready((client, handler, code)))
+ return message::parse_result_code(data);
}
ChannelMessage::Data(..) => {
// just ignores data sent to optional channel
- let msg_loop = MessageLoop::resume(client);
- Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
}
- ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
- io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
- ),
+ ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidData,
+ "unsupported request",
+ ));
+ }
ChannelMessage::SystemRequest(data) => {
let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
match cmd_type {
CommandType::Pager => {
- let fut = handler.spawn_pager(cmd_spec).into_future();
- Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
+ // server spins new command loop while pager request is
+ // in progress, which can be terminated by "" command.
+ let pin = handler.spawn_pager(&cmd_spec).await?;
+ attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?;
+ proto.send_command("").await?; // terminator
}
CommandType::System => {
- let fut = handler.run_system(cmd_spec).into_future();
- Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
+ let code = handler.run_system(&cmd_spec).await?;
+ let data = message::pack_result_code(code);
+ proto.send_data(data).await?;
}
}
}
diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
--- a/rust/chg/src/lib.rs
+++ b/rust/chg/src/lib.rs
@@ -8,7 +8,7 @@
//pub mod locator;
pub mod message;
pub mod procutil;
-//mod runcommand;
+mod runcommand;
mod uihandler;
//pub use clientext::ChgClientExt;
To: yuja, #hg-reviewers, Alphare
Cc: mercurial-patches, mercurial-devel
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mercurial-scm.org/pipermail/mercurial-patches/attachments/20200423/9066ba3f/attachment-0001.html>
More information about the Mercurial-patches mailing list