[Request] [++--- ] D8587: metadata: move computation related to files touched in a dedicated module
marmoute (Pierre-Yves David)
phabricator at mercurial-scm.org
Wed May 27 13:17:41 UTC 2020
marmoute created this revision.
Herald added a reviewer: hg-reviewers.
Herald added a subscriber: mercurial-patches.
REVISION SUMMARY
This was suggested by Yuya Nishihara a while back. Since I am about to add more
metadata-related computation, let's create a new module.
REPOSITORY
rHG Mercurial
BRANCH
default
REVISION DETAIL
https://phab.mercurial-scm.org/D8587
AFFECTED FILES
mercurial/changelog.py
mercurial/context.py
mercurial/copies.py
mercurial/metadata.py
mercurial/upgrade.py
CHANGE DETAILS
diff --git a/mercurial/upgrade.py b/mercurial/upgrade.py
--- a/mercurial/upgrade.py
+++ b/mercurial/upgrade.py
@@ -13,12 +13,12 @@
from .pycompat import getattr
from . import (
changelog,
- copies,
error,
filelog,
hg,
localrepo,
manifest,
+ metadata,
pycompat,
revlog,
scmutil,
@@ -734,9 +734,9 @@
return False, (), {}
elif localrepo.COPIESSDC_REQUIREMENT in addedreqs:
- sidedatacompanion = copies.getsidedataadder(srcrepo, dstrepo)
+ sidedatacompanion = metadata.getsidedataadder(srcrepo, dstrepo)
elif localrepo.COPIESSDC_REQUIREMENT in removedreqs:
- sidedatacompanion = copies.getsidedataremover(srcrepo, dstrepo)
+ sidedatacompanion = metadata.getsidedataremover(srcrepo, dstrepo)
return sidedatacompanion
diff --git a/mercurial/metadata.py b/mercurial/metadata.py
new file mode 100644
--- /dev/null
+++ b/mercurial/metadata.py
@@ -0,0 +1,268 @@
+# metadata.py -- code related to various metadata computation and access.
+#
+# Copyright 2019 Martin von Zweigbergk <martinvonz at google.com>
+# Copyright 2020 Pierre-Yves David <pierre-yves.david at octobus.net>
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+from __future__ import absolute_import, print_function
+
+import multiprocessing
+
+from . import (
+ error,
+ pycompat,
+ util,
+)
+
+from .revlogutils import (
+ flagutil as sidedataflag,
+ sidedata as sidedatamod,
+)
+
+
def computechangesetfilesadded(ctx):
    """return the list of files added in a changeset

    A file counts as added when it is listed in `ctx.files()` but is present
    in none of the changeset's parents. The result keeps `ctx.files()` order.
    """
    # The parents do not depend on the file being inspected: fetch them once
    # instead of once per touched file. Materialize them so they can be
    # iterated repeatedly even if an iterator is ever returned.
    parents = tuple(ctx.parents())
    return [f for f in ctx.files() if not any(f in p for p in parents)]
+
+
def computechangesetfilesremoved(ctx):
    """return the list of files removed in a changeset

    Those are the entries of `ctx.files()` that are no longer present in
    `ctx` itself, reported in `ctx.files()` order.
    """
    return [path for path in ctx.files() if path not in ctx]
+
+
def computechangesetcopies(ctx):
    """return the copies data for a changeset

    The result is a pair of dictionaries `(p1copies, p2copies)`, each of the
    form `{newname: oldname}`.

    Touched files outside the repository narrowspec, or missing from `ctx`,
    are skipped. A copy is attributed to the first parent (p1 before p2)
    whose revision of the source file has the recorded source filenode;
    copies matching neither parent are left out.
    """
    parents = (ctx.p1(), ctx.p2())
    result = ({}, {})
    inscope = ctx._repo.narrowmatch()
    for dst in ctx.files():
        if not inscope(dst) or dst not in ctx:
            continue
        source = ctx[dst].renamed()
        if not source:
            continue
        srcpath, srcnode = source
        for parent, found in zip(parents, result):
            if srcpath in parent and parent[srcpath].filenode() == srcnode:
                found[dst] = srcpath
                break
    return result
+
+
def encodecopies(files, copies):
    """serialize a `{dst: src}` copy mapping against a list of files

    Each copy becomes a `<index>\\0<src>` record, where `<index>` is the
    position of `dst` in `files`; records are joined with newlines and
    emitted in `files` order.

    Raises ProgrammingError when some copy destination is not in `files`.
    """
    records = [
        b'%d\0%s' % (index, copies[name])
        for index, name in enumerate(files)
        if name in copies
    ]
    if len(records) != len(copies):
        raise error.ProgrammingError(
            b'some copy targets missing from file list'
        )
    return b"\n".join(records)
+
+
def decodecopies(files, data):
    """parse copy data produced by `encodecopies`

    `data` is a newline-separated list of `<index>\\0<src>` records, where
    `<index>` points into `files`. Returns a `{dst: src}` dictionary (empty
    for empty `data`), or None when `data` does not follow that format.
    """
    copies = {}
    if not data:
        return copies
    try:
        for l in data.split(b'\n'):
            strindex, src = l.split(b'\0')
            i = int(strindex)
            # Reject out-of-range indices explicitly: a negative index would
            # otherwise silently wrap around and pick the wrong file. This
            # matches the check done by `decodefileindices`.
            if i < 0 or i >= len(files):
                return None
            copies[files[i]] = src
        return copies
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "p1copies") and
        # used different syntax for the value.
        return None
