Added --storage-classes argument to arv-put.
[arvados.git] / sdk / python / arvados / collection.py
index 5008450eded4c9414044115ebad7b42c619cbcdf..cce7d75685628525e17775b9b6767b0017d5d969 100644 (file)
@@ -1,3 +1,7 @@
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 from __future__ import absolute_import
 from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from __future__ import absolute_import
 from future.utils import listitems, listvalues, viewkeys
 from builtins import str
@@ -30,6 +34,8 @@ from arvados.retry import retry_method
 _logger = logging.getLogger('arvados.collection')
 
 class CollectionBase(object):
 _logger = logging.getLogger('arvados.collection')
 
 class CollectionBase(object):
+    """Abstract base class for Collection classes."""
+
     def __enter__(self):
         return self
 
     def __enter__(self):
         return self
 
@@ -87,6 +93,8 @@ class _WriterFile(_FileLikeObjectBase):
 
 
 class CollectionWriter(CollectionBase):
 
 
 class CollectionWriter(CollectionBase):
+    """Deprecated, use Collection instead."""
+
     def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
     def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
@@ -360,7 +368,7 @@ class CollectionWriter(CollectionBase):
 
     def portable_data_hash(self):
         stripped = self.stripped_manifest().encode()
 
     def portable_data_hash(self):
         stripped = self.stripped_manifest().encode()
-        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     def manifest_text(self):
         self.finish_current_stream()
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -382,8 +390,18 @@ class CollectionWriter(CollectionBase):
             ret += locators
         return ret
 
             ret += locators
         return ret
 
+    def save_new(self, name=None):
+        return self._api_client.collections().create(
+            ensure_unique_name=True,
+            body={
+                'name': name,
+                'manifest_text': self.manifest_text(),
+            }).execute(num_retries=self.num_retries)
+
 
 class ResumableCollectionWriter(CollectionWriter):
 
 class ResumableCollectionWriter(CollectionWriter):
+    """Deprecated, use Collection instead."""
+
     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                    '_current_stream_locators', '_current_stream_name',
                    '_current_file_name', '_current_file_pos', '_close_file',
     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                    '_current_stream_locators', '_current_stream_name',
                    '_current_file_name', '_current_file_pos', '_close_file',
@@ -725,7 +743,7 @@ class RichCollectionBase(CollectionBase):
     @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
     @synchronized
     def keys(self):
         """Get a list of names of files and collections directly contained in this collection."""
-        return viewkeys(self._items)
+        return self._items.keys()
 
     @synchronized
     def values(self):
 
     @synchronized
     def values(self):
@@ -1085,7 +1103,7 @@ class RichCollectionBase(CollectionBase):
             return self._portable_data_hash
         else:
             stripped = self.portable_manifest_text().encode()
             return self._portable_data_hash
         else:
             stripped = self.portable_manifest_text().encode()
-            return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
+            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
 
     @synchronized
     def subscribe(self, callback):
 
     @synchronized
     def subscribe(self, callback):
@@ -1181,8 +1199,9 @@ class Collection(RichCollectionBase):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
-          One of Arvados collection UUID, block locator of
-          a manifest, raw manifest text, or None (to create an empty collection).
+          An Arvados collection UUID, portable data hash, raw manifest
+          text, or (if creating an empty collection) None.
+
         :parent:
           the parent Collection, may be None.
 
         :parent:
           the parent Collection, may be None.
 
@@ -1321,65 +1340,25 @@ class Collection(RichCollectionBase):
         # it.  If instantiation fails, we'll fall back to the except
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
         # it.  If instantiation fails, we'll fall back to the except
         # clause, just like any other Collection lookup
         # failure. Return an exception, or None if successful.
-        try:
-            self._remember_api_response(self._my_api().collections().get(
-                uuid=self._manifest_locator).execute(
-                    num_retries=self.num_retries))
-            self._manifest_text = self._api_response['manifest_text']
-            self._portable_data_hash = self._api_response['portable_data_hash']
-            # If not overriden via kwargs, we should try to load the
-            # replication_desired from the API server
-            if self.replication_desired is None:
-                self.replication_desired = self._api_response.get('replication_desired', None)
-            return None
-        except Exception as e:
-            return e
-
-    def _populate_from_keep(self):
-        # Retrieve a manifest directly from Keep. This has a chance of
-        # working if [a] the locator includes a permission signature
-        # or [b] the Keep services are operating in world-readable
-        # mode. Return an exception, or None if successful.
-        try:
-            self._manifest_text = self._my_keep().get(
-                self._manifest_locator, num_retries=self.num_retries).decode()
-        except Exception as e:
-            return e
+        self._remember_api_response(self._my_api().collections().get(
+            uuid=self._manifest_locator).execute(
+                num_retries=self.num_retries))
+        self._manifest_text = self._api_response['manifest_text']
+        self._portable_data_hash = self._api_response['portable_data_hash']
+        # If not overridden via kwargs, we should try to load the
+        # replication_desired from the API server
+        if self.replication_desired is None:
+            self.replication_desired = self._api_response.get('replication_desired', None)
 
     def _populate(self):
 
     def _populate(self):
