2752: Implement CollectionWriter with a work queue.
author Brett Smith <brett@curoverse.com>
Tue, 20 May 2014 16:14:07 +0000 (12:14 -0400)
committer Brett Smith <brett@curoverse.com>
Fri, 30 May 2014 14:39:51 +0000 (10:39 -0400)
This will make it easier to capture and restore state.

sdk/python/arvados/collection.py

index 71f30daa5bd55f355b282582283177bb0799ecb2..cbf8a8512215edf4f112f48776218767694d2c92 100644 (file)
@@ -18,6 +18,8 @@ import fcntl
 import time
 import threading
 
+from collections import deque
+
 from keep import *
 from stream import *
 import config
@@ -157,6 +159,10 @@ class CollectionWriter(object):
         self._current_file_name = None
         self._current_file_pos = 0
         self._finished_streams = []
+        self._close_file = None
+        self._queued_file = None
+        self._queued_dirents = deque()
+        self._queued_trees = deque()
 
     def __enter__(self):
         pass
@@ -164,38 +170,91 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def write_directory_tree(self,
-                             path, stream_name='.', max_manifest_depth=-1):
-        self.start_new_stream(stream_name)
-        todo = []
-        if max_manifest_depth == 0:
-            dirents = sorted(util.listdir_recursive(path))
-        else:
-            dirents = sorted(os.listdir(path))
-        for dirent in dirents:
-            target = os.path.join(path, dirent)
-            if os.path.isdir(target):
-                todo += [[target,
-                          os.path.join(stream_name, dirent),
-                          max_manifest_depth-1]]
+    def _do_queued_work(self):
+        # The work queue consists of three pieces:
+        # * _queued_file: The file object we're currently writing to the
+        #   Collection.
+        # * _queued_dirents: Entries under the current directory
+        #   (_queued_trees[0]) that we want to write or recurse through.
+        #   This may contain files from subdirectories if
+        #   max_manifest_depth == 0 for this directory.
+        # * _queued_trees: Directories that should be written as separate
+        #   streams to the Collection.
+        # This function handles the smallest piece of work currently queued
+        # (current file, then current directory, then next directory) until
+        # no work remains.  The _work_THING methods each do a unit of work on
+        # THING.  _queue_THING methods add a THING to the work queue.
+        while True:
+            if self._queued_file:
+                self._work_file()
+            elif self._queued_dirents:
+                self._work_dirents()
+            elif self._queued_trees:
+                self._work_trees()
             else:
-                self.write_file(target, dirent)
-        self.finish_current_stream()
-        map(lambda x: self.write_directory_tree(*x), todo)
+                break
 
-    def write_file(self, source, filename=None):
-        if not hasattr(source, 'read'):
-            with open(source, 'rb') as srcfile:
-                return self.write_file(srcfile, filename)
-        elif filename is None:
-            filename = os.path.basename(source.name)
-        self.start_new_file(filename)
+    def _work_file(self):
         while True:
-            buf = source.read(self.KEEP_BLOCK_SIZE)
+            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
             if not buf:
                 break
             self.write(buf)
         self.finish_current_file()
+        if self._close_file:
+            self._queued_file.close()
+        self._close_file = None
+        self._queued_file = None
+
+    def _work_dirents(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        while self._queued_dirents:
+            dirent = self._queued_dirents.popleft()
+            target = os.path.join(path, dirent)
+            if os.path.isdir(target):
+                self._queue_tree(target,
+                                 os.path.join(stream_name, dirent),
+                                 max_manifest_depth - 1)
+            else:
+                self._queue_file(target, dirent)
+                break
+        if not self._queued_dirents:
+            self._queued_trees.popleft()
+
+    def _work_trees(self):
+        path, stream_name, max_manifest_depth = self._queued_trees[0]
+        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
+                        else os.listdir)
+        self._queue_dirents(stream_name, make_dirents(path))
+
+    def _queue_file(self, source, filename=None):
+        assert (self._queued_file is None), "tried to queue more than one file"
+        if not hasattr(source, 'read'):
+            source = open(source, 'rb')
+            self._close_file = True
+        else:
+            self._close_file = False
+        if filename is None:
+            filename = os.path.basename(source.name)
+        self.start_new_file(filename)
+        self._queued_file = source
+
+    def _queue_dirents(self, stream_name, dirents):
+        assert (not self._queued_dirents), "tried to queue more than one tree"
+        self._queued_dirents = deque(sorted(dirents))
+        self.start_new_stream(stream_name)
+
+    def _queue_tree(self, path, stream_name, max_manifest_depth):
+        self._queued_trees.append((path, stream_name, max_manifest_depth))
+
+    def write_file(self, source, filename=None):
+        self._queue_file(source, filename)
+        self._do_queued_work()
+
+    def write_directory_tree(self,
+                             path, stream_name='.', max_manifest_depth=-1):
+        self._queue_tree(path, stream_name, max_manifest_depth)
+        self._do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):