3198: Many tests. Fixed lots of bugs.
[arvados.git] / sdk / python / arvados / collection.py
index ea9f5de8993b7c310b9e1105282b245626e7593f..6602ed1598927b665209886fd3431659f8028512 100644 (file)
@@ -2,11 +2,12 @@ import functools
 import logging
 import os
 import re
+import errno
 
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader
+from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
 from keep import *
 from .stream import StreamReader, normalize_stream, locator_block_size
 from .ranges import Range, LocatorAndRange
@@ -640,12 +641,15 @@ class ResumableCollectionWriter(CollectionWriter):
 
 
 class Collection(CollectionBase):
-    def __init__(self, manifest_locator_or_text=None, api_client=None,
-                 keep_client=None, num_retries=0):
+    def __init__(self, parent=None, manifest_locator_or_text=None, api_client=None,
+                 keep_client=None, num_retries=0, block_manager=None):
 
+        self._parent = parent
         self._items = None
         self._api_client = api_client
         self._keep_client = keep_client
+        self._block_manager = block_manager
+
         self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
@@ -662,6 +666,29 @@ class Collection(CollectionBase):
                 raise errors.ArgumentError(
                     "Argument to CollectionReader must be a manifest or a collection UUID")
 
+    def _my_api(self):  # Return the API client to use, creating one lazily if needed.
+        if self._api_client is None:
+            if self._parent is not None:
+                return self._parent._my_api()  # Defer to the parent; the result is not cached on this instance.
+            self._api_client = arvados.api('v1')
+            self._keep_client = None  # Make a new one with the new api.
+        return self._api_client
+
+    def _my_keep(self):  # Return the Keep client to use, creating one lazily if needed.
+        if self._keep_client is None:
+            if self._parent is not None:
+                return self._parent._my_keep()  # Defer to the parent; the result is not cached on this instance.
+            self._keep_client = KeepClient(api_client=self._my_api(),
+                                           num_retries=self.num_retries)
+        return self._keep_client
+
+    def _my_block_manager(self):  # Return the BlockManager to use, creating one lazily if needed.
+        if self._block_manager is None:
+            if self._parent is not None:
+                return self._parent._my_block_manager()  # Children without their own manager use the parent's.
+            self._block_manager = BlockManager(self._my_keep())  # Built on the Keep client returned by _my_keep().
+        return self._block_manager
+
     def _populate_from_api_server(self):
         # As in KeepClient itself, we must wait until the last
         # possible moment to instantiate an API client, in order to
@@ -671,10 +698,7 @@ class Collection(CollectionBase):
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         try:
-            if self._api_client is None:
-                self._api_client = arvados.api('v1')
-                self._keep_client = None  # Make a new one with the new api.
-            self._api_response = self._api_client.collections().get(
+            self._api_response = self._my_api().collections().get(
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries)
             self._manifest_text = self._api_response['manifest_text']
@@ -739,7 +763,9 @@ class Collection(CollectionBase):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
-        self.save()
+        self.save(no_locator=True)  # no_locator=True: save() will not raise when there is no collection uuid to update.
+        if self._block_manager is not None:  # Only a manager held by this instance is stopped; a parent's is left alone.
+            self._block_manager.stop_threads()  # NOTE(review): skipped if save() raises -- consider try/finally.
 
     @_populate_first
     def find(self, path, create=False):
@@ -753,13 +779,13 @@ class Collection(CollectionBase):
                 # item must be a file
                 if item is None and create:
                     # create new file
-                    item = ArvadosFile(keep=self._keep_client)
+                    item = ArvadosFile(self, keep=self._keep_client)
                     self._items[p[0]] = item
                 return item
             else:
                 if item is None and create:
                     # create new collection
-                    item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries)
+                    item = Collection(parent=self, num_retries=self.num_retries)
                     self._items[p[0]] = item
                 del p[0]
                 return item.find("/".join(p), create=create)
@@ -784,9 +810,9 @@ class Collection(CollectionBase):
 
         f = self.find(path, create=create)
         if f is None:
-            raise ArgumentError("File not found")
+            raise IOError((errno.ENOENT, "File not found"))
         if not isinstance(f, ArvadosFile):
-            raise ArgumentError("Path must refer to a file.")
+            raise IOError((errno.EISDIR, "Path must refer to a file."))
 
         if mode[0] == "w":
             f.truncate(0)
@@ -810,20 +836,19 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __iter__(self):
-        for k in self._items.keys():
-            yield k
+        self._items.iterkeys()
+
+    @_populate_first
+    def iterkeys(self):
+        return self._items.iterkeys()  # Return the iterator over item names instead of discarding it.
 
     @_populate_first
     def __getitem__(self, k):
