[PATCH 2 of 2] upgraderepo: add a config option for parallel computation

Pierre-Yves David pierre-yves.david at ens-lyon.org
Fri Nov 29 15:51:12 UTC 2019

# HG changeset patch
# User Pierre-Yves David <pierre-yves.david at octobus.net>
# Date 1569765632 -7200
#      Sun Sep 29 16:00:32 2019 +0200
# Node ID 85b8caa1a166c9a31a75c9b515a5ae3930db5acf
# Parent  55c69c19cb183f2c315cda4cacfc2d40741cf6e5
# EXP-Topic sidedata-copies-perf
# Available At https://dev.heptapod.net/octobus/mercurial-devel/
#              hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 85b8caa1a166
upgraderepo: add a config option for parallel computation

The option is put to use to compute new copy tracing side data in parallel. It
use the multiprocessing module as it had the appropriate primitive for what we
needed. Gregory Szorc had concerned on windows so we disabled it there.

See inline comment for details on the parallel implementation.

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -706,6 +706,9 @@ coreconfigitem(
     b'experimental', b'worker.wdir-get-thread-safe', default=False,
+    b'experimental', b'worker.repository-upgrade', default=False,
     b'experimental', b'xdiff', default=False,
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -8,6 +8,7 @@
 from __future__ import absolute_import
 import collections
+import multiprocessing
 import os
 from .i18n import _
@@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev):
 def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade')
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by worker precomputing sidedata
+    It read an input queue containing revision numbers
+    It write in an output queue containing (rev, <sidedata-map>)
+    The `None` input value is used as a stop signal.
+    The `tokens` semaphore is user to avoid having too many unprocessed
+    entries. The workers needs to acquire one token before fetching a task.
+    They will be released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # processing of `None` is completed, release the token.
+    tokens.release()
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+    This code spawn a pool of worker that precompute a buffer of sidedata
+    before we actually need them"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+    nbworkers = worker._numworkers(srcrepo.ui)
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand, revision numbers are small and it make
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not revelant. In practice, most computation
+    # are fast but some are very expensive and dominate all the other smaller
+    # cost.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+    # dictionnary to store results for revision higher than we one we are
+    # looking for. For example, if we need the sidedatamap for 42, and 43 is
+    # received, when shelve 43 for later use.
+    staging = {}
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, b'filteredrevs'):  # this is a changelog
+            # Is the data previously shelved ?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued result until we find the one we are lookig
+                # for (shelve the other ones)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+    return sidedata_companion
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+    It just compute it in the same thread on request"""
     def sidedatacompanion(revlog, rev):
         sidedata = {}
         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog

More information about the Mercurial-devel mailing list