projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
17717: Merge branch 'master' into 17717-costanalyzer-date-mode
[arvados.git]
/
sdk
/
python
/
arvados
/
arvfile.py
diff --git
a/sdk/python/arvados/arvfile.py
b/sdk/python/arvados/arvfile.py
index 7c6b732d36fcefc5a88896382ed99b61b83f9d4d..e915ff2ac0a37c86c635f9ce68f52d227fed1725 100644
(file)
--- a/
sdk/python/arvados/arvfile.py
+++ b/
sdk/python/arvados/arvfile.py
@@
-481,7
+481,7
@@
class _BlockManager(object):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes
=[]
):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes
_func=None
):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@
-491,13
+491,10
@@
class _BlockManager(object):
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
- self.storage_classes = storage_classes
+ self.storage_classes = storage_classes
_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
@@
-556,9
+553,9
@@
class _BlockManager(object):
return
if self.copies is None:
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes
()
)
else:
else:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes
()
)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
@@
-573,7
+570,7
@@
class _BlockManager(object):
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
- # generating data more quickly than it can be sen
d
to the Keep
+ # generating data more quickly than it can be sen
t
to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
@@
-727,9
+724,9
@@
class _BlockManager(object):
if sync:
try:
if self.copies is None:
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes
()
)
else:
else:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes)
+ loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes
()
)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)