3198: Apply StreamFileReader tests to ArvadosFileReader
[arvados.git] / sdk / python / arvados / collection.py
index 85c7ad7f4f935acd566ff20d5a4a543968f7abce..ea18123f65474ff67bd6b4c137ecb15208694f34 100644 (file)
@@ -2,6 +2,7 @@ import functools
 import logging
 import os
 import re
+import errno
 
 from collections import deque
 from stat import *
@@ -640,20 +641,20 @@ class ResumableCollectionWriter(CollectionWriter):
 
 
 class Collection(CollectionBase):
-    def __init__(self, manifest_locator_or_text=None, api_client=None,
+    def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
                  keep_client=None, num_retries=0, block_manager=None):
 
+        self.parent = parent
         self._items = None
         self._api_client = api_client
         self._keep_client = keep_client
+        self._block_manager = block_manager
+
         self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
         self._api_response = None
 
-        if block_manager is None:
-            self.block_manager = BlockManager(keep_client)
-
         if manifest_locator_or_text:
             if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
@@ -665,6 +666,29 @@ class Collection(CollectionBase):
                 raise errors.ArgumentError(
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
+    def _my_api(self):
+        """Return this object's API client, else the parent's, else create one."""
+        if self._api_client is None and self.parent is not None:
+            return self.parent._my_api()
+        if self._api_client is None:
+            self._api_client, self._keep_client = arvados.api('v1'), None  # drop Keep client so it is rebuilt on the new API client
+        return self._api_client
+
+    def _my_keep(self):
+        """Return this object's Keep client, else the parent's, else create one."""
+        if self._keep_client is None and self.parent is not None:
+            return self.parent._my_keep()
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._my_api(), num_retries=self.num_retries)
+        return self._keep_client
+
+    def _my_block_manager(self):  # own manager, else the parent's, else lazily built
+        if self._block_manager is None and self.parent is not None:
+            return self.parent._my_block_manager()
+        if self._block_manager is None:
+            self._block_manager = BlockManager(self._my_keep())
+        return self._block_manager
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -674,10 +698,7 @@ class Collection(CollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            if self._api_client is None:
-                self._api_client = arvados.api('v1')
-                self._keep_client = None  # Make a new one with the new api.
-            self._api_response = self._api_client.collections().get(
+            self._api_response = self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries)
             self._manifest_text = self._api_response['manifest_text']
@@ -742,10 +763,12 @@ class Collection(CollectionBase):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        self.save()
+        self.save(no_locator=True)  # no_locator=True: do not raise when there is no collection uuid yet
+        if self._block_manager is not None:  # only a manager held by this object; the parent's is not touched
+            self._block_manager.stop_threads()  # NOTE(review): skipped if save() raises -- consider try/finally
 
     @_populate_first
-    def find(self, path, create=False):
+    def find(self, path, create=False, create_collection=False):
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -756,13 +779,16 @@ class Collection(CollectionBase):
                 # item must be a file
                 if item is None and create:
                     # create new file
-                    item = ArvadosFile(self.block_manager, keep=self._keep_client)
+                    if create_collection:
+                        item = Collection(parent=self, num_retries=self.num_retries)
+                    else:
+                        item = ArvadosFile(self)
                     self._items[p[0]] = item
                 return item
             else:
                 if item is None and create:
                     # create new collection
-                    item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
+                    item = Collection(parent=self, num_retries=self.num_retries)
                     self._items[p[0]] = item
                 del p[0]
                 return item.find("/".join(p), create=create)
@@ -787,9 +813,9 @@ class Collection(CollectionBase):
 
         f = self.find(path, create=create)
         if f is None:
-            raise ArgumentError("File not found")
+            raise IOError((errno.ENOENT, "File not found"))
         if not isinstance(f, ArvadosFile):
-            raise ArgumentError("Path must refer to a file.")
+            raise IOError((errno.EISDIR, "Path must refer to a file."))
 
         if mode[0] == "w":
             f.truncate(0)
@@ -813,23 +839,19 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __iter__(self):
-        self._items.iterkeys()
+        return self._items.iterkeys()  # yields the keys of self._items; the old code dropped the iterator and returned None
 
     @_populate_first
     def iterkeys(self):
-        self._items.iterkeys()
+        return self._items.iterkeys()  # same iterator as __iter__; the old code dropped it and returned None
 
     @_populate_first
     def __getitem__(self, k):
-        r = self.find(k)
-        if r:
-            return r
-        else:
-            raise KeyError(k)
+        return self._items[k]  # looks k up in self._items only; no '/'-path traversal (find() does that)
 
     @_populate_first
     def __contains__(self, k):
-        return self.find(k) is not None
+        return k in self._items  # membership in self._items only; exists() handles '/'-separated paths
 
     @_populate_first
     def __len__(self):
@@ -837,21 +859,7 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __delitem__(self, p):
-        p = path.split("/")
-        if p[0] == '.':
-            del p[0]
-
-        if len(p) > 0:
-            item = self._items.get(p[0])
-            if item is None:
-                raise NotFoundError()
-            if len(p) == 1:
-                del self._items[p[0]]
-            else:
-                del p[0]
-                del item["/".join(p)]
-        else:
-            raise NotFoundError()
+        del self._items[p]  # deletes key p from self._items only; path-based removal lives in remove()
 
     @_populate_first
     def keys(self):
@@ -865,6 +873,28 @@ class Collection(CollectionBase):
     def items(self):
         return self._items.items()
 
+    @_populate_first
+    def exists(self, path):  # True when find(path) resolves to an entry
+        return self.find(path) is not None
+
+    @_populate_first
+    def remove(self, path):
+        """Remove the entry at `path` ('/'-separated, optional leading '.').
+
+        Raises IOError with errno ENOENT if a path component is missing.
+        """
+        p = path.split("/")
+        if p[0] == '.':
+            del p[0]
+        # errno and message are separate args so that e.errno is populated.
+        if not p or self._items.get(p[0]) is None:
+            raise IOError(errno.ENOENT, "File not found")
+        if len(p) == 1:
+            del self._items[p[0]]
+        else:
+            # Delegate the rest of the path to the child entry.
+            self._items[p[0]].remove("/".join(p[1:]))
+
     @_populate_first
     def manifest_text(self, strip=False, normalize=False):
         if self.modified() or self._manifest_text is None or normalize:
@@ -880,34 +910,51 @@ class Collection(CollectionBase):
         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     @_populate_first
-    def commit_bufferblocks(self):
-        pass
-
-    @_populate_first
-    def save(self):
+    def save(self, no_locator=False):  # no_locator=True tolerates a missing/non-uuid locator instead of raising
         if self.modified():
+            self._my_block_manager().commit_all()  # called before the manifest is written with _my_keep().put()
             self._my_keep().put(self.manifest_text(strip=True))
-            if re.match(util.collection_uuid_pattern, self._manifest_locator):
-                self._api_response = self._api_client.collections().update(
+            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):  # None check first: re.match() raises TypeError on None
+                self._api_response = self._my_api().collections().update(  # _my_api() falls back to the parent's client or creates one
                     uuid=self._manifest_locator,
                     body={'manifest_text': self.manifest_text(strip=False)}
                     ).execute(
                         num_retries=self.num_retries)
-            else:
+            elif not no_locator:  # with no_locator=True, skip the API update and just mark unmodified
                 raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
             self.set_unmodified()
 
     @_populate_first
-    def save_as(self, name, owner_uuid=None):
+    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):  # ensure_unique_name is forwarded to collections().create()
+        self._my_block_manager().commit_all()  # called before the manifest is written with _my_keep().put()
         self._my_keep().put(self.manifest_text(strip=True))
         body = {"manifest_text": self.manifest_text(strip=False),
                 "name": name}
         if owner_uuid:
             body["owner_uuid"] = owner_uuid
