[Updated] D10878: revlog: Extract low-level random-access file read caching logic
baymax (Baymax, Your Personal Patch-care Companion)
phabricator at mercurial-scm.org
Thu Jun 17 16:29:13 UTC 2021
baymax added a comment.
baymax updated this revision to Diff 28597.
✅ refresh by Heptapod after a successful CI run (🐙 💚)
REPOSITORY
rHG Mercurial
CHANGES SINCE LAST UPDATE
https://phab.mercurial-scm.org/D10878?vs=28563&id=28597
BRANCH
default
CHANGES SINCE LAST ACTION
https://phab.mercurial-scm.org/D10878/new/
REVISION DETAIL
https://phab.mercurial-scm.org/D10878
AFFECTED FILES
mercurial/changelog.py
mercurial/revlog.py
mercurial/revlogutils/randomaccessfile.py
CHANGE DETAILS
diff --git a/mercurial/revlogutils/randomaccessfile.py b/mercurial/revlogutils/randomaccessfile.py
new file mode 100644
--- /dev/null
+++ b/mercurial/revlogutils/randomaccessfile.py
@@ -0,0 +1,138 @@
+# Copyright Mercurial Contributors
+#
+# This software may be used and distributed according to the terms of the
+# GNU General Public License version 2 or any later version.
+
+import contextlib
+
+from ..i18n import _
+from .. import (
+ error,
+ util,
+)
+
+
# Upper bound (in bytes) on how large the in-memory chunk cache may grow
# by appending before it is replaced wholesale (see ``_add_cached_chunk``).
_MAX_CACHED_CHUNK_SIZE = 1048576  # 1 MiB

