[Updated] D8447: rust-chg: reimplement locator by using async/await and tokio-0.2
yuja (Yuya Nishihara)
phabricator at mercurial-scm.org
Thu Apr 23 18:03:51 UTC 2020
Herald added a subscriber: mercurial-patches.
Closed by commit rHGa347a329e48d: rust-chg: reimplement locator by using async/await and tokio-0.2 (authored by yuja).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D8447?vs=21124&id=21191
CHANGES SINCE LAST ACTION
https://phab.mercurial-scm.org/D8447/new/
REVISION DETAIL
https://phab.mercurial-scm.org/D8447
AFFECTED FILES
rust/chg/src/lib.rs
rust/chg/src/locator.rs
CHANGE DETAILS
diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs
--- a/rust/chg/src/locator.rs
+++ b/rust/chg/src/locator.rs
@@ -5,7 +5,6 @@
//! Utility for locating command-server process.
-use futures::future::{self, Either, Loop};
use log::debug;
use std::env;
use std::ffi::{OsStr, OsString};
@@ -14,14 +13,11 @@
use std::os::unix::ffi::{OsStrExt, OsStringExt};
use std::os::unix::fs::{DirBuilderExt, MetadataExt};
use std::path::{Path, PathBuf};
-use std::process;
-use std::time::Duration;
-use tokio::prelude::*;
-use tokio::process::{Child, Command};
+use std::process::{self, Child, Command};
+use std::time::{Duration, Instant};
use tokio::time;
-use tokio_hglib::UnixClient;
-use crate::clientext::ChgClientExt;
+use crate::clientext::ChgClient;
use crate::message::{Instruction, ServerSpec};
use crate::procutil;
@@ -82,21 +78,19 @@
/// Connects to the server.
///
/// The server process will be spawned if not running.
- pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
- future::loop_fn((self, 0), |(loc, cnt)| {
- if cnt < 10 {
- let fut = loc
- .try_connect()
- .and_then(|(loc, client)| {
- client
- .validate(&loc.hg_early_args)
- .map(|(client, instructions)| (loc, client, instructions))
- })
- .and_then(move |(loc, client, instructions)| {
- loc.run_instructions(client, instructions, cnt)
- });
- Either::A(fut)
- } else {
+ pub async fn connect(&mut self) -> io::Result<ChgClient> {
+ for _cnt in 0..10 {
+ let mut client = self.try_connect().await?;
+ let instructions = client.validate(&self.hg_early_args).await?;
+ let reconnect = self.run_instructions(&instructions)?;
+ if !reconnect {
+ return Ok(client);
+ }
+ }
+
+ // TODO: unindent
+ {
+ {
let msg = format!(
concat!(
"too many redirections.\n",
@@ -105,20 +99,17 @@
"before executing hg. If you have to use a ",
"wrapper, wrap chg instead of hg.",
),
- loc.hg_command
+ self.hg_command
);
- Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
+ Err(io::Error::new(io::ErrorKind::Other, msg))
}
- })
+ }
}
/// Runs instructions received from the server.
- fn run_instructions(
- mut self,
- client: UnixClient,
- instructions: Vec<Instruction>,
- cnt: usize,
- ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
+ ///
+ /// Returns true if the client should try connecting to the other server.
+ fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> {
let mut reconnect = false;
for inst in instructions {
debug!("instruction: {:?}", inst);
@@ -126,7 +117,7 @@
Instruction::Exit(_) => {
// Just returns the current connection to run the
// unparsable command and report the error
- return Ok(Loop::Break((self, client)));
+ return Ok(false);
}
Instruction::Reconnect => {
reconnect = true;
@@ -139,7 +130,7 @@
);
return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
}
- self.redirect_sock_path = Some(path);
+ self.redirect_sock_path = Some(path.to_owned());
reconnect = true;
}
Instruction::Unlink(path) => {
@@ -155,25 +146,22 @@
}
}
- if reconnect {
- Ok(Loop::Continue((self, cnt + 1)))
- } else {
- Ok(Loop::Break((self, client)))
- }
+ Ok(reconnect)
}
/// Tries to connect to the existing server, or spawns new if not running.
- fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+ async fn try_connect(&mut self) -> io::Result<ChgClient> {
let sock_path = self
.redirect_sock_path
.as_ref()
.unwrap_or(&self.base_sock_path)
.clone();
debug!("try connect to {}", sock_path.display());
- UnixClient::connect(sock_path)
- .then(|res| {
- match res {
- Ok(client) => Either::A(future::ok((self, client))),
+ // TODO: unindent
+ {
+ {
+ let mut client = match ChgClient::connect(sock_path).await {
+ Ok(client) => client,
Err(_) => {
// Prevent us from being re-connected to the outdated
// master server: We were told by the server to redirect
@@ -184,35 +172,23 @@
fs::remove_file(&self.base_sock_path).unwrap_or(());
// may race
}
- Either::B(self.spawn_connect())
+ self.spawn_connect().await?
}
- }
- })
- .and_then(|(loc, client)| {
+ };
check_server_capabilities(client.server_spec())?;
- Ok((loc, client))
- })
- .and_then(|(loc, client)| {
// It's purely optional, and the server might not support this command.
if client.server_spec().capabilities.contains("setprocname") {
- let fut = client
- .set_process_name(format!("chg[worker/{}]", loc.process_id))
- .map(|client| (loc, client));
- Either::A(fut)
- } else {
- Either::B(future::ok((loc, client)))
+ client
+ .set_process_name(format!("chg[worker/{}]", self.process_id))
+ .await?;
}
- })
- .and_then(|(loc, client)| {
+ client.set_current_dir(&self.current_dir).await?;
client
- .set_current_dir(&loc.current_dir)
- .map(|client| (loc, client))
- })
- .and_then(|(loc, client)| {
- client
- .set_env_vars_os(loc.env_vars.iter().cloned())
- .map(|client| (loc, client))
- })
+ .set_env_vars_os(self.env_vars.iter().cloned())
+ .await?;
+ Ok(client)
+ }
+ }
}
/// Spawns new server process and connects to it.
@@ -220,10 +196,10 @@
/// The server will be spawned at the current working directory, then
/// chdir to "/", so that the server will load configs from the target
/// repository.
- fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+ async fn spawn_connect(&mut self) -> io::Result<ChgClient> {
let sock_path = self.temp_sock_path();
debug!("start cmdserver at {}", sock_path.display());
- Command::new(&self.hg_command)
+ let server = Command::new(&self.hg_command)
.arg("serve")
.arg("--cmdserver")
.arg("chgunix")
@@ -236,68 +212,54 @@
.env_clear()
.envs(self.env_vars.iter().cloned())
.env("CHGINTERNALMARK", "")
- .spawn()
- .into_future()
- .and_then(|server| self.connect_spawned(server, sock_path))
- .and_then(|(loc, client, sock_path)| {
+ .spawn()?;
+ let client = self.connect_spawned(server, &sock_path).await?;
+ // TODO: unindent
+ {
+ {
debug!(
"rename {} to {}",
sock_path.display(),
- loc.base_sock_path.display()
+ self.base_sock_path.display()
);
- fs::rename(&sock_path, &loc.base_sock_path)?;
- Ok((loc, client))
- })
+ fs::rename(&sock_path, &self.base_sock_path)?;
+ Ok(client)
+ }
+ }
}
/// Tries to connect to the just spawned server repeatedly until timeout
/// exceeded.
- fn connect_spawned(
- self,
- server: Child,
- sock_path: PathBuf,
- ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
+ async fn connect_spawned(
+ &mut self,
+ mut server: Child,
+ sock_path: &Path,
+ ) -> io::Result<ChgClient> {
debug!("try connect to {} repeatedly", sock_path.display());
- let connect = future::loop_fn(sock_path, |sock_path| {
- UnixClient::connect(sock_path.clone()).then(|res| {
- match res {
- Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
- Err(_) => {
- // try again with slight delay
- let fut = time::delay_for(Duration::from_millis(10))
- .map(|()| Loop::Continue(sock_path))
- .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
- Either::B(fut)
- }
- }
- })
- });
-
// waits for either connection established or server failed to start
- connect
- .select2(server)
- .map_err(|res| res.split().0)
- .timeout(self.timeout)
- .map_err(|err| {
- err.into_inner().unwrap_or_else(|| {
- io::Error::new(
- io::ErrorKind::TimedOut,
- "timed out while connecting to server",
- )
- })
- })
- .and_then(|res| {
- match res {
- Either::A(((client, sock_path), server)) => {
- server.forget(); // continue to run in background
- Ok((self, client, sock_path))
- }
- Either::B((st, _)) => Err(io::Error::new(
- io::ErrorKind::Other,
- format!("server exited too early: {}", st),
- )),
- }
- })
+ let start_time = Instant::now();
+ while start_time.elapsed() < self.timeout {
+ if let Ok(client) = ChgClient::connect(&sock_path).await {
+ // server handle is dropped here, but the detached process
+ // will continue running in background
+ return Ok(client);
+ }
+
+ if let Some(st) = server.try_wait()? {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ format!("server exited too early: {}", st),
+ ));
+ }
+
+ // try again with slight delay
+ time::delay_for(Duration::from_millis(10)).await;
+ }
+
+ Err(io::Error::new(
+ io::ErrorKind::TimedOut,
+ "timed out while connecting to server",
+ ))
}
}
diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
--- a/rust/chg/src/lib.rs
+++ b/rust/chg/src/lib.rs
@@ -5,7 +5,7 @@
mod attachio;
mod clientext;
-//pub mod locator;
+pub mod locator;
pub mod message;
pub mod procutil;
mod runcommand;
To: yuja, #hg-reviewers, Alphare
Cc: mercurial-patches, mercurial-devel
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mercurial-scm.org/pipermail/mercurial-patches/attachments/20200423/1083d883/attachment-0001.html>
More information about the Mercurial-patches
mailing list