<notextile>
<pre>
$ <code class="userinput">arv keep get --help</code>
-usage: arv-get [-h] [--retries RETRIES]
+usage: arv-get [-h] [--retries RETRIES] [--version]
[--progress | --no-progress | --batch-progress]
- [--hash HASH | --md5sum] [-n] [-r] [-f | --skip-existing]
+ [--hash HASH | --md5sum] [-n] [-r]
+ [-f | -v | --skip-existing | --strip-manifest] [--threads N]
locator [destination]
Copy data from Keep to a local file or pipe.
locator Collection locator, optionally with a file path or
prefix.
destination Local file or directory where the data is to be written.
- Default: /dev/stdout.
+ Default: stdout.
optional arguments:
-h, --help show this help message and exit
--retries RETRIES Maximum number of times to retry server requests that
- encounter temporary failures (e.g., server down). Default
- 3.
+ encounter temporary failures (e.g., server down).
+ Default 3.
+ --version Print version and exit.
--progress Display human-readable progress on stderr (bytes and, if
possible, percentage of total data size). This is the
default behavior when it is not expected to interfere
with the output: specifically, stderr is a tty _and_
- either stdout is not a tty, or output is being written to
- named files rather than stdout.
+ either stdout is not a tty, or output is being written
+ to named files rather than stdout.
--no-progress Do not display human-readable progress on stderr.
--batch-progress Display machine-readable progress on stderr (bytes and,
if known, total data size).
-f Overwrite existing files while writing. The default
behavior is to refuse to write *anything* if any of the
output files already exist. As a special case, -f is not
- needed to write to /dev/stdout.
- --skip-existing Skip files that already exist. The default behavior is to
- refuse to write *anything* if any files exist that would
- have to be overwritten. This option causes even devices,
- sockets, and fifos to be skipped.
+ needed to write to stdout.
+ -v Once for verbose mode, twice for debug mode.
+ --skip-existing Skip files that already exist. The default behavior is
+ to refuse to write *anything* if any files exist that
+ would have to be overwritten. This option causes even
+ devices, sockets, and fifos to be skipped.
+ --strip-manifest When getting a collection manifest, strip its access
+ tokens before writing it.
+ --threads N Set the number of download threads to be used. Take into
+ account that using lots of threads will increase the RAM
+ requirements. Default is to use 4 threads. On high
+ latency installations, using a greater number will
+ improve overall throughput.
</pre>
</notextile>
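For callers that use the Python SDK directly rather than @arv-get@, the same threaded download path can be exercised by sizing the Keep block cache the way @arv-get@ now does and passing @get_threads@ through to the collection reader. The sketch below is a minimal illustration of that wiring, not part of the change itself; the collection locator and file name are hypothetical placeholders, and it assumes @ARVADOS_API_HOST@/@ARVADOS_API_TOKEN@ are configured in the environment.

<notextile>
<pre>
import arvados
import arvados.keep

threads = 4  # matches the new --threads default

# One 64 MiB cache slot per download thread, plus one spare,
# mirroring the KeepBlockCache sizing arv-get now uses.
block_cache = arvados.keep.KeepBlockCache((threads + 1) * 64 * 1024 * 1024)

reader = arvados.CollectionReader(
    'zzzzz-4zz18-xxxxxxxxxxxxxxx',   # hypothetical collection locator
    keep_client=arvados.keep.KeepClient(block_cache=block_cache),
    get_threads=threads)

# Hypothetical file name inside the collection.
with reader.open('example.txt', 'rb') as f:
    data = f.read()
</pre>
</notextile>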
DEFAULT_PUT_THREADS = 2
DEFAULT_GET_THREADS = 2
- def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None):
+ def __init__(self, keep, copies=None, put_threads=None, num_retries=None, storage_classes_func=None, get_threads=None):
"""keep: KeepClient object to use"""
self._keep = keep
self._bufferblocks = collections.OrderedDict()
self.lock = threading.Lock()
self.prefetch_enabled = True
self.num_put_threads = put_threads or _BlockManager.DEFAULT_PUT_THREADS
- self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+ self.num_get_threads = get_threads or _BlockManager.DEFAULT_GET_THREADS
self.copies = copies
self.storage_classes = storage_classes_func or (lambda: [])
self._pending_write_size = 0
b = self._prefetch_queue.get()
if b is None:
return
- self._keep.get(b)
+ self._keep.get(b, prefetch=True)
except Exception:
_logger.exception("Exception doing block prefetch")
if not self.prefetch_enabled:
return
- if self._keep.get_from_cache(locator) is not None:
- return
-
with self.lock:
if locator in self._bufferblocks:
return
if size == 0 or offset >= self.size():
return b''
readsegs = locators_and_ranges(self._segments, offset, size)
- prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
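+ # Scale the readahead window with the number of download threads:
+ # prefetch one full Keep block (config.KEEP_BLOCK_SIZE bytes) per
+ # thread beyond the current read.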
+ prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE * self.parent._my_block_manager().num_get_threads, limit=32)
locs = set()
data = []
block_manager=None,
replication_desired=None,
storage_classes_desired=None,
- put_threads=None):
+ put_threads=None,
+ get_threads=None):
"""Collection constructor.
:manifest_locator_or_text:
self.replication_desired = replication_desired
self._storage_classes_desired = storage_classes_desired
self.put_threads = put_threads
+ self.get_threads = get_threads
if apiconfig:
self._config = apiconfig
copies = (self.replication_desired or
self._my_api()._rootDesc.get('defaultCollectionReplication',
2))
- self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+ self._block_manager = _BlockManager(self._my_keep(),
+ copies=copies,
+ put_threads=self.put_threads,
+ num_retries=self.num_retries,
+ storage_classes_func=self.storage_classes_desired,
+ get_threads=self.get_threads)
return self._block_manager
def _remember_api_response(self, response):
it.
""")
+parser.add_argument('--threads', type=int, metavar='N', default=4,
+ help="""
+Set the number of download threads to be used. Take into account that
+using lots of threads will increase the RAM requirements. Default is
+to use 4 threads.
+On high latency installations, using a greater number will improve
+overall throughput.
+""")
+
def parse_arguments(arguments, stdout, stderr):
args = parser.parse_args(arguments)
try:
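+ # Size the block cache at (threads + 1) slots of 64 MiB each:
+ # one full Keep block per download thread, plus a spare slot.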
reader = arvados.CollectionReader(
- col_loc, api_client=api_client, num_retries=args.retries)
+ col_loc, api_client=api_client, num_retries=args.retries,
+ keep_client=arvados.keep.KeepClient(block_cache=arvados.keep.KeepBlockCache((args.threads + 1) * 64 * 1024 * 1024)),
+ get_threads=args.threads)
except Exception as error:
logger.error("failed to read collection: {}".format(error))
return 1
else:
return None
- def get_from_cache(self, loc):
+ def get_from_cache(self, loc_s):
"""Fetch a block only if is in the cache, otherwise return None."""
- slot = self.block_cache.get(loc)
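+ # The block cache is keyed by the bare md5 hash, so parse the
+ # full locator string before looking it up.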
+ locator = KeepLocator(loc_s)
+ slot = self.block_cache.get(locator.md5sum)
if slot is not None and slot.ready.is_set():
return slot.get()
else:
def get(self, loc_s, **kwargs):
return self._get_or_head(loc_s, method="GET", **kwargs)
- def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None):
+ def _get_or_head(self, loc_s, method="GET", num_retries=None, request_id=None, headers=None, prefetch=False):
"""Get data from Keep.
This method fetches one or more blocks of data from Keep. It
if method == "GET":
slot, first = self.block_cache.reserve_cache(locator.md5sum)
if not first:
+ if prefetch:
+ # This is a prefetch request; if the block is
+ # already in flight, return immediately.
+ # Clear 'slot' so the finally block does not
+ # call slot.set().
+ slot = None
+ return None
self.hits_counter.add(1)
blob = slot.get()
if blob is None:
return True
if os.path.exists(os.path.join(self.local_store, locator.md5sum)):
return True
-
- def is_cached(self, locator):
- return self.block_cache.reserve_cache(expect_hash)
def __init__(self, blocks):
self.blocks = blocks
self.requests = []
- def get(self, locator, num_retries=0):
+ def get(self, locator, num_retries=0, prefetch=False):
self.requests.append(locator)
return self.blocks.get(locator)
def get_from_cache(self, locator):
def __init__(self, blocks, nocache):
self.blocks = blocks
self.nocache = nocache
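+ # The prefetch window now scales with num_get_threads, so this
+ # stub block manager must provide a value.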
+ self.num_get_threads = 1
def block_prefetch(self, loc):
pass
def __init__(self, content, num_retries=0):
self.content = content
- def get(self, locator, num_retries=0):
+ def get(self, locator, num_retries=0, prefetch=False):
return self.content[locator]
def test_stream_reader(self):
self.collection.update()
new_collection_record = self.collection.api_response()
else:
+ # If there are too many prefetch threads and the CPU
+ # is maxed out, delivering data to the FUSE layer
+ # actually ends up being slower.
+ # Experimentally, capping at 7 threads seems to be
+ # the sweet spot.
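+ # For example, a 256 MiB cache yields (256 // 64) - 1 = 3
+ # threads, while a 1 GiB cache would allow 15 but is
+ # capped at 7.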
+ get_threads = min(max((self.api.keep.block_cache.cache_max // (64 * 1024 * 1024)) - 1, 1), 7)
# Create a new collection object
if uuid_pattern.match(self.collection_locator):
coll_reader = arvados.collection.Collection(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
else:
coll_reader = arvados.collection.CollectionReader(
self.collection_locator, self.api, self.api.keep,
- num_retries=self.num_retries)
+ num_retries=self.num_retries,
+ get_threads=get_threads)
new_collection_record = coll_reader.api_response() or {}
# If the Collection only exists in Keep, there will be no API
# response. Fill in the fields we need.
class MagicDirApiError(FuseMagicTest):
def setUp(self):
api = mock.MagicMock()
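+ # The collection-loading code now reads block_cache.cache_max
+ # to pick a download thread count, so the mock must provide it.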
+ api.keep.block_cache = mock.MagicMock(cache_max=1)
super(MagicDirApiError, self).setUp(api=api)
api.collections().get().execute.side_effect = iter([
Exception('API fail'),