projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '9397-prepopulate-output-directory' refs #9397
[arvados.git]
/
sdk
/
python
/
arvados
/
arvfile.py
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index e00eeae47e3890ee7307fb6d7aa9b8e0e87b331e..edeb910570ed80af8d1b45589cc252f7b58e718f 100644
(file)
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -5,7 +5,6 @@
import bz2
import config
import hashlib
import threading
import config
import hashlib
import threading
-import traceback
import Queue
import copy
import errno
import Queue
import copy
import errno
@@ -405,7 +404,7 @@
class _BlockManager(object):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None):
+ def __init__(self, keep, copies=None, put_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@ -415,7 +414,10 @@
class _BlockManager(object):
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ if put_threads:
+ self.num_put_threads = put_threads
+ else:
+ self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self._pending_write_size = 0
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self._pending_write_size = 0
@@ -517,7 +519,7 @@
class _BlockManager(object):
return
self._keep.get(b)
except Exception:
return
self._keep.get(b)
except Exception:
- _logger.error(traceback.format_exc())
+ _logger.exception("Exception doing block prefetch")
@synchronized
def start_get_threads(self):
@synchronized
def start_get_threads(self):
@@ -760,6 +762,14 @@
class ArvadosFile(object):
def writable(self):
return self.parent.writable()
def writable(self):
return self.parent.writable()
+ @synchronized
+ def permission_expired(self, as_of_dt=None):
+ """Returns True if any of the segment's locators is expired"""
+ for r in self._segments:
+ if KeepLocator(r.locator).permission_expired(as_of_dt):
+ return True
+ return False
+
@synchronized
def segments(self):
return copy.copy(self._segments)
@synchronized
def segments(self):
return copy.copy(self._segments)
@@ -1060,12 +1070,15 @@
class ArvadosFile(object):
return 0
@synchronized
return 0
@synchronized
- def manifest_text(self, stream_name=".", portable_locators=False, normalize=False):
+ def manifest_text(self, stream_name=".", portable_locators=False,
+ normalize=False, only_committed=False):
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
buf = ""
filestream = []
for segment in self.segments:
loc = segment.locator
- if loc.startswith("bufferblock"):
+ if self.parent._my_block_manager().is_bufferblock(loc):
+ if only_committed:
+ continue
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()
loc = self._bufferblocks[loc].calculate_locator()
if portable_locators:
loc = KeepLocator(loc).stripped()