X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/02e9754a68a5816458d517b8f5012530cf17ebba..89796f01a6ea3cb553a61be6ce92883a1decf003:/sdk/python/arvados/collection.py?ds=sidebyside diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 85c7ad7f4f..ea18123f65 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -2,6 +2,7 @@ import functools import logging import os import re +import errno from collections import deque from stat import * @@ -640,20 +641,20 @@ class ResumableCollectionWriter(CollectionWriter): class Collection(CollectionBase): - def __init__(self, manifest_locator_or_text=None, api_client=None, + def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None, keep_client=None, num_retries=0, block_manager=None): + self.parent = parent self._items = None self._api_client = api_client self._keep_client = keep_client + self._block_manager = block_manager + self.num_retries = num_retries self._manifest_locator = None self._manifest_text = None self._api_response = None - if block_manager is None: - self.block_manager = BlockManager(keep_client) - if manifest_locator_or_text: if re.match(util.keep_locator_pattern, manifest_locator_or_text): self._manifest_locator = manifest_locator_or_text @@ -665,6 +666,29 @@ class Collection(CollectionBase): raise errors.ArgumentError( "Argument to CollectionReader must be a manifest or a collection UUID") + def _my_api(self): + if self._api_client is None: + if self.parent is not None: + return self.parent._my_api() + self._api_client = arvados.api('v1') + self._keep_client = None # Make a new one with the new api. + return self._api_client + + def _my_keep(self): + if self._keep_client is None: + if self.parent is not None: + return self.parent._my_keep() + self._keep_client = KeepClient(api_client=self._my_api(), + num_retries=self.num_retries) + return self._keep_client + + def _my_block_manager(self): + if self._block_manager is None: + if self.parent is not None: + return self.parent._my_block_manager() + self._block_manager = BlockManager(self._my_keep()) + return self._block_manager + def _populate_from_api_server(self): # As in KeepClient itself, we must wait until the last # possible moment to instantiate an API client, in order to @@ -674,10 +698,7 @@ class Collection(CollectionBase): # clause, just like any other Collection lookup # failure. Return an exception, or None if successful. try: - if self._api_client is None: - self._api_client = arvados.api('v1') - self._keep_client = None # Make a new one with the new api. - self._api_response = self._api_client.collections().get( + self._api_response = self._my_api().collections().get( uuid=self._manifest_locator).execute( num_retries=self.num_retries) self._manifest_text = self._api_response['manifest_text'] @@ -742,10 +763,12 @@ class Collection(CollectionBase): return self def __exit__(self, exc_type, exc_value, traceback): - self.save() + self.save(no_locator=True) + if self._block_manager is not None: + self._block_manager.stop_threads() @_populate_first - def find(self, path, create=False): + def find(self, path, create=False, create_collection=False): p = path.split("/") if p[0] == '.': del p[0] @@ -756,13 +779,16 @@ class Collection(CollectionBase): # item must be a file if item is None and create: # create new file - item = ArvadosFile(self.block_manager, keep=self._keep_client) + if create_collection: + item = Collection(parent=self, num_retries=self.num_retries) + else: + item = ArvadosFile(self) self._items[p[0]] = item return item else: if item is None and create: # create new collection - item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager) + item = Collection(parent=self, num_retries=self.num_retries) self._items[p[0]] = item del p[0] return item.find("/".join(p), create=create) @@ -787,9 +813,9 @@ class Collection(CollectionBase): f = self.find(path, create=create) if f is None: - raise ArgumentError("File not found") + raise IOError((errno.ENOENT, "File not found")) if not isinstance(f, ArvadosFile): - raise ArgumentError("Path must refer to a file.") + raise IOError((errno.EISDIR, "Path must refer to a file.")) if mode[0] == "w": f.truncate(0) @@ -813,23 +839,19 @@ class Collection(CollectionBase): @_populate_first def __iter__(self): - self._items.iterkeys() + return self._items.iterkeys() @_populate_first def iterkeys(self): - self._items.iterkeys() + return self._items.iterkeys() @_populate_first def __getitem__(self, k): - r = self.find(k) - if r: - return r - else: - raise KeyError(k) + return self._items[k] @_populate_first def __contains__(self, k): - return self.find(k) is not None + return k in self._items @_populate_first def __len__(self): @@ -837,21 +859,7 @@ class Collection(CollectionBase): @_populate_first def __delitem__(self, p): - p = path.split("/") - if p[0] == '.': - del p[0] - - if len(p) > 0: - item = self._items.get(p[0]) - if item is None: - raise NotFoundError() - if len(p) == 1: - del self._items[p[0]] - else: - del p[0] - del item["/".join(p)] - else: - raise NotFoundError() + del self._items[p] @_populate_first def keys(self): @@ -865,6 +873,28 @@ class Collection(CollectionBase): def items(self): return self._items.items() + @_populate_first + def exists(self, path): + return self.find(path) != None + + @_populate_first + def remove(self, path): + p = path.split("/") + if p[0] == '.': + del p[0] + + if len(p) > 0: + item = self._items.get(p[0]) + if item is None: + raise IOError((errno.ENOENT, "File not found")) + if len(p) == 1: + del self._items[p[0]] + else: + del p[0] + item.remove("/".join(p)) + else: + raise IOError((errno.ENOENT, "File not found")) + @_populate_first def manifest_text(self, strip=False, normalize=False): if self.modified() or self._manifest_text is None or normalize: @@ -880,34 +910,51 @@ class Collection(CollectionBase): return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) @_populate_first - def commit_bufferblocks(self): - pass - - @_populate_first - def save(self): + def save(self, no_locator=False): if self.modified(): + self._my_block_manager().commit_all() self._my_keep().put(self.manifest_text(strip=True)) - if re.match(util.collection_uuid_pattern, self._manifest_locator): - self._api_response = self._api_client.collections().update( + if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator): + self._api_response = self._my_api().collections().update( uuid=self._manifest_locator, body={'manifest_text': self.manifest_text(strip=False)} ).execute( num_retries=self.num_retries) - else: + elif not no_locator: raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.") self.set_unmodified() @_populate_first - def save_as(self, name, owner_uuid=None): + def save_as(self, name, owner_uuid=None, ensure_unique_name=False): + self._my_block_manager().commit_all() self._my_keep().put(self.manifest_text(strip=True)) body = {"manifest_text": self.manifest_text(strip=False), "name": name} if owner_uuid: body["owner_uuid"] = owner_uuid - self._api_response = self._api_client.collections().create(body=body).execute(num_retries=self.num_retries) + self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries) self._manifest_locator = self._api_response["uuid"] self.set_unmodified() + @_populate_first + def rename(self, old, new): + old_path, old_fn = os.path.split(old) + old_col = self.find(path) + if old_col is None: + raise IOError((errno.ENOENT, "File not found")) + if not isinstance(old_p, Collection): + raise IOError((errno.ENOTDIR, "Parent in path is a file, not a directory")) + if old_fn in old_col: + new_path, new_fn = os.path.split(new) + new_col = self.find(new_path, create=True, create_collection=True) + if not isinstance(new_col, Collection): + raise IOError((errno.ENOTDIR, "Destination is a file, not a directory")) + ent = old_col[old_fn] + del old_col[old_fn] + ent.parent = new_col + new_col[new_fn] = ent + else: + raise IOError((errno.ENOENT, "File not found")) def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None): if into_collection is not None: @@ -973,23 +1020,28 @@ def export_manifest(item, stream_name=".", portable_locators=False): for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]: v = item[k] st = [] - for s in v._segments: + for s in v.segments: loc = s.locator if loc.startswith("bufferblock"): - loc = v.bbm._bufferblocks[loc].locator() + loc = v.parent._my_block_manager()._bufferblocks[loc].locator() + if portable_locators: + loc = KeepLocator(loc).stripped() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[k] = st - buf += ' '.join(normalize_stream(stream_name, stream)) - buf += "\n" + if stream: + buf += ' '.join(normalize_stream(stream_name, stream)) + buf += "\n" for k in [s for s in sorted_keys if isinstance(item[s], Collection)]: - buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k)) + buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators) elif isinstance(item, ArvadosFile): st = [] - for s in item._segments: + for s in item.segments: loc = s.locator if loc.startswith("bufferblock"): loc = item._bufferblocks[loc].calculate_locator() + if portable_locators: + loc = KeepLocator(loc).stripped() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[stream_name] = st