[Updated] D10878: revlog: Extract low-level random-access file read caching logic

SimonSapin phabricator at mercurial-scm.org
Thu Jun 17 17:30:25 UTC 2021


Closed by commit rHGe0a314bcbc9d: revlog: Extract low-level random-access file read caching logic (authored by SimonSapin).
This revision was automatically updated to reflect the committed changes.

CHANGED PRIOR TO COMMIT
  https://phab.mercurial-scm.org/D10878?vs=28597&id=28599#toc

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D10878?vs=28597&id=28599

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D10878/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D10878

AFFECTED FILES
  mercurial/changelog.py
  mercurial/revlog.py
  mercurial/revlogutils/randomaccessfile.py

CHANGE DETAILS

diff --git a/mercurial/revlogutils/randomaccessfile.py b/mercurial/revlogutils/randomaccessfile.py
new file mode 100644
--- /dev/null
+++ b/mercurial/revlogutils/randomaccessfile.py
@@ -0,0 +1,138 @@
+# Copyright Mercurial Contributors
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+import contextlib
+
+from ..i18n import _
+from .. import (
+    error,
+    util,
+)
+
+
+_MAX_CACHED_CHUNK_SIZE = 1048576  # 1 MiB
+
+PARTIAL_READ_MSG = _(
+    b'partial read of revlog %s; expected %d bytes from offset %d, got %d'
+)
+
+
+def _is_power_of_two(n):
+    return (n & (n - 1) == 0) and n != 0
+
+
+class randomaccessfile(object):
+    """Accessing arbitrary chuncks of data within a file, with some caching"""
+
+    def __init__(
+        self,
+        opener,
+        filename,
+        default_cached_chunk_size,
+        initial_cache=None,
+    ):
+        # Required by bitwise manipulation below
+        assert _is_power_of_two(default_cached_chunk_size)
+
+        self.opener = opener
+        self.filename = filename
+        self.default_cached_chunk_size = default_cached_chunk_size
+        self.writing_handle = None  # This is set from revlog.py
+        self._cached_chunk = b''
+        self._cached_chunk_position = 0  # Offset from the start of the file
+        if initial_cache:
+            self._cached_chunk_position, self._cached_chunk = initial_cache
+
+    def clear_cache(self):
+        self._cached_chunk = b''
+        self._cached_chunk_position = 0
+
+    def _open(self, mode=b'r'):
+        """Return a file object"""
+        return self.opener(self.filename, mode=mode)
+
+    @contextlib.contextmanager
+    def _open_read(self, existing_file_obj=None):
+        """File object suitable for reading data"""
+        # Use explicit file handle, if given.
+        if existing_file_obj is not None:
+            yield existing_file_obj
+
+        # Use a file handle being actively used for writes, if available.
+        # There is some danger to doing this because reads will seek the
+        # file. However, revlog._writeentry performs a SEEK_END before all
+        # writes, so we should be safe.
+        elif self.writing_handle:
+            yield self.writing_handle
+
+        # Otherwise open a new file handle.
+        else:
+            with self._open() as fp:
+                yield fp
+
+    def read_chunk(self, offset, length, existing_file_obj=None):
+        """Read a chunk of bytes from the file.
+
+        Accepts an absolute offset, length to read, and an optional existing
+        file handle to read from.
+
+        If an existing file handle is passed, it will be seeked and the
+        original seek position will NOT be restored.
+
+        Returns a str or buffer of raw byte data.
+
+        Raises if the requested number of bytes could not be read.
+        """
+        end = offset + length
+        cache_start = self._cached_chunk_position
+        cache_end = cache_start + len(self._cached_chunk)
+        # Is the requested chunk within the cache?
+        if cache_start <= offset and end <= cache_end:
+            if cache_start == offset and end == cache_end:
+                return self._cached_chunk  # avoid a copy
+            relative_start = offset - cache_start
+            return util.buffer(self._cached_chunk, relative_start, length)
+
+        return self._read_and_update_cache(offset, length, existing_file_obj)
+
+    def _read_and_update_cache(self, offset, length, existing_file_obj=None):
+        # Cache data both forward and backward around the requested
+        # data, in a fixed size window. This helps speed up operations
+        # involving reading the revlog backwards.
+        real_offset = offset & ~(self.default_cached_chunk_size - 1)
+        real_length = (
+            (offset + length + self.default_cached_chunk_size)
+            & ~(self.default_cached_chunk_size - 1)
+        ) - real_offset
+        with self._open_read(existing_file_obj) as file_obj:
+            file_obj.seek(real_offset)
+            data = file_obj.read(real_length)
+
+        self._add_cached_chunk(real_offset, data)
+
+        relative_offset = offset - real_offset
+        got = len(data) - relative_offset
+        if got < length:
+            message = PARTIAL_READ_MSG % (self.filename, length, offset, got)
+            raise error.RevlogError(message)
+
+        if offset != real_offset or real_length != length:
+            return util.buffer(data, relative_offset, length)
+        return data
+
+    def _add_cached_chunk(self, offset, data):
+        """Add to or replace the cached data chunk.
+
+        Accepts an absolute offset and the data that is at that location.
+        """
+        if (
+            self._cached_chunk_position + len(self._cached_chunk) == offset
+            and len(self._cached_chunk) + len(data) < _MAX_CACHED_CHUNK_SIZE
+        ):
+            # add to existing cache
+            self._cached_chunk += data
+        else:
+            self._cached_chunk = data
+            self._cached_chunk_position = offset
diff --git a/mercurial/revlog.py b/mercurial/revlog.py
--- a/mercurial/revlog.py
+++ b/mercurial/revlog.py
@@ -86,6 +86,7 @@
     docket as docketutil,
     flagutil,
     nodemap as nodemaputil,
