X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1811fb602be08a1f9ff9f71070861d8a2af60849..5109db362064053ed6711169d6c414b2cb4e22fb:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index ea9f5de899..6602ed1598 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -2,11 +2,12 @@ import functools import logging import os import re +import errno from collections import deque from stat import * -from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader +from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager from keep import * from .stream import StreamReader, normalize_stream, locator_block_size from .ranges import Range, LocatorAndRange @@ -640,12 +641,15 @@ class ResumableCollectionWriter(CollectionWriter): class Collection(CollectionBase): - def __init__(self, manifest_locator_or_text=None, api_client=None, - keep_client=None, num_retries=0): + def __init__(self, parent=None, manifest_locator_or_text=None, api_client=None, + keep_client=None, num_retries=0, block_manager=None): + self._parent = parent self._items = None self._api_client = api_client self._keep_client = keep_client + self._block_manager = block_manager + self.num_retries = num_retries self._manifest_locator = None self._manifest_text = None @@ -662,6 +666,29 @@ class Collection(CollectionBase): raise errors.ArgumentError( "Argument to CollectionReader must be a manifest or a collection UUID") + def _my_api(self): + if self._api_client is None: + if self._parent is not None: + return self._parent._my_api() + self._api_client = arvados.api('v1') + self._keep_client = None # Make a new one with the new api. + return self._api_client + + def _my_keep(self): + if self._keep_client is None: + if self._parent is not None: + return self._parent._my_keep() + self._keep_client = KeepClient(api_client=self._my_api(), + num_retries=self.num_retries) + return self._keep_client + + def _my_block_manager(self): + if self._block_manager is None: + if self._parent is not None: + return self._parent._my_block_manager() + self._block_manager = BlockManager(self._my_keep()) + return self._block_manager + def _populate_from_api_server(self): # As in KeepClient itself, we must wait until the last # possible moment to instantiate an API client, in order to @@ -671,10 +698,7 @@ class Collection(CollectionBase): # clause, just like any other Collection lookup # failure. Return an exception, or None if successful. try: - if self._api_client is None: - self._api_client = arvados.api('v1') - self._keep_client = None # Make a new one with the new api. - self._api_response = self._api_client.collections().get( + self._api_response = self._my_api().collections().get( uuid=self._manifest_locator).execute( num_retries=self.num_retries) self._manifest_text = self._api_response['manifest_text'] @@ -739,7 +763,9 @@ class Collection(CollectionBase): return self def __exit__(self, exc_type, exc_value, traceback): - self.save() + self.save(no_locator=True) + if self._block_manager is not None: + self._block_manager.stop_threads() @_populate_first def find(self, path, create=False): @@ -753,13 +779,13 @@ class Collection(CollectionBase): # item must be a file if item is None and create: # create new file - item = ArvadosFile(keep=self._keep_client) + item = ArvadosFile(self, keep=self._keep_client) self._items[p[0]] = item return item else: if item is None and create: # create new collection - item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries) + item = Collection(parent=self, num_retries=self.num_retries) self._items[p[0]] = item del p[0] return item.find("/".join(p), create=create) @@ -784,9 +810,9 @@ class Collection(CollectionBase): f = self.find(path, create=create) if f is None: - raise ArgumentError("File not found") + raise IOError((errno.ENOENT, "File not found")) if not isinstance(f, ArvadosFile): - raise ArgumentError("Path must refer to a file.") + raise IOError((errno.EISDIR, "Path must refer to a file.")) if mode[0] == "w": f.truncate(0) @@ -810,20 +836,19 @@ class Collection(CollectionBase): @_populate_first def __iter__(self): - for k in self._items.keys(): - yield k + self._items.iterkeys() + + @_populate_first + def iterkeys(self): + self._items.iterkeys() @_populate_first def __getitem__(self, k): - r = self.find(k) - if r: - return r - else: - raise KeyError(k) + return self._items[k] @_populate_first def __contains__(self, k): - return self.find(k) is not None + return k in self._items @_populate_first def __len__(self): @@ -831,6 +856,26 @@ class Collection(CollectionBase): @_populate_first def __delitem__(self, p): + del self._items[p] + + @_populate_first + def keys(self): + return self._items.keys() + + @_populate_first + def values(self): + return self._items.values() + + @_populate_first + def items(self): + return self._items.items() + + @_populate_first + def exists(self, path): + return self.find(path) != None + + @_populate_first + def remove(self, path): p = path.split("/") if p[0] == '.': del p[0] @@ -838,31 +883,55 @@ class Collection(CollectionBase): if len(p) > 0: item = self._items.get(p[0]) if item is None: - raise NotFoundError() + raise IOError((errno.ENOENT, "File not found")) if len(p) == 1: del self._items[p[0]] else: del p[0] - del item["/".join(p)] + item.remove("/".join(p)) else: - raise NotFoundError() + raise IOError((errno.ENOENT, "File not found")) @_populate_first - def keys(self): - return self._items.keys() + def manifest_text(self, strip=False, normalize=False): + if self.modified() or self._manifest_text is None or normalize: + return export_manifest(self, stream_name=".", portable_locators=strip) + else: + if strip: + return self.stripped_manifest() + else: + return self._manifest_text - @_populate_first - def values(self): - return self._items.values() + def portable_data_hash(self): + stripped = self.manifest_text(strip=True) + return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) @_populate_first - def items(self): - return self._items.items() + def save(self, no_locator=False): + if self.modified(): + self._my_block_manager().commit_all() + self._my_keep().put(self.manifest_text(strip=True)) + if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator): + self._api_response = self._my_api().collections().update( + uuid=self._manifest_locator, + body={'manifest_text': self.manifest_text(strip=False)} + ).execute( + num_retries=self.num_retries) + elif not no_locator: + raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.") + self.set_unmodified() @_populate_first - def save(self): - self._my_keep().put(self.portable_manifest_text()) - + def save_as(self, name, owner_uuid=None): + self._my_block_manager().commit_all() + self._my_keep().put(self.manifest_text(strip=True)) + body = {"manifest_text": self.manifest_text(strip=False), + "name": name} + if owner_uuid: + body["owner_uuid"] = owner_uuid + self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries) + self._manifest_locator = self._api_response["uuid"] + self.set_unmodified() def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None): @@ -932,12 +1001,13 @@ def export_manifest(item, stream_name=".", portable_locators=False): for s in v._segments: loc = s.locator if loc.startswith("bufferblock"): - loc = v._bufferblocks[loc].calculate_locator() + loc = v.parent._my_block_manager()._bufferblocks[loc].locator() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[k] = st - buf += ' '.join(normalize_stream(stream_name, stream)) - buf += "\n" + if stream: + buf += ' '.join(normalize_stream(stream_name, stream)) + buf += "\n" for k in [s for s in sorted_keys if isinstance(item[s], Collection)]: buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k)) elif isinstance(item, ArvadosFile):