9463: On failure, commit the cache file too, before exiting. Wrote missing test about...
author Lucas Di Pentima <lucas@curoverse.com>
Fri, 5 Aug 2016 18:14:04 +0000 (15:14 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Fri, 5 Aug 2016 18:14:04 +0000 (15:14 -0300)
sdk/python/arvados/commands/put.py
sdk/python/tests/test_arv_put.py

index b2c40f12f8c361ae988dde139c2b26b830adb372..43e3813255faf02025c99acc3349bdfdc75399fc 100644 (file)
@@ -334,12 +334,13 @@ class ArvPutUploadJob(object):
             # Stop the thread before doing anything else
             self._stop_checkpointer.set()
             self._checkpointer.join()
-        # Successful upload, one last _update()
-        self._update()
-        if self.resume:
-            self._cache_file.close()
-            # Correct the final written bytes count
-            self.bytes_written -= self.bytes_skipped
+            # Commit all & one last _update()
+            self.manifest_text()
+            self._update()
+            if self.resume:
+                self._cache_file.close()
+                # Correct the final written bytes count
+                self.bytes_written -= self.bytes_skipped
 
     def save_collection(self):
         with self._collection_lock:
@@ -387,7 +388,8 @@ class ArvPutUploadJob(object):
             # Update cache, if resume enabled
             if self.resume:
                 with self._state_lock:
-                    self._state['manifest'] = self._my_collection().manifest_text()
+                    # Get the manifest text without committing pending blocks
+                    self._state['manifest'] = self._my_collection()._get_manifest_text(".", strip=False, normalize=False)
         if self.resume:
             self._save_state()
         # Call the reporter, if any
@@ -470,7 +472,7 @@ class ArvPutUploadJob(object):
                     output = self._my_collection().open(filename, 'w')
             self._write(source_fd, output)
             output.close()
-
+            
     def _write(self, source_fd, output):
         while True:
             data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
index 7d8d790949def3a8c0871add14166d5bd784486f..52063f2a153c6f23de0c692c7823b0b2af93f5e9 100644 (file)
@@ -21,6 +21,7 @@ from cStringIO import StringIO
 
 import arvados
 import arvados.commands.put as arv_put
+import arvados_testutil as tutil
 
 from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
 import run_test_server
@@ -242,8 +243,6 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
     def setUp(self):
         super(ArvPutUploadJobTest, self).setUp()
         run_test_server.authorize_with('active')
-        self.exit_lock = threading.Lock()
-        self.save_manifest_lock = threading.Lock()
         # Temp files creation
         self.tempdir = tempfile.mkdtemp()
         subdir = os.path.join(self.tempdir, 'subdir')
@@ -254,7 +253,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                 f.write(data * i)
         with open(os.path.join(subdir, 'otherfile'), 'w') as f:
             f.write(data * 5)
-        # For large file resuming test
+        # Large temp file for resume test
         _, self.large_file_name = tempfile.mkstemp()
         fileobj = open(self.large_file_name, 'w')
         # Make sure to write just a little more than one block
@@ -262,6 +261,7 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
             data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
             fileobj.write(data)
         fileobj.close()
+        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')
 
     def tearDown(self):
         super(ArvPutUploadJobTest, self).tearDown()
@@ -311,64 +311,29 @@ class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
         self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)
 
     def test_resume_large_file_upload(self):
-        # Proxying ArvadosFile.writeto() method to be able to synchronize it
-        # with partial manifest saves
-        orig_writeto_func = getattr(arvados.arvfile.ArvadosFile, 'writeto')
-        orig_update_func = getattr(arv_put.ArvPutUploadJob, '_update')
-        def wrapped_update(*args, **kwargs):
-            job_instance = args[0]
-            orig_update_func(*args, **kwargs)
-            with self.save_manifest_lock:
-                # Allow abnormal termination when first block written
-                if job_instance._collection_size(job_instance._my_collection()) == arvados.config.KEEP_BLOCK_SIZE:
-                    self.exit_lock.release()
-        def wrapped_writeto(*args, **kwargs):
-            data = args[2]
+        def wrapped_write(*args, **kwargs):
+            data = args[1]
+            # Exit only on last block
             if len(data) < arvados.config.KEEP_BLOCK_SIZE:
-                # Lock on the last block write call, waiting for the
-                # manifest to be saved
-                with self.exit_lock:
-                    raise SystemExit('Test exception')
-            ret = orig_writeto_func(*args, **kwargs)
-            self.save_manifest_lock.release()
-            return ret
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', wrapped_writeto)
-        setattr(arv_put.ArvPutUploadJob, '_update', wrapped_update)
-        # MD5 hash of random data to be uploaded
-        md5_original = hashlib.md5()
-        with open(self.large_file_name, 'r') as f:
-            data = f.read()
-            md5_original.update(data)
-        self.exit_lock.acquire()
-        self.save_manifest_lock.acquire()
-        writer = arv_put.ArvPutUploadJob([self.large_file_name],
-                                         update_time=0.1)
-        # First upload: partially completed with simulated error
-        try:
-            self.assertRaises(SystemExit, writer.start())
-        except SystemExit:
-            # Avoid getting a ResumeCacheConflict on the 2nd run
-            writer._cache_file.close()
-        self.assertGreater(writer.bytes_written, 0)
-        self.assertLess(writer.bytes_written,
-                        os.path.getsize(self.large_file_name))
-
-        # Restore the ArvadosFile.writeto() method to before retrying
-        setattr(arvados.arvfile.ArvadosFile, 'writeto', orig_writeto_func)
-        # Restore the ArvPutUploadJob._update() method to before retrying
-        setattr(arv_put.ArvPutUploadJob, '_update', orig_update_func)
-        writer_new = arv_put.ArvPutUploadJob([self.large_file_name])
-        writer_new.start()
-        writer_new.destroy_cache()
-        self.assertEqual(os.path.getsize(self.large_file_name),
-                         writer.bytes_written + writer_new.bytes_written)
-        # Read the uploaded file to compare its md5 hash
-        md5_uploaded = hashlib.md5()
-        c = arvados.collection.Collection(writer_new.manifest_text())
-        with c.open(os.path.basename(self.large_file_name), 'r') as f:
-            new_data = f.read()
-            md5_uploaded.update(new_data)
-        self.assertEqual(md5_original.hexdigest(), md5_uploaded.hexdigest())
+                raise SystemExit("Simulated error")
+            return self.arvfile_write(*args, **kwargs)
+
+        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
+                        autospec=True) as mocked_write:
+            mocked_write.side_effect = wrapped_write
+            writer = arv_put.ArvPutUploadJob([self.large_file_name],
+                                             replication_desired=1)
+            with self.assertRaises(SystemExit):
+                writer.start()
+                self.assertLess(writer.bytes_written,
+                                os.path.getsize(self.large_file_name))
+        # Retry the upload
+        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
+                                          replication_desired=1)
+        writer2.start()
+        self.assertEqual(writer.bytes_written + writer2.bytes_written,
+                         os.path.getsize(self.large_file_name))
+        writer2.destroy_cache()
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):