18842: put the file locking back
[arvados.git] / sdk / python / arvados / diskcache.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 import threading
6 import mmap
7 import os
8 import traceback
9 import stat
10 import tempfile
11 import fcntl
12
13 class DiskCacheSlot(object):
14     __slots__ = ("locator", "ready", "content", "cachedir")
15
16     def __init__(self, locator, cachedir):
17         self.locator = locator
18         self.ready = threading.Event()
19         self.content = None
20         self.cachedir = cachedir
21
22     def get(self):
23         self.ready.wait()
24         return self.content
25
26     def set(self, value):
27         try:
28             if value is None:
29                 self.content = None
30                 return
31
32             if len(value) == 0:
33                 # Can't mmap a 0 length file
34                 self.content = b''
35                 return
36
37             if self.content is not None:
38                 # Has been set already
39                 return
40
41             blockdir = os.path.join(self.cachedir, self.locator[0:3])
42             os.makedirs(blockdir, mode=0o700, exist_ok=True)
43
44             final = os.path.join(blockdir, self.locator)
45
46             f = tempfile.NamedTemporaryFile(dir=blockdir, delete=False)
47             tmpfile = f.name
48             os.chmod(tmpfile, stat.S_IRUSR | stat.S_IWUSR)
49
50             # aquire a shared lock, this tells other processes that
51             # we're using this block and to please not delete it.
52             fcntl.flock(f, fcntl.LOCK_SH)
53
54             f.write(value)
55             f.flush()
56             os.rename(tmpfile, final)
57
58             self.content = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
59         except Exception as e:
60             traceback.print_exc()
61         finally:
62             self.ready.set()
63
64     def size(self):
65         if self.content is None:
66             return 0
67         else:
68             return len(self.content)
69
70     def evict(self):
71         if self.content is not None and len(self.content) > 0:
72             # The mmap region might be in use when we decided to evict
73             # it.  This can happen if the cache is too small.
74             #
75             # If we call close() now, it'll throw an error if
76             # something tries to access it.
77             #
78             # However, we don't need to explicitly call mmap.close()
79             #
80             # I confirmed in mmapmodule.c that that both close
81             # and deallocate do the same thing:
82             #
83             # a) close the file descriptor
84             # b) unmap the memory range
85             #
86             # So we can forget it in the cache and delete the file on
87             # disk, and it will tear it down after any other
88             # lingering Python references to the mapped memory are
89             # gone.
90
91             blockdir = os.path.join(self.cachedir, self.locator[0:3])
92             final = os.path.join(blockdir, self.locator)
93             try:
94                 with open(final, "rb") as f:
95                     # unlock,
96                     fcntl.flock(f, fcntl.LOCK_UN)
97
98                     # try to get an exclusive lock, this ensures other
99                     # processes are not using the block.  It is
100                     # nonblocking and will throw an exception if we
101                     # can't get it, which is fine because that means
102                     # we just won't try to delete it.
103                     #
104                     # I should note here, the file locking is not
105                     # strictly necessary, we could just remove it and
106                     # the kernel would ensure that the underlying
107                     # inode remains available as long as other
108                     # processes still have the file open.  However, if
109                     # you have multiple processes sharing the cache
110                     # and deleting each other's files, you'll end up
111                     # with a bunch of ghost files that don't show up
112                     # in the file system but are still taking up
113                     # space, which isn't particularly user friendly.
114                     # The locking strategy ensures that cache blocks
115                     # in use remain visible.
116                     #
117                     fcntl.flock(filehandle, fcntl.LOCK_EX | fcntl.LOCK_NB)
118
119                     os.remove(final)
120                     return True
121             except OSError:
122                 pass
123             return False
124
125     @staticmethod
126     def get_from_disk(locator, cachedir):
127         blockdir = os.path.join(cachedir, locator[0:3])
128         final = os.path.join(blockdir, locator)
129
130         try:
131             filehandle = open(final, "rb")
132
133             # aquire a shared lock, this tells other processes that
134             # we're using this block and to please not delete it.
135             fcntl.flock(filehandle, fcntl.LOCK_SH)
136
137             content = mmap.mmap(filehandle.fileno(), 0, access=mmap.ACCESS_READ)
138             dc = DiskCacheSlot(locator, cachedir)
139             dc.content = content
140             dc.ready.set()
141             return dc
142         except FileNotFoundError:
143             pass
144         except Exception as e:
145             traceback.print_exc()
146
147         return None
148
149     @staticmethod
150     def init_cache(cachedir, maxslots):
151         # map in all the files in the cache directory, up to max slots.
152         # after max slots, try to delete the excess blocks.
153         #
154         # this gives the calling process ownership of all the blocks
155
156         blocks = []
157         for root, dirs, files in os.walk(cachedir):
158             for name in files:
159                 blockpath = os.path.join(root, name)
160                 res = os.stat(blockpath)
161                 blocks.append((name, res.st_atime))
162
163         # sort by access time (atime), going from most recently
164         # accessed (highest timestamp) to least recently accessed
165         # (lowest timestamp).
166         blocks.sort(key=lambda x: x[1], reverse=True)
167
168         # Map in all the files we found, up to maxslots, if we exceed
169         # maxslots, start throwing things out.
170         cachelist = []
171         for b in blocks:
172             got = DiskCacheSlot.get_from_disk(b[0], cachedir)
173             if got is None:
174                 continue
175             if len(cachelist) < maxslots:
176                 cachelist.append(got)
177             else:
178                 # we found more blocks than maxslots, try to
179                 # throw it out of the cache.
180                 got.evict()
181
182         return cachelist