import time
import threading
+from collections import deque
+
from keep import *
from stream import *
import config
self._current_file_name = None
self._current_file_pos = 0
self._finished_streams = []
+ self._close_file = None
+ self._queued_file = None
+ self._queued_dirents = deque()
+ self._queued_trees = deque()
def __enter__(self):
    """Enter a ``with`` block and return this writer.

    Returning ``self`` is what makes ``with Writer() as w:`` bind ``w`` to
    the writer; a body of ``pass`` would bind it to None instead.
    """
    return self
def __exit__(self, exc_type=None, exc_value=None, exc_tb=None):
    """Leave a ``with`` block by calling ``self.finish()``.

    The ``with`` statement calls ``__exit__`` with three arguments
    (exception type, value, traceback), so they must be accepted; the
    defaults keep the old zero-argument call form working.  Returns None,
    so an exception raised inside the block still propagates.
    """
    self.finish()
def _do_queued_work(self):
    """Drain the work queue, always doing the smallest pending unit first.

    The queue has three tiers, checked in this priority order:

    * ``_queued_file`` -- the file object currently being copied into the
      collection (handled by ``_work_file``);
    * ``_queued_dirents`` -- remaining entries of the directory at the head
      of ``_queued_trees`` (handled by ``_work_dirents``); this may include
      files from subdirectories when that directory's max_manifest_depth
      is 0;
    * ``_queued_trees`` -- directories still to be written as separate
      streams (handled by ``_work_trees``).

    Returns once all three tiers are empty.  The ``_work_*`` methods each do
    one unit of work; the ``_queue_*`` methods add work.
    """
    while self._queued_file or self._queued_dirents or self._queued_trees:
        if self._queued_file:
            self._work_file()
        elif self._queued_dirents:
            self._work_dirents()
        else:
            self._work_trees()
def _work_file(self):
    """Copy the queued file into the collection, then release it.

    Reads ``self._queued_file`` in ``KEEP_BLOCK_SIZE`` chunks, hands each
    non-empty chunk to ``self.write``, and calls ``finish_current_file``
    at end of input.  If ``_close_file`` is truthy (the file was opened by
    ``_queue_file`` from a path) the file is closed and the flag reset to
    None.  Afterwards ``_queued_file`` is None.
    """
    source = self._queued_file
    chunk = source.read(self.KEEP_BLOCK_SIZE)
    while chunk:
        self.write(chunk)
        chunk = source.read(self.KEEP_BLOCK_SIZE)
    self.finish_current_file()
    if self._close_file:
        source.close()
        self._close_file = None
    self._queued_file = None
+
def _work_dirents(self):
    """Advance through the entries of the directory at the queue head.

    Entries are popped from ``_queued_dirents`` in order and joined onto the
    path of ``_queued_trees[0]``.  A directory entry is queued as a new tree
    (stream name joined with the entry, depth reduced by one) and scanning
    continues.  The first non-directory entry is queued as a file and this
    call returns, leaving any remaining entries for later.  Once no entries
    are left, the head tree is removed from ``_queued_trees``.
    """
    base_path, base_stream, depth = self._queued_trees[0]
    while self._queued_dirents:
        entry = self._queued_dirents.popleft()
        full_path = os.path.join(base_path, entry)
        if not os.path.isdir(full_path):
            # One file at a time: stop here so the file is handled before
            # any further entries are examined.
            self._queue_file(full_path, entry)
            break
        self._queue_tree(full_path,
                         os.path.join(base_stream, entry),
                         depth - 1)
    if not self._queued_dirents:
        self._queued_trees.popleft()
+
def _work_trees(self):
    """List the directory at the head of ``_queued_trees`` and queue its entries.

    When the head tree's max_manifest_depth is 0 the listing comes from
    ``util.listdir_recursive`` (per the work-queue notes, this may include
    files from subdirectories); otherwise from ``os.listdir``.  A non-empty
    listing is handed to ``_queue_dirents`` together with the tree's stream
    name.  An empty listing means there is nothing to write, so the tree is
    removed from the queue instead.
    """
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    # Conditional expression: util is only looked up when depth is 0.
    make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                    else os.listdir)
    dirents = make_dirents(path)
    if dirents:
        self._queue_dirents(stream_name, dirents)
    else:
        # Without this, an empty directory would stay at the head of the
        # tree queue with no dirents, and the work loop would keep
        # selecting it forever.
        self._queued_trees.popleft()
+
def _queue_file(self, source, filename=None):
    """Make *source* the file that the next unit of work will copy.

    *source* is either an object with a ``read`` method or a path.  A path
    is opened in binary mode and ``_close_file`` is set True so the handle
    is closed after its contents are written; a caller-supplied object is
    never closed here (``_close_file`` False).  When *filename* is None it
    is the basename of ``source.name``.  ``start_new_file(filename)`` is
    called before *source* is stored in ``_queued_file``.

    Raises AssertionError if a file is already queued.  If computing the
    name or ``start_new_file`` raises, a file opened here is closed before
    the exception propagates, and nothing is queued.
    """
    assert (self._queued_file is None), "tried to queue more than one file"
    if not hasattr(source, 'read'):
        source = open(source, 'rb')
        self._close_file = True
    else:
        self._close_file = False
    try:
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
    except BaseException:
        # _queued_file was never set, so nothing else would close a handle
        # opened above; release it before re-raising.
        if self._close_file:
            source.close()
        raise
    self._queued_file = source
+
def _queue_dirents(self, stream_name, dirents):
    """Queue a directory's entries (sorted) and start its stream.

    Raises AssertionError if entries from another directory are still
    pending.  The entries are stored in sorted order in a deque, then
    ``start_new_stream(stream_name)`` is called.
    """
    assert (not self._queued_dirents), "tried to queue more than one tree"
    ordered = sorted(dirents)
    self._queued_dirents = deque(ordered)
    self.start_new_stream(stream_name)
+
def _queue_tree(self, path, stream_name, max_manifest_depth):
    """Append a directory to the back of the pending-tree queue.

    The entry is stored as the tuple (path, stream_name, max_manifest_depth).
    """
    tree = (path, stream_name, max_manifest_depth)
    self._queued_trees.append(tree)
+
def write_file(self, source, filename=None):
    """Add one file to the collection and process all pending work.

    *source* may be a path or an object with a ``read`` method; *filename*
    overrides the stored name.  Both are passed to ``_queue_file``, and the
    work queue is then drained before returning.
    """
    queue_file = self._queue_file
    queue_file(source, filename)
    self._do_queued_work()
+
def write_directory_tree(self, path, stream_name='.',
                         max_manifest_depth=-1):
    """Add the directory tree rooted at *path* and process all pending work.

    The tree is queued as (path, stream_name, max_manifest_depth) and the
    work queue is drained before returning.  Each subdirectory level
    reduces the depth by one, so the default of -1 never reaches 0.
    """
    queue_tree = self._queue_tree
    queue_tree(path, stream_name, max_manifest_depth)
    self._do_queued_work()
def write(self, newdata):
if hasattr(newdata, '__iter__'):