8460: Use new ws server for integration tests only if ARVADOS_TEST_EXPERIMENTAL_WS...
[arvados.git] / sdk / python / arvados / collection.py
index 93ee167fa42210382dff8af9219a1332050693d2..27aad033ae55523de4fd14ae7bf9cf8a7d2b654f 100644 (file)
@@ -307,7 +307,8 @@ class CollectionWriter(CollectionBase):
     def set_current_stream_name(self, newstreamname):
         if re.search(r'[\t\n]', newstreamname):
             raise errors.AssertionError(
-                "Manifest stream names cannot contain whitespace")
+                "Manifest stream names cannot contain whitespace: '%s'" %
+                (newstreamname))
         self._current_stream_name = '.' if newstreamname=='' else newstreamname
 
     def current_stream_name(self):
@@ -556,7 +557,7 @@ class RichCollectionBase(CollectionBase):
                 if isinstance(item, RichCollectionBase):
                     return item.find_or_create(pathcomponents[1], create_type)
                 else:
-                    raise IOError(errno.ENOTDIR, "Not a directory: '%s'" % pathcomponents[0])
+                    raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
         else:
             return self
 
@@ -582,7 +583,7 @@ class RichCollectionBase(CollectionBase):
                 else:
                     return item
             else:
-                raise IOError(errno.ENOTDIR, "Is not a directory: %s" % pathcomponents[0])
+                raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
 
     @synchronized
     def mkdirs(self, path):
@@ -594,7 +595,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if self.find(path) != None:
-            raise IOError(errno.EEXIST, "Directory or file exists: '%s'" % path)
+            raise IOError(errno.EEXIST, "Directory or file exists", path)
 
         return self.find_or_create(path, COLLECTION)
 
@@ -630,9 +631,9 @@ class RichCollectionBase(CollectionBase):
             arvfile = self.find(path)
 
         if arvfile is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if not isinstance(arvfile, ArvadosFile):
-            raise IOError(errno.EISDIR, "Is a directory: %s" % path)
+            raise IOError(errno.EISDIR, "Is a directory", path)
 
         if mode[0] == "w":
             arvfile.truncate(0)
@@ -732,10 +733,10 @@ class RichCollectionBase(CollectionBase):
         pathcomponents = path.split("/", 1)
         item = self._items.get(pathcomponents[0])
         if item is None:
-            raise IOError(errno.ENOENT, "File not found")
+            raise IOError(errno.ENOENT, "File not found", path)
         if len(pathcomponents) == 1:
             if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
-                raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
+                raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
             deleteditem = self._items[pathcomponents[0]]
             del self._items[pathcomponents[0]]
             self._committed = False
@@ -773,7 +774,7 @@ class RichCollectionBase(CollectionBase):
         """
 
         if target_name in self and not overwrite:
-            raise IOError(errno.EEXIST, "File already exists")
+            raise IOError(errno.EEXIST, "File already exists", target_name)
 
         modified_from = None
         if target_name in self:
@@ -802,7 +803,7 @@ class RichCollectionBase(CollectionBase):
         if isinstance(source, basestring):
             source_obj = source_collection.find(source)
             if source_obj is None:
-                raise IOError(errno.ENOENT, "File not found")
+                raise IOError(errno.ENOENT, "File not found", source)
             sourcecomponents = source.split("/")
         else:
             source_obj = source
@@ -826,7 +827,7 @@ class RichCollectionBase(CollectionBase):
                 target_dir = self
 
         if target_dir is None:
-            raise IOError(errno.ENOENT, "Target directory not found.")
+            raise IOError(errno.ENOENT, "Target directory not found", target_name)
 
         if target_name in target_dir and isinstance(self[target_name], RichCollectionBase) and sourcecomponents:
             target_dir = target_dir[target_name]
@@ -881,7 +882,7 @@ class RichCollectionBase(CollectionBase):
 
         source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
         if not source_obj.writable():
-            raise IOError(errno.EROFS, "Source collection is read only.")
+            raise IOError(errno.EROFS, "Source collection is read only", source)
         target_dir.add(source_obj, target_name, overwrite, True)
 
     def portable_manifest_text(self, stream_name="."):
@@ -922,7 +923,7 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -938,6 +939,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -951,6 +955,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -1135,7 +1141,8 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1143,24 +1150,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies Arvados should maintain. If None, the API server's
+          default configuration applies. If not None, this value is also used
+          to determine the number of block copies written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
@@ -1232,7 +1250,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
@@ -1247,7 +1266,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1267,6 +1289,10 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            # If not overridden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
@@ -1477,12 +1503,12 @@ class Collection(RichCollectionBase):
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
             self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
-            print self._api_response
             text = self._api_response["manifest_text"]
 
             self._manifest_locator = self._api_response["uuid"]