projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
20613: Update unconfigured logger check to accommodate NullHandler
[arvados.git]
/
sdk
/
python
/
arvados
/
arvfile.py
diff --git a/sdk/python/arvados/arvfile.py b/sdk/python/arvados/arvfile.py
index 37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b..2ce0e46b30bd67ad948f832183ab091865c2ea53 100644
(file)
--- a/sdk/python/arvados/arvfile.py
+++ b/sdk/python/arvados/arvfile.py
@@ -481,7 +481,7 @@ class _BlockManager(object):
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None):
+    def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
@@ -491,15 +491,14 @@ class _BlockManager(object):
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
self._prefetch_threads = None
self.lock = threading.Lock()
self.prefetch_enabled = True
- if put_threads:
- self.num_put_threads = put_threads
- else:
- self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.copies = copies
+ self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
self._pending_write_size = 0
self.threads_lock = threading.Lock()
self.padding_block = None
+ self.num_retries = num_retries
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@synchronized
def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -554,9 +553,9 @@ class _BlockManager(object):
return
if self.copies is None:
return
if self.copies is None:
- loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+            loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
else:
-            loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
+            loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
bufferblock.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
bufferblock.set_state(_BufferBlock.ERROR, e)
@@ -571,7 +570,7 @@ class _BlockManager(object):
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
# If we don't limit the Queue size, the upload queue can quickly
# grow to take up gigabytes of RAM if the writing process is
-        # generating data more quickly than it can be send to the Keep
+        # generating data more quickly than it can be sent to the Keep
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
# servers.
#
# With two upload threads and a queue size of 2, this means up to 4
@@ -594,7 +593,7 @@ class _BlockManager(object):
b = self._prefetch_queue.get()
if b is None:
return
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b)
+                self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
except Exception:
_logger.exception("Exception doing block prefetch")
@@ -725,9 +724,9 @@ class _BlockManager(object):
if sync:
try:
if self.copies is None:
if sync:
try:
if self.copies is None:
- loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, classes=self.storage_classes())
else:
else:
-                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), num_retries=self.num_retries, copies=self.copies, classes=self.storage_classes())
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
block.set_state(_BufferBlock.COMMITTED, loc)
except Exception as e:
block.set_state(_BufferBlock.ERROR, e)
@@ -761,9 +760,10 @@ class _BlockManager(object):
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
self._delete_bufferblock(locator)
def _delete_bufferblock(self, locator):
- bb = self._bufferblocks[locator]
- bb.clear()
- del self._bufferblocks[locator]
+ if locator in self._bufferblocks:
+ bb = self._bufferblocks[locator]
+ bb.clear()
+ del self._bufferblocks[locator]
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
def get_block_contents(self, locator, num_retries, cache_only=False):
"""Fetch a block.
@@ -841,9 +841,6 @@ class _BlockManager(object):
if not self.prefetch_enabled:
return
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
with self.lock:
if locator in self._bufferblocks:
return
@@ -1099,7 +1096,7 @@ class ArvadosFile(object):
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
+        prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
locs = set()
data = []