Merge branch '9463-pysdk-amended'
authorLucas Di Pentima <lucas@curoverse.com>
Fri, 26 Aug 2016 21:04:45 +0000 (18:04 -0300)
committerLucas Di Pentima <lucas@curoverse.com>
Fri, 26 Aug 2016 21:04:45 +0000 (18:04 -0300)
Refs #9463

sdk/cwl/tests/test_submit.py
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py

index 2dff3e8f9dbe422a7ca6e95037384cc4618c920b..21be5c71b9f904668c4813f57958edca4216e134 100644 (file)
@@ -156,8 +156,9 @@ class TestSubmit(unittest.TestCase):
             mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
                             '0:0:blub.txt 0:0:submit_tool.cwl\n',
                             'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
-                            'name': 'New collection'},
-                      ensure_unique_name=True),
+                            'name': 'New collection',
+                            'replication_desired': None,
+            }, ensure_unique_name=True),
             mock.call().execute(num_retries=4),
             mock.call(body={
                 'manifest_text':
@@ -215,8 +216,9 @@ class TestSubmit(unittest.TestCase):
             mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
                             '0:0:blub.txt 0:0:submit_tool.cwl\n',
                             'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
-                            'name': 'New collection'},
-                      ensure_unique_name=True),
+                            'name': 'New collection',
+                            'replication_desired': None,
+            }, ensure_unique_name=True),
             mock.call().execute(num_retries=4),
             mock.call(body={
                 'manifest_text':
index b78c63e301b81d5ddb2644983e2e83017e98bbdf..f2f7df2dce2121b0c0c0e4b562f7c7963f1fc0ab 100644 (file)
@@ -402,7 +402,7 @@ class _BlockManager(object):
     DEFAULT_PUT_THREADS = 2
     DEFAULT_GET_THREADS = 2
 
-    def __init__(self, keep):
+    def __init__(self, keep, copies=None):
         """keep: KeepClient object to use"""
         self._keep = keep
         self._bufferblocks = {}
@@ -414,6 +414,7 @@ class _BlockManager(object):
         self.prefetch_enabled = True
         self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
         self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
+        self.copies = copies
 
     @synchronized
     def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
@@ -464,7 +465,10 @@ class _BlockManager(object):
                 if bufferblock is None:
                     return
 
-                loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
                 bufferblock.set_state(_BufferBlock.COMMITTED, loc)
 
             except Exception as e:
@@ -577,7 +581,10 @@ class _BlockManager(object):
 
         if sync:
             try:
-                loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                if self.copies is None:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
+                else:
+                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
                 block.set_state(_BufferBlock.COMMITTED, loc)
             except Exception as e:
                 block.set_state(_BufferBlock.ERROR, e)
index 8450bd1ca086687928c43e09b2d497ae598c5128..56d8b239331a8f65e8b5781376719244c03b2905 100644 (file)
@@ -922,7 +922,7 @@ class RichCollectionBase(CollectionBase):
         return self._get_manifest_text(stream_name, strip, normalize)
 
     @synchronized
-    def _get_manifest_text(self, stream_name, strip, normalize):
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
         """Get the manifest text for this collection, sub collections and files.
 
         :stream_name:
@@ -938,6 +938,9 @@ class RichCollectionBase(CollectionBase):
           is not modified, return the original manifest text even if it is not
           in normalized form.
 
+        :only_committed:
+          If True, only include blocks that were already committed to Keep.
+
         """
 
         if not self.committed() or self._manifest_text is None or normalize:
@@ -951,6 +954,8 @@ class RichCollectionBase(CollectionBase):
                 for segment in arvfile.segments():
                     loc = segment.locator
                     if arvfile.parent._my_block_manager().is_bufferblock(loc):
+                        if only_committed:
+                            continue
                         loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
                     if strip:
                         loc = KeepLocator(loc).stripped()
@@ -1135,7 +1140,8 @@ class Collection(RichCollectionBase):
                  num_retries=None,
                  parent=None,
                  apiconfig=None,
-                 block_manager=None):
+                 block_manager=None,
+                 replication_desired=None):
         """Collection constructor.
 
         :manifest_locator_or_text:
@@ -1143,24 +1149,35 @@ class Collection(RichCollectionBase):
           a manifest, raw manifest text, or None (to create an empty collection).
         :parent:
           the parent Collection, may be None.
+
         :apiconfig:
           A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
           Prefer this over supplying your own api_client and keep_client (except in testing).
           Will use default config settings if not specified.
+
         :api_client:
           The API client object to use for requests.  If not specified, create one using `apiconfig`.
+
         :keep_client:
           the Keep client to use for requests.  If not specified, create one using `apiconfig`.
+
         :num_retries:
           the number of retries for API and Keep requests.
+
         :block_manager:
           the block manager to use.  If not specified, create one.
 
+        :replication_desired:
+          How many copies should Arvados maintain. If None, API server default
+          configuration applies. If not None, this value will also be used
+          for determining the number of block copies being written.
+
         """
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
+        self.replication_desired = replication_desired
 
         if apiconfig:
             self._config = apiconfig