+
+
def encodefileindices(files, subset):
    """serialize which entries of `files` belong to `subset`

    Returns the newline-joined decimal positions (in `files` order) of the
    entries that are members of `subset`.
    """
    wanted = set(subset)
    return b'\n'.join(
        b'%d' % position
        for position, name in enumerate(files)
        if name in wanted
    )
+
+
def decodefileindices(files, data):
    """parse file-index data produced by `encodefileindices`

    Returns the entries of `files` designated by the newline-separated
    decimal indices in `data` (empty list for empty `data`), or None when an
    index is malformed or out of range.
    """
    selected = []
    if not data:
        return selected
    try:
        for token in data.split(b'\n'):
            position = int(token)
            if not 0 <= position < len(files):
                return None
            selected.append(files[position])
    except (ValueError, IndexError):
        # Perhaps someone had chosen the same key name (e.g. "added") and
        # used different syntax for the value.
        return None
    return selected
+
+
def _getsidedata(srcrepo, rev):
    """compute the copy-tracing sidedata map for changelog revision `rev`

    Returns a dictionary keyed by the `sidedatamod.SD_*` constants, holding
    only the entries that have a non-empty encoded value (p1 copies, p2
    copies, added files, removed files).
    """
    ctx = srcrepo[rev]
    p1copies, p2copies = computechangesetcopies(ctx)
    filesadded = computechangesetfilesadded(ctx)
    filesremoved = computechangesetfilesremoved(ctx)
    sidedata = {}
    # Check the unpacked copy dictionaries: the pair returned by
    # `computechangesetcopies` is a 2-tuple and therefore always truthy.
    if not any((p1copies, p2copies, filesadded, filesremoved)):
        return sidedata
    sortedfiles = sorted(ctx.files())
    encoded = (
        (sidedatamod.SD_P1COPIES, encodecopies(sortedfiles, p1copies)),
        (sidedatamod.SD_P2COPIES, encodecopies(sortedfiles, p2copies)),
        (
            sidedatamod.SD_FILESADDED,
            encodefileindices(sortedfiles, filesadded),
        ),
        (
            sidedatamod.SD_FILESREMOVED,
            encodefileindices(sortedfiles, filesremoved),
        ),
    )
    for key, value in encoded:
        # empty encodings are simply not stored
        if value:
            sidedata[key] = value
    return sidedata
+
+
def getsidedataadder(srcrepo, destrepo):
    """return a sidedata companion that adds copy-tracing sidedata

    The multi-process implementation is used when the
    `experimental.worker.repository-upgrade` config is enabled and we are not
    on Windows; otherwise the sidedata is computed on demand in-process.
    """
    ui = srcrepo.ui
    use_workers = ui.configbool(b'experimental', b'worker.repository-upgrade')
    if use_workers and not pycompat.iswindows:
        return _get_worker_sidedata_adder(srcrepo, destrepo)
    return _get_simple_sidedata_adder(srcrepo, destrepo)
