X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/7dcac8234708b244251f957a4fc2ceb68ec06881..f56d3a6876f246f78d5bc231a0ac5b6e4c6bdb9c:/sdk/python/arvados/collection.py diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py index 2f26c005f1..22c4d66acb 100644 --- a/sdk/python/arvados/collection.py +++ b/sdk/python/arvados/collection.py @@ -3,17 +3,19 @@ import logging import os import re import errno +import time from collections import deque from stat import * -from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager +from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable from keep import * from .stream import StreamReader, normalize_stream, locator_block_size from .ranges import Range, LocatorAndRange import config import errors import util +import events _logger = logging.getLogger('arvados.collection') @@ -646,6 +648,9 @@ class SynchronizedCollectionBase(CollectionBase): SYNC_EXPLICIT = 2 SYNC_LIVE = 3 + ADD = "add" + DEL = "del" + def __init__(self, parent=None): self.parent = parent self._items = None @@ -665,21 +670,13 @@ class SynchronizedCollectionBase(CollectionBase): def _populate(self): raise NotImplementedError() - def _sync_mode(self): + def sync_mode(self): raise NotImplementedError() - @staticmethod - def _populate_first(orig_func): - # Decorator for methods that read actual Collection data. - @functools.wraps(orig_func) - def wrapper(self, *args, **kwargs): - if self._items is None: - self._populate() - return orig_func(self, *args, **kwargs) - return wrapper + def notify(self, collection, event, name, item): + raise NotImplementedError() - @arvfile._synchronized - @_populate_first + @_synchronized def find(self, path, create=False, create_collection=False): """Recursively search the specified file path. May return either a Collection or ArvadosFile. 
@@ -715,12 +712,14 @@ class SynchronizedCollectionBase(CollectionBase): else: item = ArvadosFile(self) self._items[p[0]] = item + self.notify(self, self.ADD, p[0], item) return item else: if item is None and create: # create new collection item = Subcollection(self) self._items[p[0]] = item + self.notify(self, self.ADD, p[0], item) del p[0] return item.find("/".join(p), create=create) else: @@ -766,8 +765,7 @@ class SynchronizedCollectionBase(CollectionBase): else: return ArvadosFileWriter(f, path, mode) - @arvfile._synchronized - @_populate_first + @_synchronized def modified(self): """Test if the collection (or any subcollection or file) has been modified since it was created.""" @@ -776,67 +774,58 @@ class SynchronizedCollectionBase(CollectionBase): return True return False - @arvfile._synchronized - @_populate_first + @_synchronized def set_unmodified(self): """Recursively clear modified flag""" for k,v in self._items.items(): v.set_unmodified() - @arvfile._synchronized - @_populate_first + @_synchronized def __iter__(self): """Iterate over names of files and collections contained in this collection.""" - return self._items.keys() + return iter(self._items.keys()) - @arvfile._synchronized - @_populate_first + @_synchronized def iterkeys(self): """Iterate over names of files and collections directly contained in this collection.""" return self._items.keys() - @arvfile._synchronized - @_populate_first + @_synchronized def __getitem__(self, k): """Get a file or collection that is directly contained by this collection. If you want to search a path, use `find()` instead. 
""" return self._items[k] - @arvfile._synchronized - @_populate_first + @_synchronized def __contains__(self, k): """If there is a file or collection a directly contained by this collection with name "k".""" return k in self._items - @arvfile._synchronized - @_populate_first + @_synchronized def __len__(self): """Get the number of items directly contained in this collection""" return len(self._items) @_must_be_writable - @arvfile._synchronized - @_populate_first + @_synchronized def __delitem__(self, p): """Delete an item by name which is directly contained by this collection.""" del self._items[p] + self.notify(self, self.DEL, p, None) - @arvfile._synchronized - @_populate_first + @_synchronized def keys(self): """Get a list of names of files and collections directly contained in this collection.""" return self._items.keys() - @arvfile._synchronized - @_populate_first + @_synchronized def values(self): """Get a list of files and collection objects directly contained in this collection.""" return self._items.values() - @arvfile._synchronized - @_populate_first + @_synchronized def items(self): """Get a list of (name, object) tuples directly contained in this collection.""" return self._items.items() @@ -846,8 +835,7 @@ class SynchronizedCollectionBase(CollectionBase): return self.find(path) != None @_must_be_writable - @arvfile._synchronized - @_populate_first + @_synchronized def remove(self, path, rm_r=False): """Remove the file or subcollection (directory) at `path`. 
:rm_r: @@ -866,6 +854,7 @@ class SynchronizedCollectionBase(CollectionBase): - if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r: + if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r: raise IOError((errno.ENOTEMPTY, "Subcollection not empty")) del self._items[p[0]] + self.notify(self, self.DEL, p[0], None) else: del p[0] item.remove("/".join(p)) @@ -880,8 +869,7 @@ class SynchronizedCollectionBase(CollectionBase): raise NotImplementedError() @_must_be_writable - @arvfile._synchronized - @_populate_first + @_synchronized def copyto(self, target_path, source_path, source_collection=None, overwrite=False): """ copyto('/foo', '/bar') will overwrite 'foo' if it exists. @@ -907,10 +895,14 @@ class SynchronizedCollectionBase(CollectionBase): raise IOError((errno.EEXIST, "File already exists")) # Actually make the copy. - target_dir[target_name]._items = source_obj.clone(target_dir) + dup = source_obj.clone(target_dir) + with target_dir.lock: + target_dir._items[target_name] = dup - @arvfile._synchronized - @_populate_first + self.notify(target_dir, self.ADD, target_name, dup) + + + @_synchronized def manifest_text(self, strip=False, normalize=False): """Get the manifest text for this collection, sub collections and files. 
@@ -933,6 +925,23 @@ class SynchronizedCollectionBase(CollectionBase): else: return self._manifest_text + @_must_be_writable + @_synchronized + def merge(self, other): + for k in other.keys(): + if k in self: + if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection): + self[k].merge(other[k]) + else: + if self[k] != other[k]: + name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M:%S", + time.gmtime())) + self[name] = other[k].clone(self) + self.notify(self, self.ADD, name, self[name]) + else: + self[k] = other[k].clone(self) + self.notify(self, self.ADD, k, self[k]) + def portable_data_hash(self): """Get the portable data hash for this collection's manifest.""" stripped = self.manifest_text(strip=True) @@ -994,6 +1003,7 @@ class Collection(SynchronizedCollectionBase): self._api_response = None self._sync = sync self.lock = threading.RLock() + self.callbacks = [] if manifest_locator_or_text: if re.match(util.keep_locator_pattern, manifest_locator_or_text): @@ -1006,20 +1016,39 @@ class Collection(SynchronizedCollectionBase): raise errors.ArgumentError( "Argument to CollectionReader must be a manifest or a collection UUID") + self._populate() + + if self._sync == self.SYNC_LIVE: + if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator): + raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified") + self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message) + + @staticmethod + def create(name, owner_uuid=None, sync=SynchronizedCollectionBase.SYNC_EXPLICIT): + c = Collection(sync=sync) + c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True) + return c + def _root_lock(self): return self.lock def sync_mode(self): return self._sync - @arvfile._synchronized + @_synchronized + def on_message(self, event): + n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute() + other = 
import_collection(n["manifest_text"]) + self.merge(other) + + @_synchronized def _my_api(self): if self._api_client is None: self._api_client = arvados.api.SafeApi(self._config) self._keep_client = self._api_client.keep return self._api_client - @arvfile._synchronized + @_synchronized def _my_keep(self): if self._keep_client is None: if self._api_client is None: @@ -1028,7 +1057,7 @@ class Collection(SynchronizedCollectionBase): self._keep_client = KeepClient(api=self._api_client) return self._keep_client - @arvfile._synchronized + @_synchronized def _my_block_manager(self): if self._block_manager is None: self._block_manager = BlockManager(self._my_keep()) @@ -1070,7 +1099,7 @@ class Collection(SynchronizedCollectionBase): error_via_keep = None should_try_keep = ((self._manifest_text is None) and util.keep_locator_pattern.match( - self._manifest_locator)) + self._manifest_locator)) if ((self._manifest_text is None) and util.signed_locator_pattern.match(self._manifest_locator)): error_via_keep = self._populate_from_keep() @@ -1109,8 +1138,7 @@ class Collection(SynchronizedCollectionBase): if self._block_manager is not None: self._block_manager.stop_threads() - @arvfile._synchronized - @_populate_first + @_synchronized def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config): c = Collection(parent=new_parent, config=new_config, sync=new_sync) if new_sync == Collection.SYNC_READONLY: @@ -1119,8 +1147,7 @@ class Collection(SynchronizedCollectionBase): self._cloneinto(c) return c - @arvfile._synchronized - @_populate_first + @_synchronized def api_response(self): """ api_response() -> dict or None @@ -1132,8 +1159,7 @@ class Collection(SynchronizedCollectionBase): return self._api_response @_must_be_writable - @arvfile._synchronized - @_populate_first + @_synchronized def save(self, allow_no_locator=False): """Commit pending buffer blocks to Keep, write the manifest to Keep, and update the collection record to Keep. 
@@ -1157,8 +1183,7 @@ class Collection(SynchronizedCollectionBase): self.set_unmodified() @_must_be_writable - @arvfile._synchronized - @_populate_first + @_synchronized def save_as(self, name, owner_uuid=None, ensure_unique_name=False): """Save a new collection record. @@ -1182,9 +1207,29 @@ class Collection(SynchronizedCollectionBase): if owner_uuid: body["owner_uuid"] = owner_uuid self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries) + + if getattr(self, "events", None): + self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]]) + self._manifest_locator = self._api_response["uuid"] + + if getattr(self, "events", None): + self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]]) + self.set_unmodified() + @_synchronized + def subscribe(self, callback): + self.callbacks.append(callback) + + @_synchronized + def unsubscribe(self, callback): + self.callbacks.remove(callback) + + @_synchronized + def notify(self, collection, event, name, item): + for c in self.callbacks: + c(collection, event, name, item) class Subcollection(SynchronizedCollectionBase): """This is a subdirectory within a collection that doesn't have its own API @@ -1212,8 +1257,10 @@ class Subcollection(SynchronizedCollectionBase): def _populate(self): self.parent._populate() - @arvfile._synchronized - @_populate_first + def notify(self, collection, event, name, item): + self.parent.notify(collection, event, name, item) + + @_synchronized def clone(self, new_parent): c = Subcollection(new_parent) c._items = {}