2752: Add arvados.collection.ResumableCollectionWriter.
author Brett Smith <brett@curoverse.com>
Fri, 30 May 2014 14:40:07 +0000 (10:40 -0400)
committer Brett Smith <brett@curoverse.com>
Fri, 30 May 2014 14:40:07 +0000 (10:40 -0400)
This is a subclass of CollectionWriter that only accepts data from the
filesystem.  In exchange, it can record its own state, and resume
writing from one of those states.  arv-put will use this to make the
user experience nicer if a long upload is interrupted.

sdk/python/arvados/collection.py
sdk/python/arvados/errors.py
sdk/python/test_collections.py

index cbf8a8512215edf4f112f48776218767694d2c92..814bd75a1ed0d0ccaad6c362385eac406660773e 100644 (file)
@@ -19,6 +19,7 @@ import time
 import threading
 
 from collections import deque
+from stat import *
 
 from keep import *
 from stream import *
@@ -193,6 +194,11 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
+            self.checkpoint_state()
+
+    def checkpoint_state(self):
+        # Subclasses can implement this method to, e.g., report or record state.
+        pass
 
     def _work_file(self):
         while True:
@@ -208,6 +214,8 @@ class CollectionWriter(object):
 
     def _work_dirents(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
+        if stream_name != self.current_stream_name():
+            self.start_new_stream(stream_name)
         while self._queued_dirents:
             dirent = self._queued_dirents.popleft()
             target = os.path.join(path, dirent)
@@ -242,7 +250,6 @@ class CollectionWriter(object):
     def _queue_dirents(self, stream_name, dirents):
         assert (not self._queued_dirents), "tried to queue more than one tree"
         self._queued_dirents = deque(sorted(dirents))
-        self.start_new_stream(stream_name)
 
     def _queue_tree(self, path, stream_name, max_manifest_depth):
         self._queued_trees.append((path, stream_name, max_manifest_depth))
@@ -273,6 +280,7 @@ class CollectionWriter(object):
             self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
+            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -363,3 +371,86 @@ class CollectionWriter(object):
         for name, locators, files in self._finished_streams:
             ret += locators
         return ret
+
+
+class ResumableCollectionWriter(CollectionWriter):
+    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
+                   '_current_stream_locators', '_current_stream_name',
+                   '_current_file_name', '_current_file_pos', '_close_file',
+                   '_data_buffer', '_dependencies', '_finished_streams',
+                   '_queued_dirents', '_queued_trees']
+
+    def __init__(self):
+        self._dependencies = {}
+        super(ResumableCollectionWriter, self).__init__()
+
+    @classmethod
+    def from_state(cls, state):
+        writer = cls()
+        for attr_name in cls.STATE_PROPS:
+            attr_value = state[attr_name]
+            attr_class = getattr(writer, attr_name).__class__
+            # Coerce the value into the same type as the initial value, if
+            # needed.
+            if attr_class not in (type(None), attr_value.__class__):
+                attr_value = attr_class(attr_value)
+            setattr(writer, attr_name, attr_value)
+        # Check dependencies before we try to resume anything.
+        writer.check_dependencies()
+        if state['_current_file'] is not None:
+            path, pos = state['_current_file']
+            try:
+                writer._queued_file = open(path, 'rb')
+                writer._queued_file.seek(pos)
+            except IOError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to reopen active file {}: {}".format(path, error))
+        writer._do_queued_work()
+        return writer
+
+    def check_dependencies(self):
+        for path, orig_stat in self._dependencies.items():
+            if not S_ISREG(orig_stat[ST_MODE]):
+                raise errors.StaleWriterStateError("{} not file".format(path))
+            try:
+                now_stat = tuple(os.stat(path))
+            except OSError as error:
+                raise errors.StaleWriterStateError(
+                    "failed to stat {}: {}".format(path, error))
+            if ((not S_ISREG(now_stat[ST_MODE])) or
+                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
+                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
+                raise errors.StaleWriterStateError("{} changed".format(path))
+
+    def dump_state(self, copy_func=lambda x: x):
+        state = {attr: copy_func(getattr(self, attr))
+                 for attr in self.STATE_PROPS}
+        if self._queued_file is None:
+            state['_current_file'] = None
+        else:
+            state['_current_file'] = (os.path.realpath(self._queued_file.name),
+                                      self._queued_file.tell())
+        return state
+
+    def _queue_file(self, source, filename=None):
+        try:
+            src_path = os.path.realpath(source)
+        except Exception:
+            raise errors.AssertionError("{} not a file path".format(source))
+        try:
+            path_stat = os.stat(src_path)
+        except OSError as error:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, error))
+        super(ResumableCollectionWriter, self)._queue_file(source, filename)
+        fd_stat = os.fstat(self._queued_file.fileno())
+        if path_stat.st_ino != fd_stat.st_ino:
+            raise errors.AssertionError(
+                "{} changed between open and stat calls".format(source))
+        self._dependencies[src_path] = tuple(fd_stat)
+
+    def write(self, data):
+        if self._queued_file is None:
+            raise errors.AssertionError(
+                "resumable writer can't accept unsourced data")
+        return super(ResumableCollectionWriter, self).write(data)
index b4afb21f1131f16199e0011952d418f30218765d..85472d8cdf0bcb729933709aa7aee4d45a8df18e 100644 (file)
@@ -16,3 +16,5 @@ class NotImplementedError(Exception):
     pass
 class NoKeepServersError(Exception):
     pass
