From fc2b0d0b96456bd260f2f508c2da10d74aba22f2 Mon Sep 17 00:00:00 2001
From: Brett Smith
Date: Tue, 20 May 2014 12:14:07 -0400
Subject: [PATCH] 2752: Implement CollectionWriter with a work queue.

This will make it easier to capture and restore state.
---
 sdk/python/arvados/collection.py | 131 +++++++++++++++++++++++++------
 1 file changed, 106 insertions(+), 25 deletions(-)

diff --git a/sdk/python/arvados/collection.py b/sdk/python/arvados/collection.py
index 71f30daa5b..cbf8a85122 100644
--- a/sdk/python/arvados/collection.py
+++ b/sdk/python/arvados/collection.py
@@ -18,6 +18,8 @@ import fcntl
 import time
 import threading
 
+from collections import deque
+
 from keep import *
 from stream import *
 import config
@@ -157,6 +159,10 @@ class CollectionWriter(object):
         self._current_file_name = None
         self._current_file_pos = 0
         self._finished_streams = []
+        self._close_file = None
+        self._queued_file = None
+        self._queued_dirents = deque()
+        self._queued_trees = deque()
 
     def __enter__(self):
         pass
@@ -164,38 +170,113 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self.start_new_stream(stream_name)
-        todo = []
-        if max_manifest_depth == 0:
-            dirents = sorted(util.listdir_recursive(path))
-        else:
-            dirents = sorted(os.listdir(path))
-        for dirent in dirents:
-            target = os.path.join(path, dirent)
-            if os.path.isdir(target):
-                todo += [[target,
-                          os.path.join(stream_name, dirent),
-                          max_manifest_depth-1]]
+    def _do_queued_work(self):
+        # The work queue consists of three pieces:
+        # * _queued_file: The file object we're currently writing to the
+        #   Collection.
+        # * _queued_dirents: Entries under the current directory
+        #   (_queued_trees[0]) that we want to write or recurse through.
+        #   This may contain files from subdirectories if
+        #   max_manifest_depth == 0 for this directory.
+        # * _queued_trees: Directories that should be written as separate
+        #   streams to the Collection.
+        # This function handles the smallest piece of work currently queued
+        # (current file, then current directory, then next directory) until
+        # no work remains. The _work_THING methods each do a unit of work on
+        # THING. _queue_THING methods add a THING to the work queue.
+        while True:
+            if self._queued_file:
+                self._work_file()
+            elif self._queued_dirents:
+                self._work_dirents()
+            elif self._queued_trees:
+                self._work_trees()
             else:
-                self.write_file(target, dirent)
-        self.finish_current_stream()
-        map(lambda x: self.write_directory_tree(*x), todo)
+                break
 
-    def write_file(self, source, filename=None):
-        if not hasattr(source, 'read'):
-            with open(source, 'rb') as srcfile:
-                return self.write_file(srcfile, filename)
-        elif filename is None:
-            filename = os.path.basename(source.name)
-        self.start_new_file(filename)
+    def _work_file(self):
+        # Copy the rest of the queued file into the Collection, reading
+        # KEEP_BLOCK_SIZE at a time, then close it if _queue_file opened it
+        # and clear it from the work queue.
         while True:
-            buf = source.read(self.KEEP_BLOCK_SIZE)
+            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
             if not buf:
                 break
             self.write(buf)
         self.finish_current_file()
+        if self._close_file:
+            self._queued_file.close()
+        self._close_file = None
+        self._queued_file = None
+
+    def _work_dirents(self):
+        # Walk the current tree's entries: queue each subdirectory as a new
+        # tree, and stop at the first non-directory so the main loop can
+        # write that file.  Retire the tree once no entries remain.
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        while self._queued_dirents:
+            dirent = self._queued_dirents.popleft()
+            target = os.path.join(path, dirent)
+            if os.path.isdir(target):
+                self._queue_tree(target,
+                                 os.path.join(stream_name, dirent),
+                                 max_manifest_depth - 1)
+            else:
+                self._queue_file(target, dirent)
+                break
+        if not self._queued_dirents:
+            self._queued_trees.popleft()
+
+    def _work_trees(self):
+        # Begin the tree at the head of the queue: list its entries (with
+        # util.listdir_recursive when max_manifest_depth == 0) and start
+        # its stream.
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+                        else os.listdir)
+        self._queue_dirents(stream_name, make_dirents(path))
+        if not self._queued_dirents:
+            # An empty directory never reaches _work_dirents, the only
+            # other place a tree is retired.  Drop it here, or
+            # _do_queued_work would spin on it forever.
+            self._queued_trees.popleft()
+
+    def _queue_file(self, source, filename=None):
+        # Queue one file to write.  source is either a path, which is
+        # opened here and closed by _work_file, or an object with read().
+        assert (self._queued_file is None), "tried to queue more than one file"
+        if not hasattr(source, 'read'):
+            source = open(source, 'rb')
+            self._close_file = True
+        else:
+            self._close_file = False
+        if filename is None:
+            filename = os.path.basename(source.name)
+        self.start_new_file(filename)
+        self._queued_file = source
+
+    def _queue_dirents(self, stream_name, dirents):
+        # Queue one directory's entries in sorted order; start its stream.
+        assert (not self._queued_dirents), "tried to queue more than one tree"
+        self._queued_dirents = deque(sorted(dirents))
+        self.start_new_stream(stream_name)
+
+    def _queue_tree(self, path, stream_name, max_manifest_depth):
+        # Queue a directory for _work_trees to write as a separate stream.
+        self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+    def write_file(self, source, filename=None):
+        # Write one file, given as a path or an object with read().
+        self._queue_file(source, filename)
+        self._do_queued_work()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        # Write the directory at path as stream_name; subdirectories are
+        # queued as further streams.  A max_manifest_depth of 0 lists the
+        # directory with util.listdir_recursive instead of os.listdir.
+        self._queue_tree(path, stream_name, max_manifest_depth)
+        self._do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
-- 
2.30.2