@@ -1232,7 +1249,8 @@ class Collection(RichCollectionBase):
     def _my_api(self):
         if self._api_client is None:
             self._api_client = ThreadSafeApiCache(self._config)
-            self._keep_client = self._api_client.keep
+            if self._keep_client is None:
+                self._keep_client = self._api_client.keep
         return self._api_client
 
     @synchronized
@@ -1247,7 +1265,10 @@ class Collection(RichCollectionBase):
     @synchronized
     def _my_block_manager(self):
         if self._block_manager is None:
-            self._block_manager = _BlockManager(self._my_keep())
+            copies = (self.replication_desired or
+                      self._my_api()._rootDesc.get('defaultCollectionReplication',
+                                                   2))
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies)
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1267,6 +1288,10 @@ class Collection(RichCollectionBase):
                 uuid=self._manifest_locator).execute(
                     num_retries=self.num_retries))
             self._manifest_text = self._api_response['manifest_text']
+            # If not overriden via kwargs, we should try to load the
+            # replication_desired from the API server
+            if self.replication_desired is None:
+                self.replication_desired = self._api_response.get('replication_desired', None)
             return None
         except Exception as e:
             return e
@@ -1477,7 +1502,8 @@ class Collection(RichCollectionBase):
                 ensure_unique_name = True
 
             body = {"manifest_text": text,
-                    "name": name}
+                    "name": name,
+                    "replication_desired": self.replication_desired}
             if owner_uuid:
                 body["owner_uuid"] = owner_uuid
 
index ea8661437c04f3c0d2dec8d4a01c0c8176a2e87c..6b3562602aa69601021b04d93a116c04972abab5 100644 (file)
@@ -28,7 +28,7 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def get_from_cache(self, locator):
             self.requests.append(locator)
             return self.blocks.get(locator)
-        def put(self, data, num_retries=None):
+        def put(self, data, num_retries=None, copies=None):
             pdh = tutil.str_keep_locator(data)
             self.blocks[pdh] = str(data)
             return pdh
@@ -37,6 +37,11 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         def __init__(self, b, r):
             self.body = b
             self.response = r
+            self._schema = ArvadosFileWriterTestCase.MockApi.MockSchema()
+            self._rootDesc = {}
+        class MockSchema(object):
+            def __init__(self):
+                self.schemas = {'Collection': {'properties': {'replication_desired': {'type':'integer'}}}}
         class MockCollections(object):
             def __init__(self, b, r):
                 self.body = b
@@ -59,7 +64,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_truncate(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_truncate",
-                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"},
+                                                 "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text":". 781e5e245d69b566979b86e28d23f2c7+10 0:8:count.txt\n"})
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -86,7 +92,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_write_to_end(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_append",
-                                                 "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"},
+                                                 "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text": ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:13:count.txt\n"})
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -222,7 +229,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_write_large(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
-                                                 "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"},
+                                                 "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text": ". a5de24f4417cfba9d5825eadc2f4ca49+67108000 598cc1a4ccaef8ab6e4724d87e675d78+32892000 0:100000000:count.txt\n"})
         with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