+
+
def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
    """body of a worker precomputing sidedata

    Revision numbers are read from `revs_queue` and a `(rev, <sidedata-map>)`
    pair is written to `sidedata_queue` for each of them. A `None` read from
    `revs_queue` stops the worker.

    A token must be acquired from the `tokens` semaphore before every fetch,
    which bounds the number of produced-but-unconsumed entries. The consumer
    of the produced data is responsible for releasing those tokens.
    """
    while True:
        tokens.acquire()
        rev = revs_queue.get()
        if rev is None:
            break
        sidedata_queue.put((rev, _getsidedata(srcrepo, rev)))
    # Nothing is produced for the `None` stop marker, so no consumer will
    # give its token back: release it here.
    tokens.release()
+
+
# Maximum number of computed-but-not-yet-consumed sidedata results allowed per
# worker process: the parallel adder sizes its token semaphore as
# `nbworkers * BUFF_PER_WORKER`.
BUFF_PER_WORKER = 50


def _get_worker_sidedata_adder(srcrepo, destrepo):
    """The parallel version of the sidedata computation

    This code spawns a pool of worker processes that precompute a buffer of
    sidedata before we actually need it.

    Every changelog revision of `srcrepo` is queued up front, followed by one
    `None` stop marker per worker. Workers push `(rev, sidedata)` pairs to a
    result queue, and a bounded semaphore of `nbworkers * BUFF_PER_WORKER`
    tokens caps how many computed results may be waiting to be consumed.

    Returns a sidedata companion `(revlog, rev) -> (False, (), sidedata)`
    that only provides data for the changelog and returns an empty map for
    any other revlog.
    """
    # Imported lazily to avoid a circular import (when this code lived in
    # copies.py the cycle was copies -> scmutil -> worker -> copies).
    # NOTE(review): confirm the equivalent cycle still exists for this module.
    from . import worker

    nbworkers = worker._numworkers(srcrepo.ui)

    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    assert srcrepo.filtername is None
    # queue all tasks beforehand, revision numbers are small and it makes
    # synchronisation simpler
    #
    # Since the computation for each node can be quite expensive, the overhead
    # of using a single queue is not relevant. In practice, most computations
    # are fast but some are very expensive and dominate all the other smaller
    # costs.
    for r in srcrepo.changelog.revs():
        revsq.put(r)
    # queue one "no more tasks" marker per worker
    for i in range(nbworkers):
        revsq.put(None)

    # NOTE(review): the workers are never joined here; if the caller stops
    # consuming results early they remain blocked waiting for a token.
    allworkers = []
    for i in range(nbworkers):
        args = (srcrepo, revsq, sidedataq, tokens)
        w = multiprocessing.Process(target=_sidedata_worker, args=args)
        allworkers.append(w)
        w.start()

    # dictionary storing results received for revisions other than the one we
    # are looking for. For example, if we need the sidedata map for 42 and 43
    # is received first, 43 is shelved here for later use.
    staging = {}

    def sidedata_companion(revlog, rev):
        sidedata = {}
        # native-str attribute name, consistent with the other companions
        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
            # Is the data previously shelved ?
            sidedata = staging.pop(rev, None)
            if sidedata is None:
                # look at the queued results until we find the one we are
                # looking for (shelve the other ones)
                r, sidedata = sidedataq.get()
                while r != rev:
                    staging[r] = sidedata
                    r, sidedata = sidedataq.get()
            # the result for `rev` has been consumed: hand its token back so
            # a worker can fetch another task
            tokens.release()
        return False, (), sidedata

    return sidedata_companion
+
+
def _get_simple_sidedata_adder(srcrepo, destrepo):
    """The simple version of the sidedata computation

    The sidedata is computed in the calling process, at the moment the
    companion is invoked for a changelog revision. Other revlogs get an
    empty sidedata map.
    """

    def sidedatacompanion(revlog, rev):
        if not util.safehasattr(revlog, 'filteredrevs'):
            # not the changelog: nothing to add
            return False, (), {}
        return False, (), _getsidedata(srcrepo, rev)

    return sidedatacompanion
