From 6d23a7362308b808a10b698c84a022287d1668a6 Mon Sep 17 00:00:00 2001
From: Brett Smith
Date: Thu, 29 May 2014 16:36:14 -0400
Subject: [PATCH] 2752: Resumed collection writer doesn't do_queued_work
 immediately.

As noted in the comments, callers that build a writer from resumed
state must do_queued_work on it before anything else.  But this split
makes it easier to treat initialization problems and work problems
separately, which is critical.  This required refactoring progress
reporting a bit.
---
 sdk/python/arvados/collection.py   | 17 +++++++++--------
 sdk/python/arvados/commands/put.py | 29 ++++++++++++++++-------------
 sdk/python/tests/test_arv-put.py   |  9 ++-------
 3 files changed, 27 insertions(+), 28 deletions(-)

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 29c44b416d..e4c008efb8 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -171,7 +171,7 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -251,12 +251,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -380,6 +380,12 @@ class ResumableCollectionWriter(CollectionWriter):
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have
+        # changed, been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
         writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
@@ -403,13 +409,8 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer.preresume_hook()
-        writer._do_queued_work()
         return writer
 
-    def preresume_hook(self):
-        pass  # Subclasses can override this as desired.
-
     def check_dependencies(self):
         for path, orig_stat in self._dependencies.items():
             if not S_ISREG(orig_stat[ST_MODE]):
diff --git a/sdk/python/arvados/commands/put.py b/sdk/python/arvados/commands/put.py
index 86670266ae..912224aa83 100644
--- a/sdk/python/arvados/commands/put.py
+++ b/sdk/python/arvados/commands/put.py
@@ -227,10 +227,7 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         self.bytes_written = 0
         self._seen_inputs = []
         self.cache = cache
-        if reporter is None:
-            self.report_progress = lambda bytes_w, bytes_e: None
-        else:
-            self.report_progress = reporter
+        self.reporter = reporter
         self.bytes_expected = bytes_expected
         super(ArvPutCollectionWriter, self).__init__()
 
@@ -246,10 +243,6 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
         else:
             return writer
 
-    def preresume_hook(self):
-        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
-        self.report_progress(self.bytes_written, self.bytes_expected)
-
    def cache_state(self):
         state = self.dump_state()
         # Transform attributes for serialization.
@@ -260,11 +253,17 @@ class ArvPutCollectionWriter(arvados.ResumableCollectionWriter):
                 state[attr] = list(value)
         self.cache.save(state)
 
+    def report_progress(self):
+        if self.reporter is not None:
+            self.reporter(self.bytes_written, self.bytes_expected)
+
     def flush_data(self):
-        bytes_buffered = self._data_buffer_len
+        start_buffer_len = self._data_buffer_len
         super(ArvPutCollectionWriter, self).flush_data()
-        self.bytes_written += (bytes_buffered - self._data_buffer_len)
-        self.report_progress(self.bytes_written, self.bytes_expected)
+        # Checkpoint and report progress if data was PUT to Keep.
+        if self._data_buffer_len < start_buffer_len:
+            self.bytes_written += (start_buffer_len - self._data_buffer_len)
+            self.report_progress()
 
     def _record_new_input(self, input_type, source_name, dest_name):
         # The key needs to be a list because that's what we'll get back
@@ -338,10 +337,14 @@ def main(arguments=None):
         print "arv-put: Another process is already uploading this data."
         sys.exit(1)
 
-    try:
-        writer = ArvPutCollectionWriter.from_cache(
-            resume_cache, reporter, expected_bytes_for(args.paths))
+    writer = ArvPutCollectionWriter.from_cache(
+        resume_cache, reporter, expected_bytes_for(args.paths))
+    if writer.bytes_written > 0:  # We're resuming a previous upload.
+        print >>sys.stderr, "arv-put: Resuming previous upload.  Bypass with the --no-resume option."
+        writer.report_progress()
+
+    try:
+        writer.do_queued_work()  # Do work resumed from cache.
         # Copy file data to Keep.
         for path in args.paths:
             if os.path.isdir(path):
diff --git a/sdk/python/tests/test_arv-put.py b/sdk/python/tests/test_arv-put.py
index dde42e6f0f..41fb33041a 100644
--- a/sdk/python/tests/test_arv-put.py
+++ b/sdk/python/tests/test_arv-put.py
@@ -265,13 +265,8 @@ class ArvadosPutCollectionWriterTest(ArvadosKeepLocalStoreTestCase):
             cwriter.write_file(testfile.name, 'test')
             cwriter.finish_current_stream()
             cwriter.cache_state()
-            # Restore a writer from that state and check its progress report.
-            # We're also checking that progress is reported immediately after
-            # resuming.
-            progression, reporter = self.make_progress_tester()
-            new_writer = arv_put.ArvPutCollectionWriter.from_cache(
-                self.cache, reporter, bytes_expected=4)
-            self.assertIn((4, 4), progression)
+            new_writer = arv_put.ArvPutCollectionWriter.from_cache(self.cache)
+            self.assertEqual(new_writer.bytes_written, 4)
 
 
 class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
-- 
2.39.5
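
A minimal usage sketch of the calling convention this patch establishes,
mirroring the new main() code above.  The wrapper function, its name, and
its arguments are illustrative, not part of the patch:

import sys

import arvados.commands.put as arv_put

def make_resumed_writer(resume_cache, reporter, bytes_expected):
    # Initialization happens first: from_cache() builds the writer from
    # cached state, falling back to a fresh writer if from_state() finds
    # the state stale.  Initialization problems surface here, before any
    # upload work is attempted.
    writer = arv_put.ArvPutCollectionWriter.from_cache(
        resume_cache, reporter, bytes_expected)
    if writer.bytes_written > 0:  # We're resuming a previous upload.
        print >>sys.stderr, "arv-put: Resuming previous upload."
        writer.report_progress()
    # Work happens second, and only when the caller explicitly asks for
    # it, so work problems (unreadable files, failed writes to Keep) can
    # be handled separately from initialization problems.
    writer.do_queued_work()
    return writer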