20933: Use [0-9] instead of \d in regex
[arvados.git] / sdk / python / arvados / collection.py
index 1744cc7be6db20aaaf11ac0917a7db447c8637b4..bfb43be5eb85401e332915419f2a52ea71eb2e19 100644 (file)
@@ -37,21 +37,6 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
-
-if sys.version_info >= (3, 0):
-    TextIOWrapper = io.TextIOWrapper
-else:
-    class TextIOWrapper(io.TextIOWrapper):
-        """To maintain backward compatibility, cast str to unicode in
-        write('foo').
-
-        """
-        def write(self, data):
-            if isinstance(data, basestring):
-                data = unicode(data)
-            return super(TextIOWrapper, self).write(data)
-
-
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -114,6 +99,7 @@ class _WriterFile(_FileLikeObjectBase):
 class CollectionWriter(CollectionBase):
     """Deprecated, use Collection instead."""
 
+    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
     def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
@@ -427,6 +413,7 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
+    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
     def __init__(self, api_client=None, **kwargs):
         self._dependencies = {}
         super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
@@ -721,7 +708,7 @@ class RichCollectionBase(CollectionBase):
         f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
         if 'b' not in mode:
             bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
-            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+            f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
         return f
 
     def modified(self):
@@ -827,7 +814,7 @@ class RichCollectionBase(CollectionBase):
             self.set_committed(False)
             self.notify(DEL, self, pathcomponents[0], deleteditem)
         else:
-            item.remove(pathcomponents[1])
+            item.remove(pathcomponents[1], recursive=recursive)
 
     def _clonefrom(self, source):
         for k,v in listitems(source):
@@ -1256,7 +1243,7 @@ class Collection(RichCollectionBase):
     def __init__(self, manifest_locator_or_text=None,
                  api_client=None,
                  keep_client=None,
-                 num_retries=None,
+                 num_retries=10,
                  parent=None,
                  apiconfig=None,
                  block_manager=None,
@@ -1307,6 +1294,11 @@ class Collection(RichCollectionBase):
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
+
+        # Use the keep client from ThreadSafeApiCache
+        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
+            self._keep_client = self._api_client.keep
+
         self._block_manager = block_manager
         self.replication_desired = replication_desired
         self._storage_classes_desired = storage_classes_desired
@@ -1317,7 +1309,7 @@ class Collection(RichCollectionBase):
         else:
             self._config = config.settings()
 
-        self.num_retries = num_retries if num_retries is not None else 0
+        self.num_retries = num_retries
         self._manifest_locator = None
         self._manifest_text = None
         self._portable_data_hash = None
@@ -1395,7 +1387,7 @@ class Collection(RichCollectionBase):
                 # our tokens.
                 return
             else:
-                self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
+                self._remember_api_response(response)
             other = CollectionReader(response["manifest_text"])
         baseline = CollectionReader(self._manifest_text)
         self.apply(baseline.diff(other))
@@ -1404,7 +1396,7 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_api(self):
         if self._api_client is None:
-            self._api_client = ThreadSafeApiCache(self._config)
+            self._api_client = ThreadSafeApiCache(self._config, version='v1')
             if self._keep_client is None:
                 self._keep_client = self._api_client.keep
         return self._api_client
@@ -1424,7 +1416,11 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
+            self._block_manager = _BlockManager(self._my_keep(),
+                                                copies=copies,
+                                                put_threads=self.put_threads,
+                                                num_retries=self.num_retries,
+                                                storage_classes_func=self.storage_classes_desired)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1546,7 +1542,8 @@ class Collection(RichCollectionBase):
              storage_classes=None,
              trash_at=None,
              merge=True,
-             num_retries=None):
+             num_retries=None,
+             preserve_version=False):
         """Save collection to an existing collection record.
 
         Commit pending buffer blocks to Keep, merge with remote record (if
@@ -1576,6 +1573,13 @@ class Collection(RichCollectionBase):
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
+        :preserve_version:
+          If True, indicate that the collection content being saved right now
+          should be preserved in a version snapshot if the collection record is
+          updated in the future. Requires that the API server has
+          Collections.CollectionVersioning enabled; if it is not, setting this
+          raises an ArgumentError.
+
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
@@ -1588,6 +1592,9 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
         body={}
         if properties:
             body["properties"] = properties
@@ -1596,6 +1603,8 @@ class Collection(RichCollectionBase):
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
+        if preserve_version:
+            body["preserve_version"] = preserve_version
 
         if not self.committed():
             if self._has_remote_blocks:
@@ -1641,7 +1650,8 @@ class Collection(RichCollectionBase):
                  storage_classes=None,
                  trash_at=None,
                  ensure_unique_name=False,
-                 num_retries=None):
+                 num_retries=None,
+                 preserve_version=False):
         """Save collection to a new collection record.
 
         Commit pending buffer blocks to Keep and, when create_collection_record
@@ -1680,6 +1690,13 @@ class Collection(RichCollectionBase):
         :num_retries:
           Retry count on API calls (if None,  use the collection default)
 
+        :preserve_version:
+          If True, indicate that the collection content being saved right now
+          should be preserved in a version snapshot if the collection record is
+          updated in the future. Requires that the API server has
+          Collections.CollectionVersioning enabled; if it is not, setting this
+          raises an ArgumentError.
+
         """
         if properties and type(properties) is not dict:
             raise errors.ArgumentError("properties must be dictionary type.")
@@ -1690,6 +1707,9 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
+            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
+
         if self._has_remote_blocks:
             # Copy any remote blocks to the local cluster.
             self._copy_remote_blocks(remote_blocks={})
@@ -1718,6 +1738,8 @@ class Collection(RichCollectionBase):
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
+            if preserve_version:
+                body["preserve_version"] = preserve_version
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
             text = self._api_response["manifest_text"]
@@ -1735,7 +1757,7 @@ class Collection(RichCollectionBase):
     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
 
     def _unescape_manifest_path(self, path):
-        return re.sub('\\\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
+        return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
 
     @synchronized
     def _import_manifest(self, manifest_text):
@@ -1790,7 +1812,13 @@ class Collection(RichCollectionBase):
                             self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
                         filepath = os.path.join(stream_name, name)
-                        afile = self.find_or_create(filepath, FILE)
+                        try:
+                            afile = self.find_or_create(filepath, FILE)
+                        except IOError as e:
+                            if e.errno == errno.ENOTDIR:
+                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
+                            else:
+                                raise e from None
                         if isinstance(afile, ArvadosFile):
                             afile.add_segment(blocks, pos, size)
                         else:
@@ -1930,11 +1958,14 @@ class CollectionReader(Collection):
 
         self._streams = [normalize_stream(s, streams[s])
                          for s in sorted(streams)]
+
+    @arvados.util._deprecated('3.0', 'Collection iteration')
     @_populate_streams
     def all_streams(self):
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                 for s in self._streams]
 
+    @arvados.util._deprecated('3.0', 'Collection iteration')
     @_populate_streams
     def all_files(self):
         for s in self.all_streams():