3198: Apply StreamFileReader tests to ArvadosFileReader
[arvados.git] / sdk / python / arvados / collection.py
index ea9f5de8993b7c310b9e1105282b245626e7593f..ea18123f65474ff67bd6b4c137ecb15208694f34 100644 (file)
@@ -2,11 +2,12 @@ import functools
 import logging
 import os
 import re
+import errno
 
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -640,12 +641,15 @@ class ResumableCollectionWriter(CollectionWriter):
 
 
 class Collection(CollectionBase):
-    def __init__(self, manifest_locator_or_text=None, api_client=None,
-                 keep_client=None, num_retries=0):
+    def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
+                 keep_client=None, num_retries=0, block_manager=None):
 
+        self.parent = parent
         self._items = None
         self._api_client = api_client
         self._keep_client = keep_client
+        self._block_manager = block_manager
+
         self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
@@ -662,6 +666,29 @@ class Collection(CollectionBase):
                 raise errors.ArgumentError(
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
+    def _my_api(self):
+        """Return the API client used by this collection.
+
+        A client already set on this collection is returned as-is.  A child
+        collection without one defers to its parent; a root collection
+        lazily creates one with arvados.api('v1').
+        """
+        if self._api_client is not None:
+            return self._api_client
+        if self.parent is not None:
+            return self.parent._my_api()
+        self._api_client = arvados.api('v1')
+        # Drop any cached Keep client so the next _my_keep() call builds a
+        # new one bound to the freshly created API client.
+        self._keep_client = None
+        return self._api_client
+
+    def _my_keep(self):
+        """Return the Keep client used by this collection.
+
+        Falls back to the parent's client for child collections; a root
+        collection lazily builds a KeepClient on top of _my_api().
+        """
+        if self._keep_client is not None:
+            return self._keep_client
+        if self.parent is not None:
+            return self.parent._my_keep()
+        # _my_api() runs before the assignment, so if it resets
+        # _keep_client the new KeepClient still ends up stored here.
+        self._keep_client = KeepClient(api_client=self._my_api(),
+                                       num_retries=self.num_retries)
+        return self._keep_client
+
+    def _my_block_manager(self):
+        """Return the BlockManager used by this collection.
+
+        Child collections without their own manager defer to the parent; a
+        root collection lazily creates one backed by _my_keep().
+        """
+        if self._block_manager is not None:
+            return self._block_manager
+        if self.parent is not None:
+            return self.parent._my_block_manager()
+        self._block_manager = BlockManager(self._my_keep())
+        return self._block_manager
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -671,10 +698,7 @@ class Collection(CollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            if self._api_client is None:
-                self._api_client = arvados.api('v1')
-                self._keep_client = None  # Make a new one with the new api.
-            self._api_response = self._api_client.collections().get(
+            self._api_response = self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries)
             self._manifest_text = self._api_response['manifest_text']
@@ -739,10 +763,12 @@ class Collection(CollectionBase):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        self.save()
+        self.save(no_locator=True)
+        if self._block_manager is not None:
+            self._block_manager.stop_threads()
 
     @_populate_first
-    def find(self, path, create=False):
+    def find(self, path, create=False, create_collection=False):
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -753,13 +779,16 @@ class Collection(CollectionBase):
                 # item must be a file
                 if item is None and create:
                     # create new file
-                    item = ArvadosFile(keep=self._keep_client)
+                    if create_collection:
+                        item = Collection(parent=self, num_retries=self.num_retries)
+                    else:
+                        item = ArvadosFile(self)
                     self._items[p[0]] = item
                 return item
             else:
                 if item is None and create:
                     # create new collection
-                    item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries)
+                    item = Collection(parent=self, num_retries=self.num_retries)
                     self._items[p[0]] = item
                 del p[0]
                 return item.find("/".join(p), create=create)
@@ -784,9 +813,9 @@ class Collection(CollectionBase):
 
         f = self.find(path, create=create)
         if f is None:
-            raise ArgumentError("File not found")
+            raise IOError((errno.ENOENT, "File not found"))
         if not isinstance(f, ArvadosFile):
-            raise ArgumentError("Path must refer to a file.")
+            raise IOError((errno.EISDIR, "Path must refer to a file."))
 
         if mode[0] == "w":
             f.truncate(0)
@@ -810,20 +839,19 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __iter__(self):
-        for k in self._items.keys():
-            yield k
+        # Iterate over the names of this collection's immediate children.
+        # This is a live dict iterator, not a snapshot of the keys, so
+        # adding or deleting children while iterating raises RuntimeError.
+        return self._items.iterkeys()
+
+    @_populate_first
+    def iterkeys(self):
+        # Iterator over immediate child names (same as iter(collection)).
+        return self._items.iterkeys()
 
     @_populate_first
     def __getitem__(self, k):
-        r = self.find(k)
-        if r:
-            return r
-        else:
-            raise KeyError(k)
+        # Look up an immediate child by name.  Unlike find(), "/" in k is
+        # not treated as a path separator; a missing name raises KeyError.
+        return self._items[k]
 
     @_populate_first
     def __contains__(self, k):
-        return self.find(k) is not None
+        # Membership test on immediate child names only; use exists() to
+        # test a "/"-separated path.
+        return k in self._items
 
     @_populate_first
     def __len__(self):
@@ -831,6 +859,26 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __delitem__(self, p):
+        # Remove an immediate child by name (KeyError if absent); use
+        # remove() to delete by "/"-separated path.
+        del self._items[p]
+
+    @_populate_first
+    def keys(self):
+        # Names of the immediate children, as returned by dict.keys().
+        return self._items.keys()
+
+    @_populate_first
+    def values(self):
+        # Immediate child objects (ArvadosFile or Collection instances).
+        return self._items.values()
+
+    @_populate_first
+    def items(self):
+        # (name, child) pairs for the immediate children.
+        return self._items.items()
+
+    @_populate_first
+    def exists(self, path):
+        """Return True if the "/"-separated `path` names an item here."""
+        # Identity test: a None check must not dispatch to __eq__/__ne__.
+        return self.find(path) is not None
+
+    @_populate_first
+    def remove(self, path):
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -838,32 +886,75 @@ class Collection(CollectionBase):
         if len(p) > 0:
             item = self._items.get(p[0])
             if item is None:
-                raise NotFoundError()
+                raise IOError((errno.ENOENT, "File not found"))
             if len(p) == 1:
                 del self._items[p[0]]
             else:
                 del p[0]
-                del item["/".join(p)]
+                item.remove("/".join(p))
         else:
-            raise NotFoundError()
+            raise IOError((errno.ENOENT, "File not found"))
 
     @_populate_first
-    def keys(self):
-        return self._items.keys()
+    def manifest_text(self, strip=False, normalize=False):
+        if self.modified() or self._manifest_text is None or normalize:
+            return export_manifest(self, stream_name=".", portable_locators=strip)
+        else:
+            if strip:
+                return self.stripped_manifest()
+            else:
+                return self._manifest_text
 
-    @_populate_first
-    def values(self):
-        return self._items.values()
+    def portable_data_hash(self):
+        """Return "<md5 hex of the stripped manifest>+<its length>"."""
+        stripped_text = self.manifest_text(strip=True)
+        digest = hashlib.md5(stripped_text).hexdigest()
+        return "%s+%d" % (digest, len(stripped_text))
 
     @_populate_first
-    def items(self):
-        return self._items.items()
+    def save(self, no_locator=False):
+        """Save this collection if it has been modified.
+
+        Calls commit_all() on the block manager and writes the stripped
+        manifest to Keep.  When _manifest_locator is a collection UUID, the
+        API record is updated with the unstripped manifest.
+
+        no_locator: if true, a collection without a UUID locator is only
+          written to Keep; if false, that case raises AssertionError (use
+          save_as() to create a new record).
+        """
+        if self.modified():
+            self._my_block_manager().commit_all()
+            self._my_keep().put(self.manifest_text(strip=True))
+            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+                self._api_response = self._my_api().collections().update(
+                    uuid=self._manifest_locator,
+                    body={'manifest_text': self.manifest_text(strip=False)}
+                    ).execute(
+                        num_retries=self.num_retries)
+            elif not no_locator:
+                # NOTE(review): this check only runs after blocks and the
+                # manifest have already been written to Keep.
+                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+            self.set_unmodified()
 
     @_populate_first
-    def save(self):
-        self._my_keep().put(self.portable_manifest_text())
-
+    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
+        """Create a new collection record named `name` from this collection.
+
+        Calls commit_all() on the block manager, writes the stripped manifest
+        to Keep, then creates the record through the API with the unstripped
+        manifest (owned by owner_uuid when given).  The returned UUID becomes
+        this collection's locator, so subsequent save() calls can update it.
+        """
+        self._my_block_manager().commit_all()
+        self._my_keep().put(self.manifest_text(strip=True))
+        body = {"manifest_text": self.manifest_text(strip=False),
+                "name": name}
+        if owner_uuid:
+            body["owner_uuid"] = owner_uuid
+        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
+        self._manifest_locator = self._api_response["uuid"]
+        self.set_unmodified()
 
+    @_populate_first
+    def rename(self, old, new):
+        """Move the file or subcollection at path `old` to path `new`.
+
+        Raises IOError with errno ENOENT if `old` does not exist, or ENOTDIR
+        if the parent of `old` or the destination directory is a file.
+        Missing destination directories are created.
+        """
+        old_path, old_fn = os.path.split(old)
+        # An empty dirname means `old` is directly in this collection.
+        old_col = self.find(old_path) if old_path else self
+        if old_col is None:
+            raise IOError(errno.ENOENT, "File not found")
+        if not isinstance(old_col, Collection):
+            raise IOError(errno.ENOTDIR, "Parent in path is a file, not a directory")
+        if old_fn not in old_col:
+            raise IOError(errno.ENOENT, "File not found")
+        new_path, new_fn = os.path.split(new)
+        if new_path:
+            # NOTE(review): find() does not forward create_collection when
+            # recursing, so a nested new_path may have its last component
+            # created as a file, which then fails the check below.
+            new_col = self.find(new_path, create=True, create_collection=True)
+        else:
+            new_col = self
+        if not isinstance(new_col, Collection):
+            raise IOError(errno.ENOTDIR, "Destination is a file, not a directory")
+        ent = old_col[old_fn]
+        del old_col[old_fn]
+        ent.parent = new_col
+        # NOTE(review): relies on Collection.__setitem__, which is not
+        # visible in this diff -- confirm it is defined.
+        new_col[new_fn] = ent
 
 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
     if into_collection is not None:
@@ -929,23 +1020,28 @@ def export_manifest(item, stream_name=".", portable_locators=False):
         for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
             v = item[k]
             st = []
-            for s in v._segments:
+            for s in v.segments:
                 loc = s.locator
                 if loc.startswith("bufferblock"):
-                    loc = v._bufferblocks[loc].calculate_locator()
+                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
+                if portable_locators:
+                    loc = KeepLocator(loc).stripped()
                 st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
             stream[k] = st
-        buf += ' '.join(normalize_stream(stream_name, stream))
-        buf += "\n"
+        if stream:
+            buf += ' '.join(normalize_stream(stream_name, stream))
+            buf += "\n"
         for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
-            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
+            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
     elif isinstance(item, ArvadosFile):
         st = []
-        for s in item._segments:
+        for s in item.segments:
             loc = s.locator
             if loc.startswith("bufferblock"):
                 loc = item._bufferblocks[loc].calculate_locator()
+            if portable_locators:
+                loc = KeepLocator(loc).stripped()
             st.append(LocatorAndRange(loc, locator_block_size(loc),
                                  s.segment_offset, s.range_size))
         stream[stream_name] = st