+    randomaccessfile,
     revlogv0,
     sidedata as sidedatautil,
 )
@@ -125,7 +126,6 @@
 
 # max size of revlog with inline data
 _maxinline = 131072
-_chunksize = 1048576
 
 # Flag processors for REVIDX_ELLIPSIS.
 def ellipsisreadprocessor(rl, text):
@@ -232,10 +232,6 @@
 # signed integer)
 _maxentrysize = 0x7FFFFFFF
 
-PARTIAL_READ_MSG = _(
-    b'partial read of revlog %s; expected %d bytes from offset %d, got %d'
-)
-
 FILE_TOO_SHORT_MSG = _(
     b'cannot read from revlog %s;'
     b'  expected %d bytes from offset %d, data size is %d'
@@ -605,7 +601,7 @@
             self._parse_index = parse_index_v1_mixed
         try:
             d = self._parse_index(index_data, self._inline)
-            index, _chunkcache = d
+            index, chunkcache = d
             use_nodemap = (
                 not self._inline
                 and self._nodemap_file is not None
@@ -626,9 +622,13 @@
             raise error.RevlogError(
                 _(b"index %s is corrupted") % self.display_id
             )
-        self.index, self._chunkcache = d
-        if not self._chunkcache:
-            self._chunkclear()
+        self.index = index
+        self._segmentfile = randomaccessfile.randomaccessfile(
+            self.opener,
+            (self._indexfile if self._inline else self._datafile),
+            self._chunkcachesize,
+            chunkcache,
+        )
         # revnum -> (chain-length, sum-delta-length)
         self._chaininfocache = util.lrucachedict(500)
         # revlog header -> revlog compressor
@@ -709,32 +709,6 @@
         return self.opener(self._datafile, mode=mode)
 
     @contextlib.contextmanager
-    def _datareadfp(self, existingfp=None):
-        """file object suitable to read data"""
-        # Use explicit file handle, if given.
-        if existingfp is not None:
-            yield existingfp
-
-        # Use a file handle being actively used for writes, if available.
-        # There is some danger to doing this because reads will seek the
-        # file. However, _writeentry() performs a SEEK_END before all writes,
-        # so we should be safe.
-        elif self._writinghandles:
-            if self._inline:
-                yield self._writinghandles[0]
-            else:
-                yield self._writinghandles[1]
-
-        # Otherwise open a new file handle.
-        else:
-            if self._inline:
-                func = self._indexfp
-            else:
-                func = self._datafp
-            with func() as fp:
-                yield fp
-
-    @contextlib.contextmanager
     def _sidedatareadfp(self):
         """file object suitable to read sidedata"""
         if self._writinghandles:
@@ -807,7 +781,7 @@
     def clearcaches(self):
         self._revisioncache = None
         self._chainbasecache.clear()
-        self._chunkcache = (0, b'')
+        self._segmentfile.clear_cache()
         self._pcache = {}
         self._nodemap_docket = None
         self.index.clearcaches()
@@ -1629,85 +1603,6 @@
         p1, p2 = self.parents(node)
         return storageutil.hashrevisionsha1(text, p1, p2) != node
 
-    def _cachesegment(self, offset, data):
-        """Add a segment to the revlog cache.
-
-        Accepts an absolute offset and the data that is at that location.
-        """
-        o, d = self._chunkcache
-        # try to add to existing cache
-        if o + len(d) == offset and len(d) + len(data) < _chunksize:
-            self._chunkcache = o, d + data
-        else:
-            self._chunkcache = offset, data
-
-    def _readsegment(self, offset, length, df=None):
-        """Load a segment of raw data from the revlog.
-
-        Accepts an absolute offset, length to read, and an optional existing
-        file handle to read from.
-
-        If an existing file handle is passed, it will be seeked and the
-        original seek position will NOT be restored.
-
-        Returns a str or buffer of raw byte data.
-
-        Raises if the requested number of bytes could not be read.
-        """
-        # Cache data both forward and backward around the requested
-        # data, in a fixed size window. This helps speed up operations
-        # involving reading the revlog backwards.
-        cachesize = self._chunkcachesize
-        realoffset = offset & ~(cachesize - 1)
-        reallength = (
-            (offset + length + cachesize) & ~(cachesize - 1)
-        ) - realoffset
-        with self._datareadfp(df) as df:
-            df.seek(realoffset)
-            d = df.read(reallength)
-
-        self._cachesegment(realoffset, d)
-        if offset != realoffset or reallength != length:
-            startoffset = offset - realoffset
-            if len(d) - startoffset < length:
-                filename = self._indexfile if self._inline else self._datafile
-                got = len(d) - startoffset
-                m = PARTIAL_READ_MSG % (filename, length, offset, got)
-                raise error.RevlogError(m)
-            return util.buffer(d, startoffset, length)
-
-        if len(d) < length:
-            filename = self._indexfile if self._inline else self._datafile
-            got = len(d) - startoffset
-            m = PARTIAL_READ_MSG % (filename, length, offset, got)
-            raise error.RevlogError(m)
-
-        return d
-
-    def _getsegment(self, offset, length, df=None):
-        """Obtain a segment of raw data from the revlog.
-
-        Accepts an absolute offset, length of bytes to obtain, and an
-        optional file handle to the already-opened revlog. If the file
-        handle is used, it's original seek position will not be preserved.
-
-        Requests for data may be returned from a cache.
-
-        Returns a str or a buffer instance of raw byte data.
-        """
-        o, d = self._chunkcache
-        l = len(d)
-
-        # is it in the cache?
-        cachestart = offset - o
-        cacheend = cachestart + length
-        if cachestart >= 0 and cacheend <= l:
-            if cachestart == 0 and cacheend == l:
-                return d  # avoid a copy
-            return util.buffer(d, cachestart, cacheend - cachestart)
-
-        return self._readsegment(offset, length, df=df)
-
     def _getsegmentforrevs(self, startrev, endrev, df=None):
         """Obtain a segment of raw data corresponding to a range of revisions.
 
@@ -1740,7 +1635,7 @@
             end += (endrev + 1) * self.index.entry_size
         length = end - start
 
-        return start, self._getsegment(start, length, df=df)
+        return start, self._segmentfile.read_chunk(start, length, df)
 
     def _chunk(self, rev, df=None):
         """Obtain a single decompressed chunk for a revision.
@@ -1832,10 +1727,6 @@
 
         return l
 
-    def _chunkclear(self):
-        """Clear the raw chunk cache."""
-        self._chunkcache = (0, b'')
-
     def deltaparent(self, rev):
         """return deltaparent of the given revision"""
         base = self.index[rev][3]
@@ -2043,7 +1934,12 @@
                 length = sidedata_size
                 offset = sidedata_offset
                 got = len(comp_segment)
-                m = PARTIAL_READ_MSG % (filename, length, offset, got)
+                m = randomaccessfile.PARTIAL_READ_MSG % (
+                    filename,
+                    length,
+                    offset,
+                    got,
+                )
                 raise error.RevlogError(m)
 
         comp = self.index[rev][11]
@@ -2136,6 +2032,7 @@
             # We can't use the cached file handle after close(). So prevent
             # its usage.
             self._writinghandles = None
+            self._segmentfile.writing_handle = None
 
         new_dfh = self._datafp(b'w+')
         new_dfh.truncate(0)  # drop any potentially existing data
@@ -2171,12 +2068,17 @@
 
             tr.replace(self._indexfile, trindex * self.index.entry_size)
             nodemaputil.setup_persistent_nodemap(tr, self)
-            self._chunkclear()
+            self._segmentfile = randomaccessfile.randomaccessfile(
+                self.opener,
+                self._datafile,
+                self._chunkcachesize,
+            )
 
             if existing_handles:
                 # switched from inline to conventional reopen the index
                 ifh = self.__index_write_fp()
                 self._writinghandles = (ifh, new_dfh, None)
+                self._segmentfile.writing_handle = new_dfh
                 new_dfh = None
         finally:
             if new_dfh is not None:
@@ -2235,11 +2137,13 @@
                     transaction.add(self._indexfile, isize)
                 # exposing all file handle for writing.
                 self._writinghandles = (ifh, dfh, sdfh)
+                self._segmentfile.writing_handle = ifh if self._inline else dfh
                 yield
                 if self._docket is not None:
                     self._write_docket(transaction)
             finally:
                 self._writinghandles = None
+                self._segmentfile.writing_handle = None
                 if dfh is not None:
                     dfh.close()
                 if sdfh is not None:
@@ -2873,7 +2777,7 @@
         # then reset internal state in memory to forget those revisions
         self._revisioncache = None
         self._chaininfocache = util.lrucachedict(500)
-        self._chunkclear()
+        self._segmentfile.clear_cache()
 
         del self.index[rev:-1]
 
diff --git a/mercurial/changelog.py b/mercurial/changelog.py
--- a/mercurial/changelog.py
+++ b/mercurial/changelog.py
@@ -454,6 +454,7 @@
                 self.opener = _delayopener(
                     self._realopener, self._indexfile, self._delaybuf
                 )
+            self._segmentfile.opener = self.opener
         self._delayed = True
         tr.addpending(b'cl-%i' % id(self), self._writepending)
         tr.addfinalize(b'cl-%i' % id(self), self._finalize)
@@ -462,6 +463,7 @@
         """finalize index updates"""
         self._delayed = False
         self.opener = self._realopener
+        self._segmentfile.opener = self.opener
         # move redirected index data back into place
         if self._docket is not None:
             self._write_docket(tr)
@@ -501,6 +503,7 @@
             self._delaybuf = None
             self._divert = True
             self.opener = _divertopener(self._realopener, self._indexfile)
+            self._segmentfile.opener = self.opener
 
         if self._divert:
             return True



To: SimonSapin, indygreg, #hg-reviewers, Alphare
Cc: Alphare, mercurial-patches
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mercurial-scm.org/pipermail/mercurial-patches/attachments/20210617/4abbf9cb/attachment-0002.html>


More information about the Mercurial-patches mailing list