-        if self._manifest_locator is None and self._manifest_text is None:
-            return
-        error_via_api = None
-        error_via_keep = None
-        should_try_keep = ((self._manifest_text is None) and
-                           arvados.util.keep_locator_pattern.match(
-                               self._manifest_locator))
-        if ((self._manifest_text is None) and
-            arvados.util.signed_locator_pattern.match(self._manifest_locator)):
-            error_via_keep = self._populate_from_keep()
-        if self._manifest_text is None:
-            error_via_api = self._populate_from_api_server()
-            if error_via_api is not None and not should_try_keep:
-                raise error_via_api
-        if ((self._manifest_text is None) and
-            not error_via_keep and
-            should_try_keep):
-            # Looks like a keep locator, and we didn't already try keep above
-            error_via_keep = self._populate_from_keep()
         if self._manifest_text is None:
         if self._manifest_text is None:
-            # Nothing worked!
-            raise errors.NotFoundError(
-                ("Failed to retrieve collection '{}' " +
-                 "from either API server ({}) or Keep ({})."
-                 ).format(
-                    self._manifest_locator,
-                    error_via_api,
-                    error_via_keep))
-        # populate
+            if self._manifest_locator is None:
+                return
+            else:
+                self._populate_from_api_server()
         self._baseline_manifest = self._manifest_text
         self._import_manifest(self._manifest_text)
 
         self._baseline_manifest = self._manifest_text
         self._import_manifest(self._manifest_text)
 
-
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
@@ -1457,7 +1436,7 @@ class Collection(RichCollectionBase):
     @must_be_writable
     @synchronized
     @retry_method
     @must_be_writable
     @synchronized
     @retry_method
-    def save(self, merge=True, num_retries=None):
+    def save(self, storage_classes=None, merge=True, num_retries=None):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
@@ -1486,9 +1465,13 @@ class Collection(RichCollectionBase):
                 self.update()
 
             text = self.manifest_text(strip=False)
                 self.update()
 
             text = self.manifest_text(strip=False)
+            body={'manifest_text': text}
+            if storage_classes:
+                body["storage_classes_desired"] = storage_classes
+
             self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
             self._remember_api_response(self._my_api().collections().update(
                 uuid=self._manifest_locator,
-                body={'manifest_text': text}
+                body=body
                 ).execute(
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
                 ).execute(
                     num_retries=num_retries))
             self._manifest_text = self._api_response["manifest_text"]
@@ -1504,6 +1487,7 @@ class Collection(RichCollectionBase):
     def save_new(self, name=None,
                  create_collection_record=True,
                  owner_uuid=None,
     def save_new(self, name=None,
                  create_collection_record=True,
                  owner_uuid=None,
+                 storage_classes=None,
                  ensure_unique_name=False,
                  num_retries=None):
         """Save collection to a new collection record.
                  ensure_unique_name=False,
                  num_retries=None):
         """Save collection to a new collection record.
@@ -1546,6 +1530,8 @@ class Collection(RichCollectionBase):
                     "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
                     "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
+            if storage_classes:
+                body["storage_classes_desired"] = storage_classes
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
@@ -1558,6 +1544,10 @@ class Collection(RichCollectionBase):
 
         return text
 
 
         return text
 
+    _token_re = re.compile(r'(\S+)(\s+|$)')
+    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
+    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1576,7 +1566,7 @@ class Collection(RichCollectionBase):
         stream_name = None
         state = STREAM_NAME
 
         stream_name = None
         state = STREAM_NAME
 
-        for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
+        for token_and_separator in self._token_re.finditer(manifest_text):
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
             tok = token_and_separator.group(1)
             sep = token_and_separator.group(2)
 
@@ -1591,7 +1581,7 @@ class Collection(RichCollectionBase):
                 continue
 
             if state == BLOCKS:
                 continue
 
             if state == BLOCKS:
-                block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+                block_locator = self._block_re.match(tok)
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
                 if block_locator:
                     blocksize = int(block_locator.group(1))
                     blocks.append(Range(tok, streamoffset, blocksize, 0))
@@ -1600,7 +1590,7 @@ class Collection(RichCollectionBase):
                     state = SEGMENTS
 
             if state == SEGMENTS:
                     state = SEGMENTS
 
             if state == SEGMENTS:
-                file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
+                file_segment = self._segment_re.match(tok)
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
@@ -1680,9 +1670,8 @@ class Subcollection(RichCollectionBase):
 class CollectionReader(Collection):
     """A read-only collection object.
 
 class CollectionReader(Collection):
     """A read-only collection object.
 
-    Initialize from an api collection record locator, a portable data hash of a
-    manifest, or raw manifest text.  See `Collection` constructor for detailed
-    options.
+    Initialize from a collection UUID or portable data hash, or raw
+    manifest text.  See `Collection` constructor for detailed options.
 
     """
     def __init__(self, manifest_locator_or_text, *args, **kwargs):
 
     """
     def __init__(self, manifest_locator_or_text, *args, **kwargs):