projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '18027-unmount-fuse'
[arvados.git]
/
sdk
/
python
/
arvados
/
arvfile.py
diff --git
a/sdk/python/arvados/arvfile.py
b/sdk/python/arvados/arvfile.py
index 37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b..e915ff2ac0a37c86c635f9ce68f52d227fed1725 100644
(file)
--- a/
sdk/python/arvados/arvfile.py
+++ b/
sdk/python/arvados/arvfile.py
@@
-481,7
+481,7
@@
class _BlockManager(object):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None):
+ def __init__(self, keep, copies=None, put_threads=None
, num_retries=None, storage_classes_func=None
):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@
-491,15
+491,14
@@
class _BlockManager(object):
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
+ self.num_retries = num_retries
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@
-554,9
+553,9
@@
class _BlockManager(object):
return
if self.copies is None:
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
, num_retries=self.num_retries, classes=self.storage_classes()
)
else:
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(),
copies=self.copies
)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(),
num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()
)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
@@
-571,7
+570,7
@@
class _BlockManager(object):
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be sen
d
to the Keep
+ # generating data more quickly than it can be sen
t
to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
@@
-725,9
+724,9
@@
class _BlockManager(object):
if sync:
try:
if self.copies is None:
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes()
, num_retries=self.num_retries, classes=self.storage_classes()
)
else:
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(),
copies=self.copies
)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(),
num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes()
)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)