4823: Add Collection.copy tests
[arvados.git] / sdk / python / arvados / collection.py
index 22c4d66acb232b9dc6b2fe0b4306826c1746e2f1..ddf2eae674d12fd32a25236f94db2c4577c9dc80 100644 (file)
@@ -8,10 +8,11 @@ import time
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
+from .safeapi import SafeApi
 import config
 import errors
 import util
@@ -154,7 +155,6 @@ class CollectionReader(CollectionBase):
                          for sline in self._manifest_text.split("\n")
                          if sline]
 
-    @staticmethod
     def _populate_first(orig_func):
         # Decorator for methods that read actual Collection data.
         @functools.wraps(orig_func)
@@ -642,18 +642,13 @@ class ResumableCollectionWriter(CollectionWriter):
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
 
+ADD = "add"
+DEL = "del"
 
 class SynchronizedCollectionBase(CollectionBase):
-    SYNC_READONLY = 1
-    SYNC_EXPLICIT = 2
-    SYNC_LIVE = 3
-
-    ADD = "add"
-    DEL = "del"
-
     def __init__(self, parent=None):
         self.parent = parent
-        self._items = None
+        self._items = {}
 
     def _my_api(self):
         raise NotImplementedError()
@@ -694,14 +689,14 @@ class SynchronizedCollectionBase(CollectionBase):
           component.
 
         """
-        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+        if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
         p = path.split("/")
         if p[0] == '.':
             del p[0]
 
-        if len(p) > 0:
+        if p and p[0]:
             item = self._items.get(p[0])
             if len(p) == 1:
                 # item must be a file
@@ -748,10 +743,11 @@ class SynchronizedCollectionBase(CollectionBase):
             raise ArgumentError("Bad mode '%s'" % mode)
         create = (mode != "r")
 
-        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
+        if create and self.sync_mode() == SYNC_READONLY:
             raise IOError((errno.EROFS, "Collection is read only"))
 
         f = self.find(path, create=create)
+
         if f is None:
             raise IOError((errno.ENOENT, "File not found"))
         if not isinstance(f, ArvadosFile):
@@ -761,9 +757,9 @@ class SynchronizedCollectionBase(CollectionBase):
             f.truncate(0)
 
         if mode == "r":
-            return ArvadosFileReader(f, path, mode)
+            return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
         else:
-            return ArvadosFileWriter(f, path, mode)
+            return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
 
     @_synchronized
     def modified(self):
@@ -851,7 +847,7 @@ class SynchronizedCollectionBase(CollectionBase):
             if item is None:
                 raise IOError((errno.ENOENT, "File not found"))
             if len(p) == 1:
-                if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
+                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
                     raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
                 del self._items[p[0]]
                 self.notify(self, DEL, p[0], None)
@@ -870,7 +866,7 @@ class SynchronizedCollectionBase(CollectionBase):
 
     @_must_be_writable
     @_synchronized
-    def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
+    def copy(self, source_path, target_path, source_collection=None, overwrite=False):
         """
-        copyto('/foo', '/bar') will overwrite 'foo' if it exists.
-        copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
+        copy('/bar', '/foo') will copy 'bar' to 'foo' (into 'foo' if it is a subcollection; an existing file 'foo' is replaced only if overwrite=True).
+        copy('/bar', '/foo/') will place 'bar' in subcollection 'foo'
@@ -886,13 +882,17 @@ class SynchronizedCollectionBase(CollectionBase):
 
         # Find parent collection the target path
         tp = target_path.split("/")
-        target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
+        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
 
         # Determine the name to use.
         target_name = tp[-1] if tp[-1] else sp[-1]
 
-        if target_name in target_dir and not overwrite:
-            raise IOError((errno.EEXIST, "File already exists"))
+        if target_name in target_dir:
+            if isinstance(target_dir[target_name], SynchronizedCollectionBase):
+                target_dir = target_dir[target_name]
+                target_name = sp[-1]
+            elif not overwrite:
+                raise IOError((errno.EEXIST, "File already exists"))
 
         # Actually make the copy.
         dup = source_obj.clone(target_dir)
@@ -934,7 +934,7 @@ class SynchronizedCollectionBase(CollectionBase):
                     self[k].merge(other[k])
                 else:
                     if self[k] != other[k]:
-                        name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
+                        name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d_%H:%M:%S",
                                                                      time.gmtime()))
                         self[name] = other[k].clone(self)
-                        self.notify(self, name, ADD, self[name])
+                        self.notify(self, ADD, name, self[name])
@@ -958,9 +958,9 @@ class Collection(SynchronizedCollectionBase):
                  config=None,
                  api_client=None,
                  keep_client=None,
-                 num_retries=0,
+                 num_retries=None,
                  block_manager=None,
-                 sync=Collection.SYNC_READONLY):
+                 sync=SYNC_READONLY):
         """:manifest_locator_or_text:
           One of Arvados collection UUID, block locator of
           a manifest, raw manifest text, or None (to create an empty collection).
@@ -984,15 +984,13 @@ class Collection(SynchronizedCollectionBase):
             Collection is read only.  No synchronization.  This mode will
             also forego locking, which gives better performance.
           :SYNC_EXPLICIT:
-            Synchronize on explicit request via `merge()` or `save()`
+            Synchronize on explicit request via `update()` or `save()`
           :SYNC_LIVE:
             Synchronize with server in response to background websocket events,
             on block write, or on file close.
 
         """
