[Updated] D8447: rust-chg: reimplement locator by using async/await and tokio-0.2

yuja (Yuya Nishihara) phabricator at mercurial-scm.org
Thu Apr 23 18:03:51 UTC 2020


Herald added a subscriber: mercurial-patches.
Closed by commit rHGa347a329e48d: rust-chg: reimplement locator by using async/await and tokio-0.2 (authored by yuja).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D8447?vs=21124&id=21191

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8447/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8447

AFFECTED FILES
  rust/chg/src/lib.rs
  rust/chg/src/locator.rs

CHANGE DETAILS

diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs
--- a/rust/chg/src/locator.rs
+++ b/rust/chg/src/locator.rs
@@ -5,7 +5,6 @@
 
 //! Utility for locating command-server process.
 
-use futures::future::{self, Either, Loop};
 use log::debug;
 use std::env;
 use std::ffi::{OsStr, OsString};
@@ -14,14 +13,11 @@
 use std::os::unix::ffi::{OsStrExt, OsStringExt};
 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
 use std::path::{Path, PathBuf};
-use std::process;
-use std::time::Duration;
-use tokio::prelude::*;
-use tokio::process::{Child, Command};
+use std::process::{self, Child, Command};
+use std::time::{Duration, Instant};
 use tokio::time;
-use tokio_hglib::UnixClient;
 
-use crate::clientext::ChgClientExt;
+use crate::clientext::ChgClient;
 use crate::message::{Instruction, ServerSpec};
 use crate::procutil;
 
@@ -82,21 +78,19 @@
     /// Connects to the server.
     ///
     /// The server process will be spawned if not running.
-    pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
-        future::loop_fn((self, 0), |(loc, cnt)| {
-            if cnt < 10 {
-                let fut = loc
-                    .try_connect()
-                    .and_then(|(loc, client)| {
-                        client
-                            .validate(&loc.hg_early_args)
-                            .map(|(client, instructions)| (loc, client, instructions))
-                    })
-                    .and_then(move |(loc, client, instructions)| {
-                        loc.run_instructions(client, instructions, cnt)
-                    });
-                Either::A(fut)
-            } else {
+    pub async fn connect(&mut self) -> io::Result<ChgClient> {
+        for _cnt in 0..10 {
+            let mut client = self.try_connect().await?;
+            let instructions = client.validate(&self.hg_early_args).await?;
+            let reconnect = self.run_instructions(&instructions)?;
+            if !reconnect {
+                return Ok(client);
+            }
+        }
+
+        // TODO: unindent
+        {
+            {
                 let msg = format!(
                     concat!(
                         "too many redirections.\n",
@@ -105,20 +99,17 @@
                         "before executing hg. If you have to use a ",
                         "wrapper, wrap chg instead of hg.",
                     ),
-                    loc.hg_command
+                    self.hg_command
                 );
-                Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
+                Err(io::Error::new(io::ErrorKind::Other, msg))
             }
-        })
+        }
     }
 
     /// Runs instructions received from the server.
-    fn run_instructions(
-        mut self,
-        client: UnixClient,
-        instructions: Vec<Instruction>,
-        cnt: usize,
-    ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
+    ///
+    /// Returns true if the client should try connecting to the other server.
+    fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> {
         let mut reconnect = false;
         for inst in instructions {
             debug!("instruction: {:?}", inst);
@@ -126,7 +117,7 @@
                 Instruction::Exit(_) => {
                     // Just returns the current connection to run the
                     // unparsable command and report the error
-                    return Ok(Loop::Break((self, client)));
+                    return Ok(false);
                 }
                 Instruction::Reconnect => {
                     reconnect = true;
@@ -139,7 +130,7 @@
                         );
                         return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
                     }
-                    self.redirect_sock_path = Some(path);
+                    self.redirect_sock_path = Some(path.to_owned());
                     reconnect = true;
                 }
                 Instruction::Unlink(path) => {
@@ -155,25 +146,22 @@
             }
         }
 
-        if reconnect {
-            Ok(Loop::Continue((self, cnt + 1)))
-        } else {
-            Ok(Loop::Break((self, client)))
-        }
+        Ok(reconnect)
     }
 
     /// Tries to connect to the existing server, or spawns new if not running.
-    fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+    async fn try_connect(&mut self) -> io::Result<ChgClient> {
         let sock_path = self
             .redirect_sock_path
             .as_ref()
             .unwrap_or(&self.base_sock_path)
             .clone();
         debug!("try connect to {}", sock_path.display());
-        UnixClient::connect(sock_path)
-            .then(|res| {
-                match res {
-                    Ok(client) => Either::A(future::ok((self, client))),
+        // TODO: unindent
+        {
+            {
+                let mut client = match ChgClient::connect(sock_path).await {
+                    Ok(client) => client,
                     Err(_) => {
                         // Prevent us from being re-connected to the outdated
                         // master server: We were told by the server to redirect
@@ -184,35 +172,23 @@
                             fs::remove_file(&self.base_sock_path).unwrap_or(());
                             // may race
                         }
-                        Either::B(self.spawn_connect())
+                        self.spawn_connect().await?
                     }
-                }
-            })
-            .and_then(|(loc, client)| {
+                };
                 check_server_capabilities(client.server_spec())?;
-                Ok((loc, client))
-            })
-            .and_then(|(loc, client)| {
                 // It's purely optional, and the server might not support this command.
                 if client.server_spec().capabilities.contains("setprocname") {
-                    let fut = client
-                        .set_process_name(format!("chg[worker/{}]", loc.process_id))
-                        .map(|client| (loc, client));
-                    Either::A(fut)
-                } else {
-                    Either::B(future::ok((loc, client)))
+                    client
+                        .set_process_name(format!("chg[worker/{}]", self.process_id))
+                        .await?;
                 }
-            })
-            .and_then(|(loc, client)| {
+                client.set_current_dir(&self.current_dir).await?;
                 client
-                    .set_current_dir(&loc.current_dir)
-                    .map(|client| (loc, client))
-            })
-            .and_then(|(loc, client)| {
-                client
-                    .set_env_vars_os(loc.env_vars.iter().cloned())
-                    .map(|client| (loc, client))
-            })
+                    .set_env_vars_os(self.env_vars.iter().cloned())
+                    .await?;
+                Ok(client)
+            }
+        }
     }
 
     /// Spawns new server process and connects to it.
