X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/1811fb602be08a1f9ff9f71070861d8a2af60849..a7b16675337995c65b909a7519178efed98a3089:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index ea9f5de899..ddf2eae674 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -2,17 +2,21 @@ import functools import logging import os import re +import errno +import time from collections import deque from stat import * -from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader +from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock from keep import * from .stream import StreamReader, normalize_stream, locator_block_size from .ranges import Range, LocatorAndRange +from .safeapi import SafeApi import config import errors import util +import events _logger = logging.getLogger('arvados.collection') @@ -638,18 +642,367 @@ class ResumableCollectionWriter(CollectionWriter): "resumable writer can't accept unsourced data") return super(ResumableCollectionWriter, self).write(data) +ADD = "add" +DEL = "del" -class Collection(CollectionBase): - def __init__(self, manifest_locator_or_text=None, api_client=None, - keep_client=None, num_retries=0): +class SynchronizedCollectionBase(CollectionBase): + def __init__(self, parent=None): + self.parent = parent + self._items = {} + + def _my_api(self): + raise NotImplementedError() + + def _my_keep(self): + raise NotImplementedError() + + def _my_block_manager(self): + raise NotImplementedError() + + def _root_lock(self): + raise NotImplementedError() + + def _populate(self): + raise NotImplementedError() + + def sync_mode(self): + raise NotImplementedError() + + def notify(self, collection, event, name, item): + raise NotImplementedError() + + @_synchronized + def find(self, path, create=False, create_collection=False): + """Recursively search the specified file path. May return either a Collection + or ArvadosFile. + + :create: + If true, create path components (i.e. Collections) that are + missing. If "create" is False, return None if a path component is + not found. + + :create_collection: + If the path is not found, "create" is True, and + "create_collection" is False, then create and return a new + ArvadosFile for the last path component. If "create_collection" is + True, then create and return a new Collection for the last path + component. + + """ + if create and self.sync_mode() == SYNC_READONLY: + raise IOError((errno.EROFS, "Collection is read only")) + + p = path.split("/") + if p[0] == '.': + del p[0] + + if p and p[0]: + item = self._items.get(p[0]) + if len(p) == 1: + # item must be a file + if item is None and create: + # create new file + if create_collection: + item = Subcollection(self) + else: + item = ArvadosFile(self) + self._items[p[0]] = item + self.notify(self, ADD, p[0], item) + return item + else: + if item is None and create: + # create new collection + item = Subcollection(self) + self._items[p[0]] = item + self.notify(self, ADD, p[0], item) + del p[0] + return item.find("/".join(p), create=create) + else: + return self + + def open(self, path, mode): + """Open a file-like object for access. + + :path: + path to a file in the collection + :mode: + one of "r", "r+", "w", "w+", "a", "a+" + :"r": + opens for reading + :"r+": + opens for reading and writing. Reads/writes share a file pointer. + :"w", "w+": + truncates to 0 and opens for reading and writing. Reads/writes share a file pointer. + :"a", "a+": + opens for reading and writing. All writes are appended to + the end of the file. Writing does not affect the file pointer for + reading. + """ + mode = mode.replace("b", "") + if len(mode) == 0 or mode[0] not in ("r", "w", "a"): + raise ArgumentError("Bad mode '%s'" % mode) + create = (mode != "r") + + if create and self.sync_mode() == SYNC_READONLY: + raise IOError((errno.EROFS, "Collection is read only")) + + f = self.find(path, create=create) + + if f is None: + raise IOError((errno.ENOENT, "File not found")) + if not isinstance(f, ArvadosFile): + raise IOError((errno.EISDIR, "Path must refer to a file.")) + + if mode[0] == "w": + f.truncate(0) + + if mode == "r": + return ArvadosFileReader(f, path, mode, num_retries=self.num_retries) + else: + return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries) + + @_synchronized + def modified(self): + """Test if the collection (or any subcollection or file) has been modified + since it was created.""" + for k,v in self._items.items(): + if v.modified(): + return True + return False + + @_synchronized + def set_unmodified(self): + """Recursively clear modified flag""" + for k,v in self._items.items(): + v.set_unmodified() + + @_synchronized + def __iter__(self): + """Iterate over names of files and collections contained in this collection.""" + return self._items.keys() + + @_synchronized + def iterkeys(self): + """Iterate over names of files and collections directly contained in this collection.""" + return self._items.keys() + + @_synchronized + def __getitem__(self, k): + """Get a file or collection that is directly contained by this collection. If + you want to search a path, use `find()` instead. + """ + return self._items[k] + + @_synchronized + def __contains__(self, k): + """If there is a file or collection a directly contained by this collection + with name "k".""" + return k in self._items + + @_synchronized + def __len__(self): + """Get the number of items directly contained in this collection""" + return len(self._items) + + @_must_be_writable + @_synchronized + def __delitem__(self, p): + """Delete an item by name which is directly contained by this collection.""" + del self._items[p] + self.notify(self, DEL, p, None) + + @_synchronized + def keys(self): + """Get a list of names of files and collections directly contained in this collection.""" + return self._items.keys() + + @_synchronized + def values(self): + """Get a list of files and collection objects directly contained in this collection.""" + return self._items.values() + + @_synchronized + def items(self): + """Get a list of (name, object) tuples directly contained in this collection.""" + return self._items.items() + + def exists(self, path): + """Test if there is a file or collection at "path" """ + return self.find(path) != None + + @_must_be_writable + @_synchronized + def remove(self, path, rm_r=False): + """Remove the file or subcollection (directory) at `path`. + :rm_r: + Specify whether to remove non-empty subcollections (True), or raise an error (False). + """ + p = path.split("/") + if p[0] == '.': + # Remove '.' from the front of the path + del p[0] + + if len(p) > 0: + item = self._items.get(p[0]) + if item is None: + raise IOError((errno.ENOENT, "File not found")) + if len(p) == 1: + if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r: + raise IOError((errno.ENOTEMPTY, "Subcollection not empty")) + del self._items[p[0]] + self.notify(self, DEL, p[0], None) + else: + del p[0] + item.remove("/".join(p)) + else: + raise IOError((errno.ENOENT, "File not found")) + + def _cloneinto(self, target): + for k,v in self._items: + target._items[k] = v.clone(new_parent=target) + + def clone(self): + raise NotImplementedError() + + @_must_be_writable + @_synchronized + def copy(self, source_path, target_path, source_collection=None, overwrite=False): + """ + copyto('/foo', '/bar') will overwrite 'foo' if it exists. + copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo' + """ + if source_collection is None: + source_collection = self + + # Find the object to copy + sp = source_path.split("/") + source_obj = source_collection.find(source_path) + if source_obj is None: + raise IOError((errno.ENOENT, "File not found")) + + # Find parent collection the target path + tp = target_path.split("/") + target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True) + + # Determine the name to use. + target_name = tp[-1] if tp[-1] else sp[-1] + + if target_name in target_dir: + if isinstance(target_dir[target_name], SynchronizedCollectionBase): + target_dir = target_dir[target_name] + target_name = sp[-1] + elif not overwrite: + raise IOError((errno.EEXIST, "File already exists")) + + # Actually make the copy. + dup = source_obj.clone(target_dir) + with target_dir.lock: + target_dir._items[target_name] = dup + + self.notify(target_dir, ADD, target_name, dup) + + + @_synchronized + def manifest_text(self, strip=False, normalize=False): + """Get the manifest text for this collection, sub collections and files. + + :strip: + If True, remove signing tokens from block locators if present. + If False, block locators are left unchanged. + + :normalize: + If True, always export the manifest text in normalized form + even if the Collection is not modified. If False and the collection + is not modified, return the original manifest text even if it is not + in normalized form. + + """ + if self.modified() or self._manifest_text is None or normalize: + return export_manifest(self, stream_name=".", portable_locators=strip) + else: + if strip: + return self.stripped_manifest() + else: + return self._manifest_text + + @_must_be_writable + @_synchronized + def merge(self, other): + for k in other.keys(): + if k in self: + if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection): + self[k].merge(other[k]) + else: + if self[k] != other[k]: + name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d_%H:%M%:%S", + time.gmtime())) + self[name] = other[k].clone(self) + self.notify(self, name, ADD, self[name]) + else: + self[k] = other[k].clone(self) + self.notify(self, k, ADD, self[k]) + + def portable_data_hash(self): + """Get the portable data hash for this collection's manifest.""" + stripped = self.manifest_text(strip=True) + return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped)) - self._items = None + +class Collection(SynchronizedCollectionBase): + """Store an Arvados collection, consisting of a set of files and + sub-collections. + """ + + def __init__(self, manifest_locator_or_text=None, + parent=None, + config=None, + api_client=None, + keep_client=None, + num_retries=None, + block_manager=None, + sync=SYNC_READONLY): + """:manifest_locator_or_text: + One of Arvados collection UUID, block locator of + a manifest, raw manifest text, or None (to create an empty collection). + :parent: + the parent Collection, may be None. + :config: + the arvados configuration to get the hostname and api token. + Prefer this over supplying your own api_client and keep_client (except in testing). + Will use default config settings if not specified. + :api_client: + The API client object to use for requests. If not specified, create one using `config`. + :keep_client: + the Keep client to use for requests. If not specified, create one using `config`. + :num_retries: + the number of retries for API and Keep requests. + :block_manager: + the block manager to use. If not specified, create one. + :sync: + Set synchronization policy with API server collection record. + :SYNC_READONLY: + Collection is read only. No synchronization. This mode will + also forego locking, which gives better performance. + :SYNC_EXPLICIT: + Synchronize on explicit request via `update()` or `save()` + :SYNC_LIVE: + Synchronize with server in response to background websocket events, + on block write, or on file close. + + """ + super(Collection, self).__init__(parent) self._api_client = api_client self._keep_client = keep_client + self._block_manager = block_manager + self._config = config self.num_retries = num_retries self._manifest_locator = None self._manifest_text = None self._api_response = None + self._sync = sync + self.lock = threading.RLock() + self.callbacks = [] + self.events = None if manifest_locator_or_text: if re.match(util.keep_locator_pattern, manifest_locator_or_text): @@ -662,6 +1015,56 @@ class Collection(CollectionBase): raise errors.ArgumentError( "Argument to CollectionReader must be a manifest or a collection UUID") + self._populate() + + if self._sync == SYNC_LIVE: + if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator): + raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified") + self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message) + + @staticmethod + def create(name, owner_uuid=None, sync=SYNC_EXPLICIT): + c = Collection(sync=sync) + c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True) + return c + + def _root_lock(self): + return self.lock + + def sync_mode(self): + return self._sync + + def on_message(self): + self.update() + + @_synchronized + def update(self): + n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute() + other = import_collection(n["manifest_text"]) + self.merge(other) + + @_synchronized + def _my_api(self): + if self._api_client is None: + self._api_client = arvados.SafeApi(self._config) + self._keep_client = self._api_client.keep + return self._api_client + + @_synchronized + def _my_keep(self): + if self._keep_client is None: + if self._api_client is None: + self._my_api() + else: + self._keep_client = KeepClient(api=self._api_client) + return self._keep_client + + @_synchronized + def _my_block_manager(self): + if self._block_manager is None: + self._block_manager = BlockManager(self._my_keep()) + return self._block_manager + def _populate_from_api_server(self): # As in KeepClient itself, we must wait until the last # possible moment to instantiate an API client, in order to @@ -671,10 +1074,7 @@ class Collection(CollectionBase): # clause, just like any other Collection lookup # failure. Return an exception, or None if successful. try: - if self._api_client is None: - self._api_client = arvados.api('v1') - self._keep_client = None # Make a new one with the new api. - self._api_response = self._api_client.collections().get( + self._api_response = self._my_api().collections().get( uuid=self._manifest_locator).execute( num_retries=self.num_retries) self._manifest_text = self._api_response['manifest_text'] @@ -694,14 +1094,13 @@ class Collection(CollectionBase): return e def _populate(self): - self._items = {} if self._manifest_locator is None and self._manifest_text is None: return error_via_api = None error_via_keep = None should_try_keep = ((self._manifest_text is None) and util.keep_locator_pattern.match( - self._manifest_locator)) + self._manifest_locator)) if ((self._manifest_text is None) and util.signed_locator_pattern.match(self._manifest_locator)): error_via_keep = self._populate_from_keep() @@ -726,49 +1125,36 @@ class Collection(CollectionBase): # populate import_manifest(self._manifest_text, self) - def _populate_first(orig_func): - # Decorator for methods that read actual Collection data. - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - if self._items is None: - self._populate() - return orig_func(self, *args, **kwargs) - return wrapper + if self._sync == SYNC_READONLY: + # Now that we're populated, knowing that this will be readonly, + # forego any further locking. + self.lock = NoopLock() def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): - self.save() - - @_populate_first - def find(self, path, create=False): - p = path.split("/") - if p[0] == '.': - del p[0] - - if len(p) > 0: - item = self._items.get(p[0]) - if len(p) == 1: - # item must be a file - if item is None and create: - # create new file - item = ArvadosFile(keep=self._keep_client) - self._items[p[0]] = item - return item - else: - if item is None and create: - # create new collection - item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries) - self._items[p[0]] = item - del p[0] - return item.find("/".join(p), create=create) - else: - return self - - @_populate_first + """Support scoped auto-commit in a with: block""" + if self._sync != SYNC_READONLY: + self.save(allow_no_locator=True) + if self._block_manager is not None: + self._block_manager.stop_threads() + + @_synchronized + def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None): + if new_config is None: + new_config = self.config + c = Collection(parent=new_parent, config=new_config, sync=new_sync) + if new_sync == SYNC_READONLY: + c.lock = NoopLock() + c._items = {} + self._cloneinto(c) + return c + + @_synchronized def api_response(self): - """api_response() -> dict or None + """ + api_response() -> dict or None Returns information about this Collection fetched from the API server. If the Collection exists in Keep but not the API server, currently @@ -776,102 +1162,151 @@ class Collection(CollectionBase): """ return self._api_response - def open(self, path, mode): - mode = mode.replace("b", "") - if len(mode) == 0 or mode[0] not in ("r", "w", "a"): - raise ArgumentError("Bad mode '%s'" % mode) - create = (mode != "r") + @_must_be_writable + @_synchronized + def save(self, allow_no_locator=False): + """Commit pending buffer blocks to Keep, write the manifest to Keep, and + update the collection record to Keep. - f = self.find(path, create=create) - if f is None: - raise ArgumentError("File not found") - if not isinstance(f, ArvadosFile): - raise ArgumentError("Path must refer to a file.") + :allow_no_locator: + If there is no collection uuid associated with this + Collection and `allow_no_locator` is False, raise an error. If True, + do not raise an error. + """ + if self.modified(): + self._my_block_manager().commit_all() + self._my_keep().put(self.manifest_text(strip=True)) + if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator): + self._api_response = self._my_api().collections().update( + uuid=self._manifest_locator, + body={'manifest_text': self.manifest_text(strip=False)} + ).execute( + num_retries=self.num_retries) + elif not allow_no_locator: + raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.") + self.set_unmodified() + + @_must_be_writable + @_synchronized + def save_as(self, name, owner_uuid=None, ensure_unique_name=False): + """Save a new collection record. + + :name: + The collection name. + + :owner_uuid: + the user, or project uuid that will own this collection. + If None, defaults to the current user. + + :ensure_unique_name: + If True, ask the API server to rename the collection + if it conflicts with a collection with the same name and owner. If + False, a name conflict will result in an error. - if mode[0] == "w": - f.truncate(0) + """ + self._my_block_manager().commit_all() + self._my_keep().put(self.manifest_text(strip=True)) + body = {"manifest_text": self.manifest_text(strip=False), + "name": name} + if owner_uuid: + body["owner_uuid"] = owner_uuid + self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries) - if mode == "r": - return ArvadosFileReader(f, path, mode) - else: - return ArvadosFileWriter(f, path, mode) + if self.events: + self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]]) - @_populate_first - def modified(self): - for k,v in self._items.items(): - if v.modified(): - return True - return False + self._manifest_locator = self._api_response["uuid"] - @_populate_first - def set_unmodified(self): - for k,v in self._items.items(): - v.set_unmodified() + if self.events: + self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]]) - @_populate_first - def __iter__(self): - for k in self._items.keys(): - yield k + self.set_unmodified() - @_populate_first - def __getitem__(self, k): - r = self.find(k) - if r: - return r - else: - raise KeyError(k) + @_synchronized + def subscribe(self, callback): + self.callbacks.append(callback) - @_populate_first - def __contains__(self, k): - return self.find(k) is not None + @_synchronized + def unsubscribe(self, callback): + self.callbacks.remove(callback) - @_populate_first - def __len__(self): - return len(self._items) + @_synchronized + def notify(self, collection, event, name, item): + for c in self.callbacks: + c(collection, event, name, item) - @_populate_first - def __delitem__(self, p): - p = path.split("/") - if p[0] == '.': - del p[0] +class Subcollection(SynchronizedCollectionBase): + """This is a subdirectory within a collection that doesn't have its own API + server record. It falls under the umbrella of the root collection.""" - if len(p) > 0: - item = self._items.get(p[0]) - if item is None: - raise NotFoundError() - if len(p) == 1: - del self._items[p[0]] - else: - del p[0] - del item["/".join(p)] - else: - raise NotFoundError() + def __init__(self, parent): + super(Subcollection, self).__init__(parent) + self.lock = parent._root_lock() - @_populate_first - def keys(self): - return self._items.keys() + def _root_lock(self): + return self.parent._root_lock() - @_populate_first - def values(self): - return self._items.values() + def sync_mode(self): + return self.parent.sync_mode() - @_populate_first - def items(self): - return self._items.items() + def _my_api(self): + return self.parent._my_api() - @_populate_first - def save(self): - self._my_keep().put(self.portable_manifest_text()) + def _my_keep(self): + return self.parent._my_keep() + + def _my_block_manager(self): + return self.parent._my_block_manager() + + def _populate(self): + self.parent._populate() + + def notify(self, collection, event, name, item): + self.parent.notify(collection, event, name, item) + + @_synchronized + def clone(self, new_parent): + c = Subcollection(new_parent) + c._items = {} + self._cloneinto(c) + return c +def import_manifest(manifest_text, + into_collection=None, + api_client=None, + keep=None, + num_retries=None, + sync=SYNC_READONLY): + """Import a manifest into a `Collection`. + :manifest_text: + The manifest text to import from. -def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None): + :into_collection: + The `Collection` that will be initialized (must be empty). + If None, create a new `Collection` object. + + :api_client: + The API client object that will be used when creating a new `Collection` object. + + :keep: + The keep client object that will be used when creating a new `Collection` object. + + :num_retries: + the default number of api client and keep retries on error. + + :sync: + Collection sync mode (only if into_collection is None) + """ if into_collection is not None: if len(into_collection) > 0: raise ArgumentError("Can only import manifest into an empty collection") c = into_collection else: - c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries) + c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync) + + save_sync = c.sync_mode() + c._sync = None STREAM_NAME = 0 BLOCKS = 1 @@ -919,33 +1354,51 @@ def import_manifest(manifest_text, into_collection=None, api_client=None, keep=N state = STREAM_NAME c.set_unmodified() + c._sync = save_sync return c def export_manifest(item, stream_name=".", portable_locators=False): + """ + :item: + Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If + `item` is a is a `Collection`, this will also export subcollections. + + :stream_name: + the name of the stream when exporting `item`. + + :portable_locators: + If True, strip any permission hints on block locators. + If False, use block locators as-is. + """ buf = "" - if isinstance(item, Collection): + if isinstance(item, SynchronizedCollectionBase): stream = {} sorted_keys = sorted(item.keys()) for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]: v = item[k] st = [] - for s in v._segments: + for s in v.segments(): loc = s.locator if loc.startswith("bufferblock"): - loc = v._bufferblocks[loc].calculate_locator() + loc = v.parent._my_block_manager()._bufferblocks[loc].locator() + if portable_locators: + loc = KeepLocator(loc).stripped() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[k] = st - buf += ' '.join(normalize_stream(stream_name, stream)) - buf += "\n" - for k in [s for s in sorted_keys if isinstance(item[s], Collection)]: - buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k)) + if stream: + buf += ' '.join(normalize_stream(stream_name, stream)) + buf += "\n" + for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]: + buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators) elif isinstance(item, ArvadosFile): st = [] - for s in item._segments: + for s in item.segments: loc = s.locator if loc.startswith("bufferblock"): loc = item._bufferblocks[loc].calculate_locator() + if portable_locators: + loc = KeepLocator(loc).stripped() st.append(LocatorAndRange(loc, locator_block_size(loc), s.segment_offset, s.range_size)) stream[stream_name] = st