@@ -313,7 +321,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_write_large_rewrite(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_write_large",
-                                                 "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"},
+                                                 "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text": ". 37400a68af9abdd76ca5bf13e819e42a+32892003 a5de24f4417cfba9d5825eadc2f4ca49+67108000 32892000:3:count.txt 32892006:67107997:count.txt 0:32892000:count.txt\n"})
         with Collection('. ' + arvados.config.EMPTY_BLOCK_LOCATOR + ' 0:0:count.txt',
@@ -335,7 +344,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_create(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_create",
-                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
         with Collection(api_client=api, keep_client=keep) as c:
@@ -356,7 +366,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_create_subdir(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_create",
-                                                 "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+                                                 "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text":"./foo/bar 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
         with Collection(api_client=api, keep_client=keep) as c:
@@ -371,7 +382,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_overwrite(self):
         keep = ArvadosFileWriterTestCase.MockKeep({"781e5e245d69b566979b86e28d23f2c7+10": "0123456789"})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_overwrite",
-                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"},
+                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 0:8:count.txt\n"})
         with Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n',
@@ -400,7 +412,8 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
     def test_create_multiple(self):
         keep = ArvadosFileWriterTestCase.MockKeep({})
         api = ArvadosFileWriterTestCase.MockApi({"name":"test_create_multiple",
-                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"},
+                                                 "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n",
+                                                 "replication_desired":None},
                                                 {"uuid":"zzzzz-4zz18-mockcollection0",
                                                  "manifest_text":". 2e9ec317e197819358fbc43afca7d837+8 e8dc4081b13434b45189a720b77b6818+8 0:8:count1.txt 8:8:count2.txt\n"})
         with Collection(api_client=api, keep_client=keep) as c:
index ff0d6847ceeb9c0e7d01ab09d929dacae8d37d80..cf8f23e375f66835aca09c1b2085113da1cf7c67 100644 (file)
@@ -804,6 +804,24 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
 
 class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
 
+    def test_replication_desired_kept_on_load(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, replication_desired=1)
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc)
+        self.assertEqual(c1.manifest_text, c2.manifest_text)
+        self.assertEqual(c1.replication_desired, c2.replication_desired)
+
+    def test_replication_desired_not_loaded_if_provided(self):
+        m = '. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count1.txt 0:10:count2.txt\n'
+        c1 = Collection(m, replication_desired=1)
+        c1.save_new()
+        loc = c1.manifest_locator()
+        c2 = Collection(loc, replication_desired=2)
+        self.assertEqual(c1.manifest_text, c2.manifest_text)
+        self.assertNotEqual(c1.replication_desired, c2.replication_desired)
+
     def test_init_manifest(self):
         m1 = """. 5348b82a029fd9e971a811ce1f71360b+43 0:43:md5sum.txt
 . 085c37f02916da1cad16f93c54d899b7+41 0:41:md5sum.txt
@@ -1064,6 +1082,30 @@ class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
         self.assertEqual(c1["count1.txt"].size(), 0)
 
 
+class NewCollectionTestCaseWithServers(run_test_server.TestCaseWithServers):
+    def test_get_manifest_text_only_committed(self):
+        c = Collection()
+        with c.open("count.txt", "w") as f:
+            # One file committed
+            with c.open("foo.txt", "w") as foo:
+                foo.write("foo")
+            f.write("0123456789")
+            # Other file not committed. Block not written to keep yet.
+            self.assertEqual(
+                c._get_manifest_text(".",
+                                     strip=False,
+                                     normalize=False,
+                                     only_committed=True),
+                '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:0:count.txt 0:3:foo.txt\n')
+        # And now with the file closed...
+        self.assertEqual(
+            c._get_manifest_text(".",
+                                 strip=False,
+                                 normalize=False,
+                                 only_committed=True),
+            ". 781e5e245d69b566979b86e28d23f2c7+10 acbd18db4cc2f85cedef654fccc4a4d8+3 0:10:count.txt 10:3:foo.txt\n")
+
+
 class CollectionCreateUpdateTest(run_test_server.TestCaseWithServers):
     MAIN_SERVER = {}
     KEEP_SERVER = {}