+class StaleWriterStateError(Exception):
+    pass
index f9236ee874dc11f9a8174be4e480ecf3693ec12f..d8cf8e93b1db414c501c625fe9abcaf52284d326 100644 (file)
@@ -4,14 +4,31 @@
 
 import arvados
 import bz2
+import copy
 import errno
 import os
+import pprint
 import shutil
 import subprocess
 import sys
 import tempfile
 import unittest
 
+class TestResumableWriter(arvados.ResumableCollectionWriter):
+    KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.
+
+    def __init__(self):
+        self.saved_states = []
+        return super(TestResumableWriter, self).__init__()
+
+    def checkpoint_state(self):
+        self.saved_states.append(self.dump_state(copy.deepcopy))
+
+    def last_state(self):
+        assert self.saved_states, "resumable writer did not save any state"
+        return self.saved_states[-1]
+
+
 class ArvadosCollectionsTest(unittest.TestCase):
     def _make_tmpdir(self):
         self._tempdirs.append(tempfile.mkdtemp())
@@ -543,6 +560,97 @@ class ArvadosCollectionsTest(unittest.TestCase):
             cwriter.manifest_text(),
             ". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
 
+    def test_checkpoint_after_put(self):
+        cwriter = TestResumableWriter()
+        with self.make_test_file(
+              't' * (cwriter.KEEP_BLOCK_SIZE + 10)) as testfile:
+            testpath = os.path.realpath(testfile.name)
+            cwriter.write_file(testpath, 'test')
+        for state in cwriter.saved_states:
+            if state.get('_current_file') == (testpath,
+                                              cwriter.KEEP_BLOCK_SIZE):
+                break
+        else:
+            self.fail("can't find state immediately after PUT to Keep")
+        self.assertIn('d45107e93f9052fa88a82fc08bb1d316+1024',  # 't' * 1024
+                      state['_current_stream_locators'])
+
+    def test_basic_resume(self):
+        cwriter = TestResumableWriter()
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+            last_state = cwriter.last_state()
+            resumed = TestResumableWriter.from_state(last_state)
+        self.assertEquals(cwriter.manifest_text(), resumed.manifest_text(),
+                          "resumed CollectionWriter had different manifest")
+
+    def test_resume_fails_when_missing_dependency(self):
+        cwriter = TestResumableWriter()
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+        self.assertRaises(arvados.errors.StaleWriterStateError,
+                          TestResumableWriter.from_state,
+                          cwriter.last_state())
+
+    def test_resume_fails_when_dependency_mtime_changed(self):
+        cwriter = TestResumableWriter()
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+            os.utime(testfile.name, (0, 0))
+            self.assertRaises(arvados.errors.StaleWriterStateError,
+                              TestResumableWriter.from_state,
+                              cwriter.last_state())
+
+    def test_resume_fails_when_dependency_is_nonfile(self):
+        cwriter = TestResumableWriter()
+        cwriter.write_file('/dev/null', 'empty')
+        self.assertRaises(arvados.errors.StaleWriterStateError,
+                          TestResumableWriter.from_state,
+                          cwriter.last_state())
+
+    def test_resume_fails_when_dependency_size_changed(self):
+        cwriter = TestResumableWriter()
+        with self.make_test_file() as testfile:
+            cwriter.write_file(testfile.name, 'test')
+            orig_mtime = os.fstat(testfile.fileno()).st_mtime
+            testfile.write('extra')
+            testfile.flush()
+            os.utime(testfile.name, (orig_mtime, orig_mtime))
+            self.assertRaises(arvados.errors.StaleWriterStateError,
+                              TestResumableWriter.from_state,
+                              cwriter.last_state())
+
+    def test_successful_resumes(self):
+        # FIXME: This is more of an integration test than a unit test.
+        cwriter = TestResumableWriter()
+        source_tree = self.build_directory_tree()
+        with open(os.path.join(source_tree, 'long'), 'w') as longfile:
+            longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
+        cwriter.write_directory_tree(source_tree)
+        # A state for each file, plus a fourth for mid-longfile.
+        self.assertGreater(len(cwriter.saved_states), 3,
+                           "CollectionWriter didn't save enough states to test")
+
+        for state in cwriter.saved_states:
+            new_writer = TestResumableWriter.from_state(state)
+            manifests = [writer.manifest_text()
+                         for writer in (cwriter, new_writer)]
+            self.assertEquals(
+                manifests[0], manifests[1],
+                "\n".join(["manifest mismatch after resuming from state:",
+                           pprint.pformat(state), ""] + manifests))
+
+    def test_arbitrary_objects_not_resumable(self):
+        cwriter = TestResumableWriter()
+        with open('/dev/null') as badfile:
+            self.assertRaises(arvados.errors.AssertionError,
+                              cwriter.write_file, badfile)
+
+    def test_arbitrary_writes_not_resumable(self):
+        cwriter = TestResumableWriter()
+        self.assertRaises(arvados.errors.AssertionError,
+                          cwriter.write, "badtext")
+
 
 if __name__ == '__main__':
     unittest.main()