+
+
def getsidedataremover(srcrepo, destrepo):
    """return a sidedata companion that drops copy-tracing sidedata

    For changelog revisions carrying the REVIDX_SIDEDATA flag, the companion
    asks for the p1/p2 copies and added/removed files keys to be removed.
    Any other revision is left untouched.
    """

    def sidedatacompanion(revlog, rev):
        is_changelog = util.safehasattr(revlog, 'filteredrevs')
        if is_changelog and revlog.flags(rev) & sidedataflag.REVIDX_SIDEDATA:
            todrop = (
                sidedatamod.SD_P1COPIES,
                sidedatamod.SD_P2COPIES,
                sidedatamod.SD_FILESADDED,
                sidedatamod.SD_FILESREMOVED,
            )
        else:
            todrop = ()
        return False, todrop, {}

    return sidedatacompanion
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -8,7 +8,6 @@
from __future__ import absolute_import
import collections
-import multiprocessing
import os
from .i18n import _
@@ -17,7 +16,6 @@
from .revlogutils.flagutil import REVIDX_SIDEDATA
from . import (
- error,
match as matchmod,
node,
pathutil,
@@ -25,7 +23,6 @@
util,
)
-from .revlogutils import sidedata as sidedatamod
from .utils import stringutil
@@ -992,250 +989,3 @@
_filter(wctx.p1(), wctx, new_copies)
for dst, src in pycompat.iteritems(new_copies):
wctx[dst].markcopied(src)
-
-
-def computechangesetfilesadded(ctx):
- """return the list of files added in a changeset
- """
- added = []
- for f in ctx.files():
- if not any(f in p for p in ctx.parents()):
- added.append(f)
- return added
-
-
-def computechangesetfilesremoved(ctx):
- """return the list of files removed in a changeset
- """
- removed = []
- for f in ctx.files():
- if f not in ctx:
- removed.append(f)
- return removed
-
-
-def computechangesetcopies(ctx):
- """return the copies data for a changeset
-
- The copies data are returned as a pair of dictionnary (p1copies, p2copies).
-
- Each dictionnary are in the form: `{newname: oldname}`
- """
- p1copies = {}
- p2copies = {}
- p1 = ctx.p1()
- p2 = ctx.p2()
- narrowmatch = ctx._repo.narrowmatch()
- for dst in ctx.files():
- if not narrowmatch(dst) or dst not in ctx:
- continue
- copied = ctx[dst].renamed()
- if not copied:
- continue
- src, srcnode = copied
- if src in p1 and p1[src].filenode() == srcnode:
- p1copies[dst] = src
- elif src in p2 and p2[src].filenode() == srcnode:
- p2copies[dst] = src
- return p1copies, p2copies
-
-
-def encodecopies(files, copies):
- items = []
- for i, dst in enumerate(files):
- if dst in copies:
- items.append(b'%d\0%s' % (i, copies[dst]))
- if len(items) != len(copies):
- raise error.ProgrammingError(
- b'some copy targets missing from file list'
- )
- return b"\n".join(items)
-
-
-def decodecopies(files, data):
- try:
- copies = {}
- if not data:
- return copies
- for l in data.split(b'\n'):
- strindex, src = l.split(b'\0')
- i = int(strindex)
- dst = files[i]
- copies[dst] = src
- return copies
- except (ValueError, IndexError):
- # Perhaps someone had chosen the same key name (e.g. "p1copies") and
- # used different syntax for the value.
- return None
-
-
-def encodefileindices(files, subset):
- subset = set(subset)
- indices = []
- for i, f in enumerate(files):
- if f in subset:
- indices.append(b'%d' % i)
- return b'\n'.join(indices)
-
-
-def decodefileindices(files, data):
- try:
- subset = []
- if not data:
- return subset
- for strindex in data.split(b'\n'):
- i = int(strindex)
- if i < 0 or i >= len(files):
- return None
- subset.append(files[i])
- return subset
- except (ValueError, IndexError):
- # Perhaps someone had chosen the same key name (e.g. "added") and
- # used different syntax for the value.
- return None
-
-
-def _getsidedata(srcrepo, rev):
- ctx = srcrepo[rev]
- filescopies = computechangesetcopies(ctx)
- filesadded = computechangesetfilesadded(ctx)
- filesremoved = computechangesetfilesremoved(ctx)
- sidedata = {}
- if any([filescopies, filesadded, filesremoved]):
- sortedfiles = sorted(ctx.files())
- p1copies, p2copies = filescopies
- p1copies = encodecopies(sortedfiles, p1copies)
- p2copies = encodecopies(sortedfiles, p2copies)
- filesadded = encodefileindices(sortedfiles, filesadded)
- filesremoved = encodefileindices(sortedfiles, filesremoved)
- if p1copies:
- sidedata[sidedatamod.SD_P1COPIES] = p1copies
- if p2copies:
- sidedata[sidedatamod.SD_P2COPIES] = p2copies
- if filesadded:
- sidedata[sidedatamod.SD_FILESADDED] = filesadded
- if filesremoved:
- sidedata[sidedatamod.SD_FILESREMOVED] = filesremoved
- return sidedata
-
-
-def getsidedataadder(srcrepo, destrepo):
- use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
- if pycompat.iswindows or not use_w:
- return _get_simple_sidedata_adder(srcrepo, destrepo)
- else:
- return _get_worker_sidedata_adder(srcrepo, destrepo)
-
-
-def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
- """The function used by worker precomputing sidedata
-
- It read an input queue containing revision numbers
- It write in an output queue containing (rev, <sidedata-map>)
-
- The `None` input value is used as a stop signal.
-
- The `tokens` semaphore is user to avoid having too many unprocessed
- entries. The workers needs to acquire one token before fetching a task.
- They will be released by the consumer of the produced data.
- """
- tokens.acquire()
- rev = revs_queue.get()
- while rev is not None:
- data = _getsidedata(srcrepo, rev)
- sidedata_queue.put((rev, data))
- tokens.acquire()
- rev = revs_queue.get()
- # processing of `None` is completed, release the token.
- tokens.release()
-
-
-BUFF_PER_WORKER = 50
-
-
-def _get_worker_sidedata_adder(srcrepo, destrepo):
- """The parallel version of the sidedata computation
-
- This code spawn a pool of worker that precompute a buffer of sidedata
- before we actually need them"""
- # avoid circular import copies -> scmutil -> worker -> copies
- from . import worker
-
- nbworkers = worker._numworkers(srcrepo.ui)
-
- tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
- revsq = multiprocessing.Queue()
- sidedataq = multiprocessing.Queue()
-
- assert srcrepo.filtername is None
- # queue all tasks beforehand, revision numbers are small and it make
- # synchronisation simpler
- #
- # Since the computation for each node can be quite expensive, the overhead
- # of using a single queue is not revelant. In practice, most computation
- # are fast but some are very expensive and dominate all the other smaller
- # cost.
- for r in srcrepo.changelog.revs():
- revsq.put(r)
- # queue the "no more tasks" markers
- for i in range(nbworkers):
- revsq.put(None)
-
- allworkers = []
- for i in range(nbworkers):
- args = (srcrepo, revsq, sidedataq, tokens)
- w = multiprocessing.Process(target=_sidedata_worker, args=args)
- allworkers.append(w)
- w.start()
-
- # dictionnary to store results for revision higher than we one we are
- # looking for. For example, if we need the sidedatamap for 42, and 43 is
- # received, when shelve 43 for later use.
- staging = {}
-
- def sidedata_companion(revlog, rev):
- sidedata = {}
- if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog
- # Is the data previously shelved ?
- sidedata = staging.pop(rev, None)
- if sidedata is None:
- # look at the queued result until we find the one we are lookig
- # for (shelve the other ones)
- r, sidedata = sidedataq.get()
- while r != rev:
- staging[r] = sidedata
- r, sidedata = sidedataq.get()
- tokens.release()
- return False, (), sidedata
-
- return sidedata_companion
-
-
-def _get_simple_sidedata_adder(srcrepo, destrepo):
- """The simple version of the sidedata computation
-
- It just compute it in the same thread on request"""
-
- def sidedatacompanion(revlog, rev):
- sidedata = {}
- if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
- sidedata = _getsidedata(srcrepo, rev)
- return False, (), sidedata
-
- return sidedatacompanion
-
-
-def getsidedataremover(srcrepo, destrepo):
- def sidedatacompanion(revlog, rev):
- f = ()
- if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog
- if revlog.flags(rev) & REVIDX_SIDEDATA:
- f = (
- sidedatamod.SD_P1COPIES,
- sidedatamod.SD_P2COPIES,
- sidedatamod.SD_FILESADDED,
- sidedatamod.SD_FILESREMOVED,
- )
- return False, f, {}
-
- return sidedatacompanion
diff --git a/mercurial/context.py b/mercurial/context.py
--- a/mercurial/context.py
+++ b/mercurial/context.py
@@ -28,12 +28,12 @@
open,
)
from . import (
- copies,
dagop,
encoding,
error,
fileset,
match as matchmod,
+ metadata,
obsolete as obsmod,
patch,
pathutil,
@@ -299,7 +299,7 @@
@propertycache
def _copies(self):
- return copies.computechangesetcopies(self)
+ return metadata.computechangesetcopies(self)
def p1copies(self):
return self._copies[0]
@@ -582,7 +582,7 @@
filesadded = None
if filesadded is None:
if compute_on_none:
- filesadded = copies.computechangesetfilesadded(self)
+ filesadded = metadata.computechangesetfilesadded(self)
else:
filesadded = []
return filesadded
@@ -601,7 +601,7 @@
filesremoved = None
if filesremoved is None:
if compute_on_none:
- filesremoved = copies.computechangesetfilesremoved(self)
+ filesremoved = metadata.computechangesetfilesremoved(self)
else:
filesremoved = []
return filesremoved
diff --git a/mercurial/changelog.py b/mercurial/changelog.py
--- a/mercurial/changelog.py
+++ b/mercurial/changelog.py
@@ -16,9 +16,9 @@
from .thirdparty import attr
from . import (
- copies,
encoding,
error,
+ metadata,
pycompat,
revlog,
)
@@ -318,7 +318,7 @@
rawindices = self.extra.get(b'filesadded')
if rawindices is None:
return None
- return copies.decodefileindices(self.files, rawindices)
+ return metadata.decodefileindices(self.files, rawindices)
@property
def filesremoved(self):
@@ -330,7 +330,7 @@
rawindices = self.extra.get(b'filesremoved')
if rawindices is None:
return None
- return copies.decodefileindices(self.files, rawindices)
+ return metadata.decodefileindices(self.files, rawindices)
@property
def p1copies(self):
@@ -342,7 +342,7 @@
rawcopies = self.extra.get(b'p1copies')
if rawcopies is None:
return None
- return copies.decodecopies(self.files, rawcopies)
+ return metadata.decodecopies(self.files, rawcopies)
@property
def p2copies(self):
@@ -354,7 +354,7 @@
rawcopies = self.extra.get(b'p2copies')
if rawcopies is None:
return None
- return copies.decodecopies(self.files, rawcopies)
+ return metadata.decodecopies(self.files, rawcopies)
@property
def description(self):
@@ -570,13 +570,13 @@
):
extra.pop(name, None)
if p1copies is not None:
- p1copies = copies.encodecopies(sortedfiles, p1copies)
+ p1copies = metadata.encodecopies(sortedfiles, p1copies)
if p2copies is not None:
- p2copies = copies.encodecopies(sortedfiles, p2copies)
+ p2copies = metadata.encodecopies(sortedfiles, p2copies)
if filesadded is not None:
- filesadded = copies.encodefileindices(sortedfiles, filesadded)
+ filesadded = metadata.encodefileindices(sortedfiles, filesadded)
if filesremoved is not None:
- filesremoved = copies.encodefileindices(sortedfiles, filesremoved)
+ filesremoved = metadata.encodefileindices(sortedfiles, filesremoved)
if self._copiesstorage == b'extra':
extrasentries = p1copies, p2copies, filesadded, filesremoved
if extra is None and any(x is not None for x in extrasentries):
To: marmoute, #hg-reviewers
Cc: mercurial-patches, mercurial-devel
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mercurial-scm.org/pipermail/mercurial-patches/attachments/20200527/4c936d45/attachment-0001.html>
More information about the Mercurial-patches
mailing list