# Error message used when a read returns fewer bytes than were requested.
PARTIAL_READ_MSG = _(
    b'partial read of revlog %s; expected %d bytes from offset %d, got %d'
)
+
+
+def _is_power_of_two(n):
+ return (n & (n - 1) == 0) and n != 0
+
+
class randomaccessfile(object):
    """Accessing arbitrary chunks of data within a file, with some caching.

    A single contiguous chunk of the file is kept in memory; any read that
    falls entirely inside it is served from the cache instead of hitting
    the file system.
    """

    def __init__(
        self,
        opener,
        filename,
        default_cached_chunk_size,
        initial_cache=None,
    ):
        """
        opener: callable used to open ``filename`` (a vfs-style opener)
        filename: name of the file, passed to ``opener``
        default_cached_chunk_size: alignment/granularity (in bytes) of the
            widened reads used to refill the cache; must be a power of two
        initial_cache: optional ``(offset, data)`` pair to prime the cache
        """
        # Required by bitwise manipulation below
        assert _is_power_of_two(default_cached_chunk_size)

        self.opener = opener
        self.filename = filename
        self.default_cached_chunk_size = default_cached_chunk_size
        self.writing_handle = None  # This is set from revlog.py
        # Currently cached raw chunk and its absolute position in the file
        self._cached_chunk = b''
        self._cached_chunk_position = 0  # Offset from the start of the file
        if initial_cache:
            self._cached_chunk_position, self._cached_chunk = initial_cache

    def clear_cache(self):
        """Drop the currently cached chunk of data."""
        self._cached_chunk = b''
        self._cached_chunk_position = 0

    def _open(self, mode=b'r'):
        """Return a file object"""
        return self.opener(self.filename, mode=mode)

    @contextlib.contextmanager
    def _open_read(self, existing_file_obj=None):
        """File object suitable for reading data"""
        # Use explicit file handle, if given.
        if existing_file_obj is not None:
            yield existing_file_obj

        # Use a file handle being actively used for writes, if available.
        # There is some danger to doing this because reads will seek the
        # file. However, revlog._writeentry performs a SEEK_END before all
        # writes, so we should be safe.
        elif self.writing_handle:
            yield self.writing_handle

        # Otherwise open a new file handle.
        else:
            with self._open() as fp:
                yield fp

    def read_chunk(self, offset, length, existing_file_obj=None):
        """Read a chunk of bytes from the file.

        Accepts an absolute offset, length to read, and an optional existing
        file handle to read from.

        If an existing file handle is passed, it will be seeked and the
        original seek position will NOT be restored.

        Returns a str or buffer of raw byte data.

        Raises if the requested number of bytes could not be read.
        """
        end = offset + length
        cache_start = self._cached_chunk_position
        cache_end = cache_start + len(self._cached_chunk)
        # Is the requested chunk within the cache?
        if cache_start <= offset and end <= cache_end:
            if cache_start == offset and end == cache_end:
                return self._cached_chunk  # avoid a copy
            # Partial overlap: return a zero-copy view into the cache.
            relative_start = offset - cache_start
            return util.buffer(self._cached_chunk, relative_start, length)

        return self._read_and_update_cache(offset, length, existing_file_obj)

    def _read_and_update_cache(self, offset, length, existing_file_obj=None):
        """Read the requested bytes from disk, refreshing the chunk cache.

        The actual read is widened to a window aligned on (and sized in
        multiples of) ``default_cached_chunk_size`` so that subsequent
        nearby reads — forward or backward — can hit the cache.

        Raises ``error.RevlogError`` on a short read.
        """
        # Cache data both forward and backward around the requested
        # data, in a fixed size window. This helps speed up operations
        # involving reading the revlog backwards.
        real_offset = offset & ~(self.default_cached_chunk_size - 1)
        real_length = (
            (offset + length + self.default_cached_chunk_size)
            & ~(self.default_cached_chunk_size - 1)
        ) - real_offset
        with self._open_read(existing_file_obj) as file_obj:
            file_obj.seek(real_offset)
            data = file_obj.read(real_length)

        # Cache whatever was read, even if short; the short-read check
        # below still raises for the caller.
        self._add_cached_chunk(real_offset, data)

        relative_offset = offset - real_offset
        got = len(data) - relative_offset
        if got < length:
            message = PARTIAL_READ_MSG % (self.filename, length, offset, got)
            raise error.RevlogError(message)

        if offset != real_offset or real_length != length:
            # Return only the requested window of the widened read.
            return util.buffer(data, relative_offset, length)
        return data

    def _add_cached_chunk(self, offset, data):
        """Add to or replace the cached data chunk.

        Accepts an absolute offset and the data that is at that location.
        """
        if (
            self._cached_chunk_position + len(self._cached_chunk) == offset
            and len(self._cached_chunk) + len(data) < _MAX_CACHED_CHUNK_SIZE
        ):
            # add to existing cache
            self._cached_chunk += data
        else:
            self._cached_chunk = data
            self._cached_chunk_position = offset
