projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
20937: Add error handling to threaded collection copy
[arvados.git]
/
sdk
/
python
/
arvados
/
diskcache.py
diff --git
a/sdk/python/arvados/diskcache.py
b/sdk/python/arvados/diskcache.py
index 74b2a77b28506b2b5fd15b64b7977d877ea0ace8..f8fca5780332e41ec1f894759b27df5c0bffd1a1 100644
(file)
--- a/
sdk/python/arvados/diskcache.py
+++ b/
sdk/python/arvados/diskcache.py
@@
-12,13
+12,14
@@
import fcntl
import time
import errno
import logging
import time
import errno
import logging
+import weakref
_logger = logging.getLogger('arvados.keep')
cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
_logger = logging.getLogger('arvados.keep')
cacheblock_suffix = ".keepcacheblock"
class DiskCacheSlot(object):
- __slots__ = ("locator", "ready", "content", "cachedir", "filehandle")
+ __slots__ = ("locator", "ready", "content", "cachedir", "filehandle"
, "linger"
)
def __init__(self, locator, cachedir):
self.locator = locator
def __init__(self, locator, cachedir):
self.locator = locator
@@
-26,6
+27,7
@@
class DiskCacheSlot(object):
self.content = None
self.cachedir = cachedir
self.filehandle = None
self.content = None
self.cachedir = cachedir
self.filehandle = None
+ self.linger = None
def get(self):
self.ready.wait()
def get(self):
self.ready.wait()
@@
-36,15
+38,18
@@
class DiskCacheSlot(object):
try:
if value is None:
self.content = None
try:
if value is None:
self.content = None
+ self.ready.set()
return
if len(value) == 0:
# Can't mmap a 0 length file
self.content = b''
return
if len(value) == 0:
# Can't mmap a 0 length file
self.content = b''
+ self.ready.set()
return
if self.content is not None:
# Has been set already
return
if self.content is not None:
# Has been set already
+ self.ready.set()
return
blockdir = os.path.join(self.cachedir, self.locator[0:3])
return
blockdir = os.path.join(self.cachedir, self.locator[0:3])
@@
-66,6
+71,8
@@
class DiskCacheSlot(object):
tmpfile = None
self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
tmpfile = None
self.content = mmap.mmap(self.filehandle.fileno(), 0, access=mmap.ACCESS_READ)
+ # only set the event when mmap is successful
+ self.ready.set()
finally:
if tmpfile is not None:
# If the tempfile hasn't been renamed on disk yet, try to delete it.
finally:
if tmpfile is not None:
# If the tempfile hasn't been renamed on disk yet, try to delete it.
@@
-76,6
+83,13
@@
class DiskCacheSlot(object):
def size(self):
if self.content is None:
def size(self):
if self.content is None:
+ if self.linger is not None:
+ # If it is still lingering (object is still accessible
+ # through the weak reference) it is still taking up
+ # space.
+ content = self.linger()
+ if content is not None:
+ return len(content)
return 0
else:
return len(self.content)
return 0
else:
return len(self.content)
@@
-133,8
+147,13
@@
class DiskCacheSlot(object):
pass
finally:
self.filehandle = None
pass
finally:
self.filehandle = None
+ self.linger = weakref.ref(self.content)
self.content = None
self.content = None
- return False
+ return False
+
+ def gone(self):
+ # Test if an evicted object is lingering
+ return self.content is None and (self.linger is None or self.linger() is None)
@staticmethod
def get_from_disk(locator, cachedir):
@staticmethod
def get_from_disk(locator, cachedir):