@@ -220,10 +196,10 @@
     /// The server will be spawned at the current working directory, then
     /// chdir to "/", so that the server will load configs from the target
     /// repository.
-    fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+    async fn spawn_connect(&mut self) -> io::Result<ChgClient> {
         let sock_path = self.temp_sock_path();
         debug!("start cmdserver at {}", sock_path.display());
-        Command::new(&self.hg_command)
+        let server = Command::new(&self.hg_command)
             .arg("serve")
             .arg("--cmdserver")
             .arg("chgunix")
@@ -236,68 +212,54 @@
             .env_clear()
             .envs(self.env_vars.iter().cloned())
             .env("CHGINTERNALMARK", "")
-            .spawn()
-            .into_future()
-            .and_then(|server| self.connect_spawned(server, sock_path))
-            .and_then(|(loc, client, sock_path)| {
+            .spawn()?;
+        let client = self.connect_spawned(server, &sock_path).await?;
+        // TODO: unindent
+        {
+            {
                 debug!(
                     "rename {} to {}",
                     sock_path.display(),
-                    loc.base_sock_path.display()
+                    self.base_sock_path.display()
                 );
-                fs::rename(&sock_path, &loc.base_sock_path)?;
-                Ok((loc, client))
-            })
+                fs::rename(&sock_path, &self.base_sock_path)?;
+                Ok(client)
+            }
+        }
     }
 
     /// Tries to connect to the just spawned server repeatedly until timeout
     /// exceeded.
-    fn connect_spawned(
-        self,
-        server: Child,
-        sock_path: PathBuf,
-    ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
+    async fn connect_spawned(
+        &mut self,
+        mut server: Child,
+        sock_path: &Path,
+    ) -> io::Result<ChgClient> {
         debug!("try connect to {} repeatedly", sock_path.display());
-        let connect = future::loop_fn(sock_path, |sock_path| {
-            UnixClient::connect(sock_path.clone()).then(|res| {
-                match res {
-                    Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
-                    Err(_) => {
-                        // try again with slight delay
-                        let fut = time::delay_for(Duration::from_millis(10))
-                            .map(|()| Loop::Continue(sock_path))
-                            .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
-                        Either::B(fut)
-                    }
-                }
-            })
-        });
-
         // waits for either connection established or server failed to start
-        connect
-            .select2(server)
-            .map_err(|res| res.split().0)
-            .timeout(self.timeout)
-            .map_err(|err| {
-                err.into_inner().unwrap_or_else(|| {
-                    io::Error::new(
-                        io::ErrorKind::TimedOut,
-                        "timed out while connecting to server",
-                    )
-                })
-            })
-            .and_then(|res| {
-                match res {
-                    Either::A(((client, sock_path), server)) => {
-                        server.forget(); // continue to run in background
-                        Ok((self, client, sock_path))
-                    }
-                    Either::B((st, _)) => Err(io::Error::new(
-                        io::ErrorKind::Other,
-                        format!("server exited too early: {}", st),
-                    )),
-                }
-            })
+        let start_time = Instant::now();
+        while start_time.elapsed() < self.timeout {
+            if let Ok(client) = ChgClient::connect(&sock_path).await {
+                // server handle is dropped here, but the detached process
+                // will continue running in background
+                return Ok(client);
+            }
+
+            if let Some(st) = server.try_wait()? {
+                return Err(io::Error::new(
+                    io::ErrorKind::Other,
+                    format!("server exited too early: {}", st),
+                ));
+            }
+
+            // try again with slight delay
+            time::delay_for(Duration::from_millis(10)).await;
+        }
+
+        Err(io::Error::new(
+            io::ErrorKind::TimedOut,
+            "timed out while connecting to server",
+        ))
     }
 }
 
diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
--- a/rust/chg/src/lib.rs
+++ b/rust/chg/src/lib.rs
@@ -5,7 +5,7 @@
 
 mod attachio;
 mod clientext;
-//pub mod locator;
+pub mod locator;
 pub mod message;
 pub mod procutil;
 mod runcommand;



To: yuja, #hg-reviewers, Alphare
Cc: mercurial-patches, mercurial-devel
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mercurial-scm.org/pipermail/mercurial-patches/attachments/20200423/1083d883/attachment-0001.html>


More information about the Mercurial-patches mailing list