D3960: worker: use one pipe per posix worker and select() in parent process
hooper (Danny Hooper)
phabricator at mercurial-scm.org
Thu Jul 19 12:18:32 UTC 2018
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG9e6afe7fca31: worker: use one pipe per posix worker and select() in parent process (authored by hooper, committed by ).
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D3960?vs=9626&id=9628
REVISION DETAIL
https://phab.mercurial-scm.org/D3960
AFFECTED FILES
mercurial/worker.py
CHANGE DETAILS
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -14,6 +14,12 @@
import threading
import time
+try:
+ import selectors
+ selectors.BaseSelector
+except ImportError:
+ from .thirdparty import selectors2 as selectors
+
from .i18n import _
from . import (
encoding,
@@ -89,7 +95,6 @@
return func(*staticargs + (args,))
def _posixworker(ui, func, staticargs, args):
- rfd, wfd = os.pipe()
workers = _numworkers(ui)
oldhandler = signal.getsignal(signal.SIGINT)
signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@
oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
ui.flush()
parentpid = os.getpid()
+ pipes = []
for pargs in partition(args, workers):
+ # Every worker gets its own pipe to send results on, so we don't have to
+ # implement atomic writes larger than PIPE_BUF. Each forked process has
+ # its own pipe's descriptors in the local variables, and the parent
+ # process has the full list of pipe descriptors (and it doesn't really
+ # care what order they're in).
+ rfd, wfd = os.pipe()
+ pipes.append((rfd, wfd))
# make sure we use os._exit in all worker code paths. otherwise the
# worker may do some clean-ups which could cause surprises like
# deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@
signal.signal(signal.SIGCHLD, oldchldhandler)
def workerfunc():
+ for r, w in pipes[:-1]:
+ os.close(r)
+ os.close(w)
os.close(rfd)
for result in func(*(staticargs + (pargs,))):
os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@
finally:
os._exit(ret & 255)
pids.add(pid)
- os.close(wfd)
- fp = os.fdopen(rfd, r'rb', 0)
+ selector = selectors.DefaultSelector()
+ for rfd, wfd in pipes:
+ os.close(wfd)
+ selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
def cleanup():
signal.signal(signal.SIGINT, oldhandler)
waitforworkers()
@@ -187,15 +205,19 @@
os.kill(os.getpid(), -status)
sys.exit(status)
try:
- while True:
- try:
- yield util.pickle.load(fp)
- except EOFError:
- break
- except IOError as e:
- if e.errno == errno.EINTR:
- continue
- raise
+ openpipes = len(pipes)
+ while openpipes > 0:
+ for key, events in selector.select():
+ try:
+ yield util.pickle.load(key.fileobj)
+ except EOFError:
+ selector.unregister(key.fileobj)
+ key.fileobj.close()
+ openpipes -= 1
+ except IOError as e:
+ if e.errno == errno.EINTR:
+ continue
+ raise
except: # re-raises
killworkers()
cleanup()
To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel
More information about the Mercurial-devel
mailing list