import threading
from collections import deque
+from stat import *
from keep import *
from stream import *
self._work_trees()
else:
break
+ self.checkpoint_state()
+
    def checkpoint_state(self):
        """Hook invoked whenever writer state changes; no-op by default.

        Subclasses can implement this method to, e.g., report or record state.
        """
        pass
def _work_file(self):
while True:
def _work_dirents(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
+ if stream_name != self.current_stream_name():
+ self.start_new_stream(stream_name)
while self._queued_dirents:
dirent = self._queued_dirents.popleft()
target = os.path.join(path, dirent)
def _queue_dirents(self, stream_name, dirents):
assert (not self._queued_dirents), "tried to queue more than one tree"
self._queued_dirents = deque(sorted(dirents))
- self.start_new_stream(stream_name)
def _queue_tree(self, path, stream_name, max_manifest_depth):
self._queued_trees.append((path, stream_name, max_manifest_depth))
self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
+ self.checkpoint_state()
def start_new_file(self, newfilename=None):
self.finish_current_file()
for name, locators, files in self._finished_streams:
ret += locators
return ret
+
+
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be serialized and resumed.

    dump_state() captures the attributes listed in STATE_PROPS (plus the
    currently open file's path and offset); from_state() rebuilds an
    equivalent writer, refusing to resume if any source file changed.
    """

    # Attributes that fully describe writing progress; dump_state()
    # serializes exactly these and from_state() restores them.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        # Maps real source path -> stat tuple taken when the file was
        # queued; check_dependencies() compares against it on resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state):
        """Rebuild a writer from a dict produced by dump_state().

        Raises errors.StaleWriterStateError if a dependency file changed
        or the previously open file cannot be reopened at its old offset.
        """
        writer = cls()
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                # Reopen the file that was mid-write and seek back to the
                # recorded position before resuming work.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        writer._do_queued_work()
        return writer

    def check_dependencies(self):
        """Verify every recorded source file is an unchanged regular file.

        Raises errors.StaleWriterStateError if a dependency is not a
        regular file, cannot be stat'ed, or has a different mtime or size
        than when it was queued.
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a snapshot of writer state suitable for from_state().

        copy_func is applied to each saved attribute value (pass e.g.
        copy.deepcopy to decouple the snapshot from live mutable state).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            # Record the open file's real path and current offset so the
            # resumed writer can pick up mid-file.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        # Queue the file as usual, but also record it as a dependency so a
        # resumed writer can detect whether it changed in the meantime.
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, error))
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        # Guard against the file being swapped out between the path stat
        # above and the open performed by the superclass.
        fd_stat = os.fstat(self._queued_file.fileno())
        if path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Data not backed by a queued file could not be replayed after a
        # resume, so reject it outright.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
import arvados
import bz2
+import copy
import errno
import os
+import pprint
import shutil
import subprocess
import sys
import tempfile
import unittest
class TestResumableWriter(arvados.ResumableCollectionWriter):
    """ResumableCollectionWriter that records every checkpointed state."""

    KEEP_BLOCK_SIZE = 1024  # PUT to Keep every 1K.

    def __init__(self):
        # Snapshots appended by checkpoint_state(), oldest first.
        self.saved_states = []
        # Fix: don't `return` the result of __init__ -- initializers
        # should not return values; call super() for its side effects only.
        super(TestResumableWriter, self).__init__()

    def checkpoint_state(self):
        # Deep-copy the snapshot so later writer mutations can't alter
        # previously saved states.
        self.saved_states.append(self.dump_state(copy.deepcopy))

    def last_state(self):
        """Return the most recent saved state; fail if none was saved."""
        assert self.saved_states, "resumable writer did not save any state"
        return self.saved_states[-1]
+
+
class ArvadosCollectionsTest(unittest.TestCase):
def _make_tmpdir(self):
self._tempdirs.append(tempfile.mkdtemp())
cwriter.manifest_text(),
". 902fbdd2b1df0c4f70b4a5d23525e932+3 0:1:A 1:1:B 2:1:C\n")
+ def test_checkpoint_after_put(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file(
+ 't' * (cwriter.KEEP_BLOCK_SIZE + 10)) as testfile:
+ testpath = os.path.realpath(testfile.name)
+ cwriter.write_file(testpath, 'test')
+ for state in cwriter.saved_states:
+ if state.get('_current_file') == (testpath,
+ cwriter.KEEP_BLOCK_SIZE):
+ break
+ else:
+ self.fail("can't find state immediately after PUT to Keep")
+ self.assertIn('d45107e93f9052fa88a82fc08bb1d316+1024', # 't' * 1024
+ state['_current_stream_locators'])
+
+ def test_basic_resume(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ last_state = cwriter.last_state()
+ resumed = TestResumableWriter.from_state(last_state)
+ self.assertEquals(cwriter.manifest_text(), resumed.manifest_text(),
+ "resumed CollectionWriter had different manifest")
+
+ def test_resume_fails_when_missing_dependency(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_mtime_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ os.utime(testfile.name, (0, 0))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_is_nonfile(self):
+ cwriter = TestResumableWriter()
+ cwriter.write_file('/dev/null', 'empty')
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_resume_fails_when_dependency_size_changed(self):
+ cwriter = TestResumableWriter()
+ with self.make_test_file() as testfile:
+ cwriter.write_file(testfile.name, 'test')
+ orig_mtime = os.fstat(testfile.fileno()).st_mtime
+ testfile.write('extra')
+ testfile.flush()
+ os.utime(testfile.name, (orig_mtime, orig_mtime))
+ self.assertRaises(arvados.errors.StaleWriterStateError,
+ TestResumableWriter.from_state,
+ cwriter.last_state())
+
+ def test_successful_resumes(self):
+ # FIXME: This is more of an integration test than a unit test.
+ cwriter = TestResumableWriter()
+ source_tree = self.build_directory_tree()
+ with open(os.path.join(source_tree, 'long'), 'w') as longfile:
+ longfile.write('t' * (cwriter.KEEP_BLOCK_SIZE + 10))
+ cwriter.write_directory_tree(source_tree)
+ # A state for each file, plus a fourth for mid-longfile.
+ self.assertGreater(len(cwriter.saved_states), 3,
+ "CollectionWriter didn't save enough states to test")
+
+ for state in cwriter.saved_states:
+ new_writer = TestResumableWriter.from_state(state)
+ manifests = [writer.manifest_text()
+ for writer in (cwriter, new_writer)]
+ self.assertEquals(
+ manifests[0], manifests[1],
+ "\n".join(["manifest mismatch after resuming from state:",
+ pprint.pformat(state), ""] + manifests))
+
+ def test_arbitrary_objects_not_resumable(self):
+ cwriter = TestResumableWriter()
+ with open('/dev/null') as badfile:
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write_file, badfile)
+
+ def test_arbitrary_writes_not_resumable(self):
+ cwriter = TestResumableWriter()
+ self.assertRaises(arvados.errors.AssertionError,
+ cwriter.write, "badtext")
+
if __name__ == '__main__':
unittest.main()