5011: Add replication argument to CollectionWriter.
author Tom Clegg <tom@curoverse.com>
Fri, 30 Jan 2015 17:36:43 +0000 (12:36 -0500)
committer Tom Clegg <tom@curoverse.com>
Fri, 30 Jan 2015 22:12:28 +0000 (17:12 -0500)
sdk/python/arvados/collection.py
sdk/python/arvados/keep.py
sdk/python/tests/test_collections.py

index d530f58b03e70f2983280bf673c937df0653669f..45ed093e2f82144bfc457850648aa11f0d4c3b92 100644 (file)
@@ -304,7 +304,7 @@ class _WriterFile(ArvadosFileBase):
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, num_retries=0, replication=0):
         """Instantiate a CollectionWriter.
 
         CollectionWriter lets you build a new Arvados Collection from scratch.
@@ -320,9 +320,13 @@ class CollectionWriter(CollectionBase):
           service requests.  Default 0.  You may change this value
           after instantiation, but note those changes may not
           propagate to related objects like the Keep client.
+        * replication: The number of copies of each block to store.
+          If this argument is 0 or not supplied, replication is
+          the server-provided default if available, otherwise 2.
         """
         self._api_client = api_client
         self.num_retries = num_retries
+        self.replication = (replication if replication>0 else 2)
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
@@ -477,7 +481,9 @@ class CollectionWriter(CollectionBase):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
-                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+                self._my_keep().put(
+                    data_buffer[0:self.KEEP_BLOCK_SIZE],
+                    copies=self.replication))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -552,8 +558,13 @@ class CollectionWriter(CollectionBase):
         self._current_file_name = None
 
     def finish(self):
-        # Store the manifest in Keep and return its locator.
-        return self._my_keep().put(self.manifest_text())
+        # Store the manifest in Keep and return its locator. Beware,
+        # this is only useful in special cases like storing manifest
+        # fragments temporarily in Keep during a Crunch job. In most
+        # cases you should make a collection instead, by sending
+        # manifest_text() to the API server's "create collection"
+        # endpoint.
+        return self._my_keep().put(self.manifest_text(), copies=self.replication)
 
     def portable_data_hash(self):
         stripped = self.stripped_manifest()
@@ -587,10 +598,9 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, **kwargs):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__(
-            api_client, num_retries=num_retries)
+        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
index 7c53339650f622260263cbded25e16622cf77189..46987897576edc498ff5417dbc690bc30865e37f 100644 (file)
@@ -739,7 +739,7 @@ class KeepClient(object):
     # Local storage methods need no-op num_retries arguments to keep
     # integration tests happy.  With better isolation they could
     # probably be removed again.
-    def local_store_put(self, data, num_retries=0):
+    def local_store_put(self, data, num_retries=0, copies=1):
         md5 = hashlib.md5(data).hexdigest()
         locator = '%s+%d' % (md5, len(data))
         with open(os.path.join(self.local_store, md5 + '.tmp'), 'w') as f:
@@ -748,7 +748,7 @@ class KeepClient(object):
                   os.path.join(self.local_store, md5))
         return locator
 
-    def local_store_get(self, loc_s, num_retries=0):
+    def local_store_get(self, loc_s, num_retries=0, copies=1):
         try:
             locator = KeepLocator(loc_s)
         except ValueError:
index c991154e7f669ff2e92dc80c3cafbf2a62309d86..d47d318b11b6ad72f2531fa8494e6b8c2605d8da 100644 (file)
@@ -536,6 +536,19 @@ class CollectionTestMixin(object):
         self.mock_keep_services(client, code, self.PROXY_RESPONSE)
         return client
 
+    def disk_services_available(self, number_of_disks):
+        return {
+            'items_available': number_of_disks,
+            'items': [{
+                'uuid': 'zzzzz-bi6l4-mockdisk%07d' % i,
+                'owner_uuid': 'zzzzz-tpzed-000000000000000',
+                'service_host': tutil.TEST_HOST,
+                'service_port': 65535-i,
+                'service_ssl_flag': True,
+                'service_type': 'disk',
+            } for i in range(0, number_of_disks)]
+        }
+
 
 @tutil.skip_sleep
 class CollectionReaderTestCase(unittest.TestCase, CollectionTestMixin):
@@ -703,8 +716,8 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
         return tutil.mock_put_responses(body, *codes, **headers)
 
     def foo_writer(self, **kwargs):
-        api_client = self.api_client_mock()
-        writer = arvados.CollectionWriter(api_client, **kwargs)
+        kwargs.setdefault('api_client', self.api_client_mock())
+        writer = arvados.CollectionWriter(**kwargs)
         writer.start_new_file('foo')
         writer.write('foo')
         return writer
@@ -720,6 +733,32 @@ class CollectionWriterTestCase(unittest.TestCase, CollectionTestMixin):
             with self.assertRaises(arvados.errors.KeepWriteError):
                 writer.finish()
 
+    def test_write_insufficient_replicas_via_proxy(self):
+        writer = self.foo_writer(replication=3)
+        with self.mock_keep(None, 200, headers={'x-keep-replicas-stored': 2}):
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                writer.manifest_text()
+
+    def test_write_insufficient_replicas_via_disks(self):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, 200, self.disk_services_available(2))
+        writer = self.foo_writer(api_client=client, replication=3)
+        with self.mock_keep(
+                None, 200, 200,
+                **{'x-keep-replicas-stored': 1}) as keepmock:
+            with self.assertRaises(arvados.errors.KeepWriteError):
+                writer.manifest_text()
+
+    def test_write_three_replicas(self):
+        client = mock.MagicMock(name='api_client')
+        self.mock_keep_services(client, 200, self.disk_services_available(6))
+        writer = self.foo_writer(api_client=client, replication=3)
+        with self.mock_keep(
+                None, 200, 500, 200, 500, 200, 200,
+                **{'x-keep-replicas-stored': 1}) as keepmock:
+            writer.manifest_text()
+            self.assertEqual(5, keepmock.call_count)
+
     def test_write_whole_collection_through_retries(self):
         writer = self.foo_writer(num_retries=2)
         with self.mock_keep(self.DEFAULT_DATA_HASH,