diff --git a/mercurial/revlog.py b/mercurial/revlog.py
--- a/mercurial/revlog.py
+++ b/mercurial/revlog.py
@@ -89,6 +89,7 @@
revlogv0,
sidedata as sidedatautil,
)
+from .revlogutils import randomaccessfile
from .utils import (
storageutil,
stringutil,
@@ -125,7 +126,6 @@
# max size of revlog with inline data
_maxinline = 131072
-_chunksize = 1048576
# Flag processors for REVIDX_ELLIPSIS.
def ellipsisreadprocessor(rl, text):
@@ -232,10 +232,6 @@
# signed integer)
_maxentrysize = 0x7FFFFFFF
-PARTIAL_READ_MSG = _(
- b'partial read of revlog %s; expected %d bytes from offset %d, got %d'
-)
-
FILE_TOO_SHORT_MSG = _(
b'cannot read from revlog %s;'
b' expected %d bytes from offset %d, data size is %d'
@@ -605,7 +601,7 @@
self._parse_index = parse_index_v1_mixed
try:
d = self._parse_index(index_data, self._inline)
- index, _chunkcache = d
+ index, chunkcache = d
use_nodemap = (
not self._inline
and self._nodemap_file is not None
@@ -626,9 +622,13 @@
raise error.RevlogError(
_(b"index %s is corrupted") % self.display_id
)
- self.index, self._chunkcache = d
- if not self._chunkcache:
- self._chunkclear()
+ self.index = index
+ self._segmentfile = randomaccessfile.randomaccessfile(
+ self.opener,
+ (self._indexfile if self._inline else self._datafile),
+ self._chunkcachesize,
+ chunkcache,
+ )
# revnum -> (chain-length, sum-delta-length)
self._chaininfocache = util.lrucachedict(500)
# revlog header -> revlog compressor
@@ -709,32 +709,6 @@
return self.opener(self._datafile, mode=mode)
@contextlib.contextmanager
- def _datareadfp(self, existingfp=None):
- """file object suitable to read data"""
- # Use explicit file handle, if given.
- if existingfp is not None:
- yield existingfp
-
- # Use a file handle being actively used for writes, if available.
- # There is some danger to doing this because reads will seek the
- # file. However, _writeentry() performs a SEEK_END before all writes,
- # so we should be safe.
- elif self._writinghandles:
- if self._inline:
- yield self._writinghandles[0]
- else:
- yield self._writinghandles[1]
-
- # Otherwise open a new file handle.
- else:
- if self._inline:
- func = self._indexfp
- else:
- func = self._datafp
- with func() as fp:
- yield fp
-
- @contextlib.contextmanager
def _sidedatareadfp(self):
"""file object suitable to read sidedata"""
if self._writinghandles:
@@ -807,7 +781,7 @@
def clearcaches(self):
self._revisioncache = None
self._chainbasecache.clear()
- self._chunkcache = (0, b'')
+ self._segmentfile.clear_cache()
self._pcache = {}
self._nodemap_docket = None
self.index.clearcaches()
@@ -1629,85 +1603,6 @@
p1, p2 = self.parents(node)
return storageutil.hashrevisionsha1(text, p1, p2) != node
- def _cachesegment(self, offset, data):
- """Add a segment to the revlog cache.
-
- Accepts an absolute offset and the data that is at that location.
- """
- o, d = self._chunkcache
- # try to add to existing cache
- if o + len(d) == offset and len(d) + len(data) < _chunksize:
- self._chunkcache = o, d + data
- else:
- self._chunkcache = offset, data
-
- def _readsegment(self, offset, length, df=None):
- """Load a segment of raw data from the revlog.
-
- Accepts an absolute offset, length to read, and an optional existing
- file handle to read from.
-
- If an existing file handle is passed, it will be seeked and the
- original seek position will NOT be restored.
-
- Returns a str or buffer of raw byte data.
-
- Raises if the requested number of bytes could not be read.
- """
- # Cache data both forward and backward around the requested
- # data, in a fixed size window. This helps speed up operations
- # involving reading the revlog backwards.
- cachesize = self._chunkcachesize
- realoffset = offset & ~(cachesize - 1)
- reallength = (
- (offset + length + cachesize) & ~(cachesize - 1)
- ) - realoffset
- with self._datareadfp(df) as df:
- df.seek(realoffset)
- d = df.read(reallength)
-
- self._cachesegment(realoffset, d)
- if offset != realoffset or reallength != length:
- startoffset = offset - realoffset
- if len(d) - startoffset < length:
- filename = self._indexfile if self._inline else self._datafile
- got = len(d) - startoffset
- m = PARTIAL_READ_MSG % (filename, length, offset, got)
- raise error.RevlogError(m)
- return util.buffer(d, startoffset, length)
-
- if len(d) < length:
- filename = self._indexfile if self._inline else self._datafile
- got = len(d) - startoffset
- m = PARTIAL_READ_MSG % (filename, length, offset, got)
- raise error.RevlogError(m)
-
- return d
-
- def _getsegment(self, offset, length, df=None):
- """Obtain a segment of raw data from the revlog.
-
- Accepts an absolute offset, length of bytes to obtain, and an
- optional file handle to the already-opened revlog. If the file
- handle is used, it's original seek position will not be preserved.
-
- Requests for data may be returned from a cache.
-
- Returns a str or a buffer instance of raw byte data.
- """
- o, d = self._chunkcache
- l = len(d)
-
- # is it in the cache?
- cachestart = offset - o
- cacheend = cachestart + length
- if cachestart >= 0 and cacheend <= l:
- if cachestart == 0 and cacheend == l:
- return d # avoid a copy
- return util.buffer(d, cachestart, cacheend - cachestart)
-
- return self._readsegment(offset, length, df=df)
-
def _getsegmentforrevs(self, startrev, endrev, df=None):
"""Obtain a segment of raw data corresponding to a range of revisions.
@@ -1740,7 +1635,7 @@
end += (endrev + 1) * self.index.entry_size
length = end - start
- return start, self._getsegment(start, length, df=df)
+ return start, self._segmentfile.read_chunk(start, length, df)
def _chunk(self, rev, df=None):
"""Obtain a single decompressed chunk for a revision.
@@ -1832,10 +1727,6 @@
return l
- def _chunkclear(self):
- """Clear the raw chunk cache."""
- self._chunkcache = (0, b'')
-
def deltaparent(self, rev):
"""return deltaparent of the given revision"""
base = self.index[rev][3]
@@ -2043,7 +1934,12 @@
length = sidedata_size
offset = sidedata_offset
got = len(comp_segment)
- m = PARTIAL_READ_MSG % (filename, length, offset, got)
+ m = randomaccessfile.PARTIAL_READ_MSG % (
+ filename,
+ length,
+ offset,
+ got,
+ )
raise error.RevlogError(m)
comp = self.index[rev][11]
@@ -2136,6 +2032,7 @@
# We can't use the cached file handle after close(). So prevent
# its usage.
self._writinghandles = None
+ self._segmentfile.writing_handle = None
new_dfh = self._datafp(b'w+')
new_dfh.truncate(0) # drop any potentially existing data
@@ -2171,12 +2068,17 @@
tr.replace(self._indexfile, trindex * self.index.entry_size)
nodemaputil.setup_persistent_nodemap(tr, self)
- self._chunkclear()
+ self._segmentfile = randomaccessfile.randomaccessfile(
+ self.opener,
+ self._datafile,
+ self._chunkcachesize,
+ )
if existing_handles:
# switched from inline to conventional reopen the index
ifh = self.__index_write_fp()
self._writinghandles = (ifh, new_dfh, None)
+ self._segmentfile.writing_handle = new_dfh
new_dfh = None
finally:
if new_dfh is not None:
@@ -2235,11 +2137,13 @@
transaction.add(self._indexfile, isize)
# exposing all file handle for writing.
self._writinghandles = (ifh, dfh, sdfh)
+ self._segmentfile.writing_handle = ifh if self._inline else dfh
yield
if self._docket is not None:
self._write_docket(transaction)
finally:
self._writinghandles = None
+ self._segmentfile.writing_handle = None
if dfh is not None:
dfh.close()
if sdfh is not None:
@@ -2873,7 +2777,7 @@
# then reset internal state in memory to forget those revisions
self._revisioncache = None
self._chaininfocache = util.lrucachedict(500)
- self._chunkclear()
+ self._segmentfile.clear_cache()
del self.index[rev:-1]
diff --git a/mercurial/changelog.py b/mercurial/changelog.py
--- a/mercurial/changelog.py
+++ b/mercurial/changelog.py
@@ -454,6 +454,7 @@
self.opener = _delayopener(
self._realopener, self._indexfile, self._delaybuf
)
+ self._segmentfile.opener = self.opener
self._delayed = True
tr.addpending(b'cl-%i' % id(self), self._writepending)
tr.addfinalize(b'cl-%i' % id(self), self._finalize)
@@ -462,6 +463,7 @@
"""finalize index updates"""
self._delayed = False
self.opener = self._realopener
+ self._segmentfile.opener = self.opener
# move redirected index data back into place
if self._docket is not None:
self._write_docket(tr)
@@ -501,6 +503,7 @@
self._delaybuf = None
self._divert = True
self.opener = _divertopener(self._realopener, self._indexfile)
+ self._segmentfile.opener = self.opener
if self._divert:
return True
To: SimonSapin, indygreg, #hg-reviewers, Alphare
Cc: Alphare, mercurial-patches
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.mercurial-scm.org/pipermail/mercurial-patches/attachments/20210617/e8fe58cb/attachment-0002.html>
More information about the Mercurial-patches
mailing list