18941: Add --threads option to arv-get
[arvados.git] / sdk / python / arvados / collection.py
index 7ad07cc607206fe32f46fe0c94cf9ea34e115224..a44d42b6ac7cd7a4156ab3b8bc4f72f86060e3a0 100644 (file)
@@ -283,7 +283,7 @@ class CollectionWriter(CollectionBase):
             streampath, filename = split(streampath)
         if self._last_open and not self._last_open.closed:
             raise errors.AssertionError(
             streampath, filename = split(streampath)
         if self._last_open and not self._last_open.closed:
             raise errors.AssertionError(
-                "can't open '{}' when '{}' is still open".format(
+                u"can't open '{}' when '{}' is still open".format(
                     filename, self._last_open.name))
         if streampath != self.current_stream_name():
             self.start_new_stream(streampath)
                     filename, self._last_open.name))
         if streampath != self.current_stream_name():
             self.start_new_stream(streampath)
@@ -461,22 +461,22 @@ class ResumableCollectionWriter(CollectionWriter):
                 writer._queued_file.seek(pos)
             except IOError as error:
                 raise errors.StaleWriterStateError(
                 writer._queued_file.seek(pos)
             except IOError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to reopen active file {}: {}".format(path, error))
+                    u"failed to reopen active file {}: {}".format(path, error))
         return writer
 
     def check_dependencies(self):
         for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
         return writer
 
     def check_dependencies(self):
         for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
-                raise errors.StaleWriterStateError("{} not file".format(path))
+                raise errors.StaleWriterStateError(u"{} not file".format(path))
             try:
                 now_stat = tuple(os.stat(path))
             except OSError as error:
                 raise errors.StaleWriterStateError(
             try:
                 now_stat = tuple(os.stat(path))
             except OSError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to stat {}: {}".format(path, error))
+                    u"failed to stat {}: {}".format(path, error))
             if ((not S_ISREG(now_stat[ST_MODE])) or
                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
             if ((not S_ISREG(now_stat[ST_MODE])) or
                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
-                raise errors.StaleWriterStateError("{} changed".format(path))
+                raise errors.StaleWriterStateError(u"{} changed".format(path))
 
     def dump_state(self, copy_func=lambda x: x):
         state = {attr: copy_func(getattr(self, attr))
 
     def dump_state(self, copy_func=lambda x: x):
         state = {attr: copy_func(getattr(self, attr))
@@ -492,7 +492,7 @@ class ResumableCollectionWriter(CollectionWriter):
         try:
             src_path = os.path.realpath(source)
         except Exception:
         try:
             src_path = os.path.realpath(source)
         except Exception:
-            raise errors.AssertionError("{} not a file path".format(source))
+            raise errors.AssertionError(u"{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
         except OSError as stat_error:
         try:
             path_stat = os.stat(src_path)
         except OSError as stat_error:
@@ -505,10 +505,10 @@ class ResumableCollectionWriter(CollectionWriter):
             self._dependencies[source] = tuple(fd_stat)
         elif path_stat is None:
             raise errors.AssertionError(
             self._dependencies[source] = tuple(fd_stat)
         elif path_stat is None:
             raise errors.AssertionError(
-                "could not stat {}: {}".format(source, stat_error))
+                u"could not stat {}: {}".format(source, stat_error))
         elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
         elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
-                "{} changed between open and stat calls".format(source))
+                u"{} changed between open and stat calls".format(source))
         else:
             self._dependencies[src_path] = tuple(fd_stat)
 
         else:
             self._dependencies[src_path] = tuple(fd_stat)
 
@@ -1261,7 +1261,9 @@ class Collection(RichCollectionBase):
                  apiconfig=None,
                  block_manager=None,
                  replication_desired=None,
                  apiconfig=None,
                  block_manager=None,
                  replication_desired=None,
-                 put_threads=None):
+                 storage_classes_desired=None,
+                 put_threads=None,
+                 get_threads=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1293,13 +1295,24 @@ class Collection(RichCollectionBase):
           configuration applies. If not None, this value will also be used
           for determining the number of block copies being written.
 
           configuration applies. If not None, this value will also be used
           for determining the number of block copies being written.
 
+        :storage_classes_desired:
+          A list of storage class names where to upload the data. If None,
+          the keep client is expected to store the data into the cluster's
+          default storage class(es).
+
         """
         """
+
+        if storage_classes_desired and type(storage_classes_desired) is not list:
+            raise errors.ArgumentError("storage_classes_desired must be list type.")
+
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
+        self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
         self.put_threads = put_threads
+        self.get_threads = get_threads
 
         if apiconfig:
             self._config = apiconfig
 
         if apiconfig:
             self._config = apiconfig
@@ -1333,8 +1346,11 @@ class Collection(RichCollectionBase):
 
             try:
                 self._populate()
 
             try:
                 self._populate()
-            except (IOError, errors.SyntaxError) as e:
-                raise errors.ArgumentError("Error processing manifest text: %s", e)
+            except errors.SyntaxError as e:
+                raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
+
+    def storage_classes_desired(self):
+        return self._storage_classes_desired or []
 
     def root_collection(self):
         return self
 
     def root_collection(self):
         return self
@@ -1347,7 +1363,10 @@ class Collection(RichCollectionBase):
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
-            return ciso8601.parse_datetime(self._api_response["trash_at"])
+            try:
+                return ciso8601.parse_datetime(self._api_response["trash_at"])
+            except ValueError:
+                return None
         else:
             return None
 
         else:
             return None
 
@@ -1378,7 +1397,7 @@ class Collection(RichCollectionBase):
                 # our tokens.
                 return
             else:
                 # our tokens.
                 return
             else:
-                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+                self._remember_api_response(response)
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1407,7 +1426,12 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired,
+                                                get_threads=self.get_threads,)
         return self._block_manager
 
     def _remember_api_response(self, response):
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1428,9 +1452,11 @@ class Collection(RichCollectionBase):
         self._manifest_text = self._api_response['manifest_text']
         self._portable_data_hash = self._api_response['portable_data_hash']
         # If not overriden via kwargs, we should try to load the
         self._manifest_text = self._api_response['manifest_text']
         self._portable_data_hash = self._api_response['portable_data_hash']
         # If not overriden via kwargs, we should try to load the
-        # replication_desired from the API server
+        # replication_desired and storage_classes_desired from the API server
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
+        if self._storage_classes_desired is None:
+            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
 
     def _populate(self):
         if self._manifest_text is None:
 
     def _populate(self):
         if self._manifest_text is None:
@@ -1527,7 +1553,8 @@ class Collection(RichCollectionBase):
              storage_classes=None,
              trash_at=None,
              merge=True,
              storage_classes=None,
              trash_at=None,
              merge=True,
-             num_retries=None):
+             num_retries=None,
+             preserve_version=False):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
@@ -1557,24 +1584,38 @@ class Collection(RichCollectionBase):
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
+        :preserve_version:
+          If True, indicate that the collection content being saved right now
+          should be preserved in a version snapshot if the collection record is
+          updated in the future. Requires that the API server has
+          Collections.CollectionVersioning enabled, if not, setting this will
+          raise an exception.
+
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
 
         if storage_classes and type(storage_classes) is not list:
             raise errors.ArgumentError("storage_classes must be list type.")
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
 
         if storage_classes and type(storage_classes) is not list:
             raise errors.ArgumentError("storage_classes must be list type.")
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
         body={}
         if properties:
             body["properties"] = properties
         body={}
         if properties:
             body["properties"] = properties