-
-        self.parent = parent
-        self._items = None
+        super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
@@ -1004,6 +1002,7 @@ class Collection(SynchronizedCollectionBase):
         self._sync = sync
         self.lock = threading.RLock()
         self.callbacks = []
+        self.events = None
 
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
@@ -1021,11 +1020,11 @@ class Collection(SynchronizedCollectionBase):
             if self._sync == SYNC_LIVE:
                 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
                     raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
-                self.events = events.subscribe(arvados.api(), filters=[["object_uuid", "=", self._manifest_locator]], self.on_message)
+                self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
 
     @staticmethod
     def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
-        c = Collection(sync=SYNC_EXPLICIT)
+        c = Collection(sync=sync)
         c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
         return c
 
@@ -1035,16 +1034,19 @@ class Collection(SynchronizedCollectionBase):
     def sync_mode(self):
         return self._sync
 
+    def on_message(self, event):
+        self.update()
+
     @_synchronized
-    def on_message():
-        n = self._my_api().collections().get(uuid=self._manifest_locator, select=[["manifest_text"])).execute()
+    def update(self):
+        n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
-        other = import_collection(n["manifest_text"])
+        other = import_manifest(n["manifest_text"])
         self.merge(other)
 
     @_synchronized
     def _my_api(self):
         if self._api_client is None:
-            self._api_client = arvados.api.SafeApi(self._config)
+            self._api_client = SafeApi(self._config)
             self._keep_client = self._api_client.keep
         return self._api_client
 
@@ -1092,7 +1094,6 @@ class Collection(SynchronizedCollectionBase):
             return e
 
     def _populate(self):
-        self._items = {}
         if self._manifest_locator is None and self._manifest_text is None:
             return
         error_via_api = None
@@ -1134,14 +1135,17 @@ class Collection(SynchronizedCollectionBase):
 
     def __exit__(self, exc_type, exc_value, traceback):
         """Support scoped auto-commit in a with: block"""
-        self.save(allow_no_locator=True)
+        if self._sync != SYNC_READONLY:
+            self.save(allow_no_locator=True)
         if self._block_manager is not None:
             self._block_manager.stop_threads()
 
     @_synchronized
-    def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config):
+    def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
+        if new_config is None:
+            new_config = self._config
         c = Collection(parent=new_parent, config=new_config, sync=new_sync)
-        if new_sync == Collection.SYNC_READONLY:
+        if new_sync == SYNC_READONLY:
             c.lock = NoopLock()
         c._items = {}
         self._cloneinto(c)
@@ -1227,9 +1231,9 @@ class Collection(SynchronizedCollectionBase):
         self.callbacks.remove(callback)
 
     @_synchronized
-    def notify(self, event):
+    def notify(self, collection, event, name, item):
         for c in self.callbacks:
-            c(event)
+            c(collection, event, name, item)
 
 class Subcollection(SynchronizedCollectionBase):
     """This is a subdirectory within a collection that doesn't have its own API
@@ -1239,7 +1243,7 @@ class Subcollection(SynchronizedCollectionBase):
         super(Subcollection, self).__init__(parent)
         self.lock = parent._root_lock()
 
-    def _root_lock():
+    def _root_lock(self):
         return self.parent._root_lock()
 
     def sync_mode(self):
@@ -1257,8 +1261,8 @@ class Subcollection(SynchronizedCollectionBase):
     def _populate(self):
         self.parent._populate()
 
-    def notify(self, event):
-        self.parent.notify(event)
+    def notify(self, collection, event, name, item):
+        self.parent.notify(collection, event, name, item)
 
     @_synchronized
     def clone(self, new_parent):
@@ -1267,7 +1271,12 @@ class Subcollection(SynchronizedCollectionBase):
         self._cloneinto(c)
         return c
 
-def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
+def import_manifest(manifest_text,
+                    into_collection=None,
+                    api_client=None,
+                    keep=None,
+                    num_retries=None,
+                    sync=SYNC_READONLY):
     """Import a manifest into a `Collection`.
 
     :manifest_text:
@@ -1283,15 +1292,21 @@ def import_manifest(manifest_text, into_collection=None, api_client=None, keep=N
     :keep:
       The keep client object that will be used when creating a new `Collection` object.
 
-    num_retries
+    :num_retries:
       the default number of api client and keep retries on error.
+
+    :sync:
+      Collection sync mode (only if into_collection is None)
     """
     if into_collection is not None:
         if len(into_collection) > 0:
             raise ArgumentError("Can only import manifest into an empty collection")
         c = into_collection
     else:
-        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
+        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
+
+    save_sync = c.sync_mode()
+    c._sync = None
 
     STREAM_NAME = 0
     BLOCKS = 1
@@ -1339,6 +1354,7 @@ def import_manifest(manifest_text, into_collection=None, api_client=None, keep=N
             state = STREAM_NAME
 
     c.set_unmodified()
+    c._sync = save_sync
     return c
 
 def export_manifest(item, stream_name=".", portable_locators=False):
@@ -1361,7 +1377,7 @@ def export_manifest(item, stream_name=".", portable_locators=False):
         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
             v = item[k]
             st = []
-            for s in v.segments:
+            for s in v.segments():
                 loc = s.locator
                 if loc.startswith("bufferblock"):
                     loc = v.parent._my_block_manager()._bufferblocks[loc].locator()