-        r = self.find(k)
-        if r:
-            return r
-        else:
-            raise KeyError(k)
+        return self._items[k]  # Direct lookup among this collection's own items; k is not resolved as a path via find().
 
     @_populate_first
     def __contains__(self, k):
-        return self.find(k) is not None
+        return k in self._items  # Membership among this collection's own items only; exists() resolves paths.
 
     @_populate_first
     def __len__(self):
@@ -831,6 +856,26 @@ class Collection(CollectionBase):
 
     @_populate_first
     def __delitem__(self, p):
+        del self._items[p]  # Deletes a direct item only; remove() handles slash-separated paths.
+
+    @_populate_first
+    def keys(self):
+        return self._items.keys()  # Names of this collection's direct items (not recursive).
+
+    @_populate_first
+    def values(self):
+        return self._items.values()  # The direct items themselves: ArvadosFile or Collection objects, as stored by find().
+
+    @_populate_first
+    def items(self):
+        return self._items.items()  # (name, item) pairs for this collection's direct items (not recursive).
+
+    @_populate_first
+    def exists(self, path):
+        return self.find(path) is not None  # True when find() resolves the path; identity test against None.
+
+    @_populate_first
+    def remove(self, path):  # Remove the item at a slash-separated path; raises IOError(ENOENT) if it is missing.
         p = path.split("/")
         if p[0] == '.':
             del p[0]
@@ -838,31 +883,55 @@ class Collection(CollectionBase):
         if len(p) > 0:
             item = self._items.get(p[0])
             if item is None:
-                raise NotFoundError()
+                raise IOError(errno.ENOENT, "File not found")  # Two args so e.errno / e.strerror are populated.
             if len(p) == 1:
                 del self._items[p[0]]
             else:
                 del p[0]
-                del item["/".join(p)]
+                item.remove("/".join(p))  # Recurse into the child with the remainder of the path.
         else:
-            raise NotFoundError()
+            raise IOError(errno.ENOENT, "File not found")  # Empty path (e.g. just "."): nothing to remove.
 
     @_populate_first
-    def keys(self):
-        return self._items.keys()
+    def manifest_text(self, strip=False, normalize=False):  # Return the manifest text; strip is passed on as portable_locators.
+        if self.modified() or self._manifest_text is None or normalize:  # No usable stored text, or regeneration requested.
+            return export_manifest(self, stream_name=".", portable_locators=strip)
+        else:
+            if strip:
+                return self.stripped_manifest()
+            else:
+                return self._manifest_text  # Unmodified: return the stored text as-is.
 
-    @_populate_first
-    def values(self):
-        return self._items.values()
+    def portable_data_hash(self):  # Return "<md5 hex digest of the stripped manifest>+<its length>".
+        stripped = self.manifest_text(strip=True)
+        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))  # NOTE(review): hashlib is not among the imports visible in this diff -- confirm it is in scope.
 
     @_populate_first
-    def items(self):
-        return self._items.items()
+    def save(self, no_locator=False):  # Commit pending data and, when the locator is a collection uuid, update that API record.
+        if self.modified():  # Nothing is written when there are no changes.
+            self._my_block_manager().commit_all()
+            self._my_keep().put(self.manifest_text(strip=True))
+            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
+                self._api_response = self._my_api().collections().update(
+                    uuid=self._manifest_locator,
+                    body={'manifest_text': self.manifest_text(strip=False)}
+                    ).execute(
+                        num_retries=self.num_retries)
+            elif not no_locator:  # No collection uuid to update: an error unless the caller passed no_locator=True.
+                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
+            self.set_unmodified()
 
     @_populate_first
-    def save(self):
-        self._my_keep().put(self.portable_manifest_text())
-
+    def save_as(self, name, owner_uuid=None):  # Create a new collection record named `name` and adopt the uuid it returns.
+        self._my_block_manager().commit_all()
+        self._my_keep().put(self.manifest_text(strip=True))
+        body = {"manifest_text": self.manifest_text(strip=False),
+                "name": name}
+        if owner_uuid:  # owner_uuid is only included in the request when given.
+            body["owner_uuid"] = owner_uuid
+        self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
+        self._manifest_locator = self._api_response["uuid"]  # Stored so a later save() can target this record.
+        self.set_unmodified()
 
 
 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
@@ -932,12 +1001,13 @@ def export_manifest(item, stream_name=".", portable_locators=False):
             for s in v._segments:
                 loc = s.locator
                 if loc.startswith("bufferblock"):
-                    loc = v._bufferblocks[loc].calculate_locator()
+                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
                 st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
             stream[k] = st
-        buf += ' '.join(normalize_stream(stream_name, stream))
-        buf += "\n"
+        if stream:
+            buf += ' '.join(normalize_stream(stream_name, stream))
+            buf += "\n"
         for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
             buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
     elif isinstance(item, ArvadosFile):