-        self._api_response = self._api_client.collections().create(body=body).execute(num_retries=self.num_retries)
+        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)  # _my_api() falls back to the parent's client or creates one
         self._manifest_locator = self._api_response["uuid"]
         self.set_unmodified()
 
+    @_populate_first
+    def rename(self, old, new):
+        """Move entry `old` to `new`; raises IOError (ENOENT/ENOTDIR) on bad paths."""
+        old_path, old_fn = os.path.split(old)
+        old_col = self.find(old_path)  # was find(path): undefined name
+        if old_col is None:
+            raise IOError(errno.ENOENT, "File not found")  # two args so e.errno is set
+        if not isinstance(old_col, Collection):  # was old_p: undefined name
+            raise IOError(errno.ENOTDIR, "Parent in path is a file, not a directory")
+        if old_fn not in old_col:
+            raise IOError(errno.ENOENT, "File not found")
+        new_path, new_fn = os.path.split(new)
+        new_col = self.find(new_path, create=True, create_collection=True)
+        if not isinstance(new_col, Collection):
+            raise IOError(errno.ENOTDIR, "Destination is a file, not a directory")
+        ent = old_col[old_fn]
+        del old_col[old_fn]
+        ent.parent = new_col  # re-parent so the entry resolves clients via its new collection
+        new_col[new_fn] = ent
 
 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
     if into_collection is not None:
@@ -973,23 +1020,28 @@ def export_manifest(item, stream_name=".", portable_locators=False):
         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
             v = item[k]
             st = []
-            for s in v._segments:
+            for s in v.segments:
                 loc = s.locator
                 if loc.startswith("bufferblock"):
-                    loc = v.bbm._bufferblocks[loc].locator()
+                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+                if portable_locators:
+                    loc = KeepLocator(loc).stripped()
                 st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
             stream[k] = st
-        buf += ' '.join(normalize_stream(stream_name, stream))
-        buf += "\n"
+        if stream:
+            buf += ' '.join(normalize_stream(stream_name, stream))
+            buf += "\n"
         for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
+            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
         st = []
-        for s in item._segments:
+        for s in item.segments:
             loc = s.locator
             if loc.startswith("bufferblock"):
                 loc = item._bufferblocks[loc].calculate_locator()
+            if portable_locators:
+                loc = KeepLocator(loc).stripped()
             st.append(LocatorAndRange(loc, locator_block_size(loc),
                                  s.segment_offset, s.range_size))
         stream[stream_name] = st