-        if storage_classes:
-            body["storage_classes_desired"] = storage_classes
+        if self.storage_classes_desired():
+            body["storage_classes_desired"] = self.storage_classes_desired()
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
+        if preserve_version:
+            body["preserve_version"] = preserve_version
 
         if not self.committed():
             if self._has_remote_blocks:
 
         if not self.committed():
             if self._has_remote_blocks:
@@ -1620,7 +1661,8 @@ class Collection(RichCollectionBase):
                  storage_classes=None,
                  trash_at=None,
                  ensure_unique_name=False,
                  storage_classes=None,
                  trash_at=None,
                  ensure_unique_name=False,
-                 num_retries=None):
+                 num_retries=None,
+                 preserve_version=False):
         """Save collection to a new collection record.
 
         Commit pending buffer blocks to Keep and, when create_collection_record
         """Save collection to a new collection record.
 
         Commit pending buffer blocks to Keep and, when create_collection_record
@@ -1659,6 +1701,13 @@ class Collection(RichCollectionBase):
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
+        :preserve_version:
+          If True, indicate that the collection content being saved right now
+          should be preserved in a version snapshot if the collection record is
+          updated in the future. Requires that the API server has
+          Collections.CollectionVersioning enabled, if not, setting this will
+          raise an exception.
+
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
@@ -1669,11 +1718,17 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
         if self._has_remote_blocks:
             # Copy any remote blocks to the local cluster.
             self._copy_remote_blocks(remote_blocks={})
             self._has_remote_blocks = False
 
         if self._has_remote_blocks:
             # Copy any remote blocks to the local cluster.
             self._copy_remote_blocks(remote_blocks={})
             self._has_remote_blocks = False
 
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1689,11 +1744,13 @@ class Collection(RichCollectionBase):
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
-            if storage_classes:
-                body["storage_classes_desired"] = storage_classes
+            if self.storage_classes_desired():
+                body["storage_classes_desired"] = self.storage_classes_desired()
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
+            if preserve_version:
+                body["preserve_version"] = preserve_version
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
@@ -1766,7 +1823,13 @@ class Collection(RichCollectionBase):
                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
                         filepath = os.path.join(stream_name, name)
                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
                         filepath = os.path.join(stream_name, name)
-                        afile = self.find_or_create(filepath, FILE)
+                        try:
+                            afile = self.find_or_create(filepath, FILE)
+                        except IOError as e:
+                            if e.errno == errno.ENOTDIR:
+                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
+                            else:
+                                raise e from None
                         if isinstance(afile, ArvadosFile):
                             afile.add_segment(blocks, pos, size)
                         else:
                         if isinstance(afile, ArvadosFile):
                             afile.add_segment(blocks, pos, size)
                         else: