-class CollectionWriter(CollectionBase):
- """Deprecated, use Collection instead."""
-
- def __init__(self, api_client=None, num_retries=0, replication=None):
- """Instantiate a CollectionWriter.
-
- CollectionWriter lets you build a new Arvados Collection from scratch.
- Write files to it. The CollectionWriter will upload data to Keep as
- appropriate, and provide you with the Collection manifest text when
- you're finished.
-
- Arguments:
- * api_client: The API client to use to save Collections. If not
- provided, CollectionWriter will build one from available Arvados
- configuration.
- * num_retries: The default number of times to retry failed
- service requests. Default 0. You may change this value
- after instantiation, but note those changes may not
- propagate to related objects like the Keep client.
- * replication: The number of copies of each block to store.
- If this argument is None or not supplied, replication is
- the server-provided default if available, otherwise 2.
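-
- A minimal usage sketch (the file name and contents are
- illustrative; assumes Arvados API and Keep settings are already
- configured)::
-
- cwriter = CollectionWriter()
- with cwriter.open('./hello.txt') as outfile:
- outfile.write('Hello, Keep!')
- manifest = cwriter.manifest_text()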
- """
- self._api_client = api_client
- self.num_retries = num_retries
- self.replication = (2 if replication is None else replication)
- self._keep_client = None
- self._data_buffer = []
- self._data_buffer_len = 0
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = '.'
- self._current_file_name = None
- self._current_file_pos = 0
- self._finished_streams = []
- self._close_file = None
- self._queued_file = None
- self._queued_dirents = deque()
- self._queued_trees = deque()
- self._last_open = None
-
- def __exit__(self, exc_type, exc_value, traceback):
- if exc_type is None:
- self.finish()
-
- def do_queued_work(self):
- # The work queue consists of three pieces:
- # * _queued_file: The file object we're currently writing to the
- # Collection.
- # * _queued_dirents: Entries under the current directory
- # (_queued_trees[0]) that we want to write or recurse through.
- # This may contain files from subdirectories if
- # max_manifest_depth == 0 for this directory.
- # * _queued_trees: Directories that should be written as separate
- # streams to the Collection.
- # This function handles the smallest piece of work currently queued
- # (current file, then current directory, then next directory) until
- # no work remains. The _work_THING methods each do a unit of work on
- # THING. _queue_THING methods add a THING to the work queue.
- while True:
- if self._queued_file:
- self._work_file()
- elif self._queued_dirents:
- self._work_dirents()
- elif self._queued_trees:
- self._work_trees()
- else:
- break
-
- def _work_file(self):
- while True:
- buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
- if not buf:
- break
- self.write(buf)
- self.finish_current_file()
- if self._close_file:
- self._queued_file.close()
- self._close_file = None
- self._queued_file = None
-
- def _work_dirents(self):
- path, stream_name, max_manifest_depth = self._queued_trees[0]
- if stream_name != self.current_stream_name():
- self.start_new_stream(stream_name)
- while self._queued_dirents:
- dirent = self._queued_dirents.popleft()
- target = os.path.join(path, dirent)
- if os.path.isdir(target):
- self._queue_tree(target,
- os.path.join(stream_name, dirent),
- max_manifest_depth - 1)
- else:
- self._queue_file(target, dirent)
- break
- if not self._queued_dirents:
- self._queued_trees.popleft()
-
- def _work_trees(self):
- path, stream_name, max_manifest_depth = self._queued_trees[0]
- d = arvados.util.listdir_recursive(
- path, max_depth=(None if max_manifest_depth == 0 else 0))
- if d:
- self._queue_dirents(stream_name, d)
- else:
- self._queued_trees.popleft()
-
- def _queue_file(self, source, filename=None):
- assert (self._queued_file is None), "tried to queue more than one file"
- if not hasattr(source, 'read'):
- source = open(source, 'rb')
- self._close_file = True
- else:
- self._close_file = False
- if filename is None:
- filename = os.path.basename(source.name)
- self.start_new_file(filename)
- self._queued_file = source
-
- def _queue_dirents(self, stream_name, dirents):
- assert (not self._queued_dirents), "tried to queue more than one tree"
- self._queued_dirents = deque(sorted(dirents))
-
- def _queue_tree(self, path, stream_name, max_manifest_depth):
- self._queued_trees.append((path, stream_name, max_manifest_depth))
-
- def write_file(self, source, filename=None):
- self._queue_file(source, filename)
- self.do_queued_work()
-
- def write_directory_tree(self,
- path, stream_name='.', max_manifest_depth=-1):
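- """Write a local directory tree to the Collection.
-
- A usage sketch (the path is illustrative)::
-
- cwriter.write_directory_tree('/tmp/dataset')
-
- Per the work-queue logic, each directory is written as a separate
- stream; max_manifest_depth == 0 flattens the whole tree into one
- stream, and negative values recurse without limit.
- """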
- self._queue_tree(path, stream_name, max_manifest_depth)
- self.do_queued_work()
-
- def write(self, newdata):
- if isinstance(newdata, bytes):
- pass
- elif isinstance(newdata, str):
- newdata = newdata.encode()
- elif hasattr(newdata, '__iter__'):
- # Flatten any other iterable (e.g., a list of chunks) by
- # writing each item in turn.
- for s in newdata:
- self.write(s)
- return
- self._data_buffer.append(newdata)
- self._data_buffer_len += len(newdata)
- self._current_stream_length += len(newdata)
- while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
- self.flush_data()
-
- def open(self, streampath, filename=None):
- """open(streampath[, filename]) -> file-like object
-
- Pass in the path of a file to write to the Collection, either as a
- single string or as two separate stream name and file name arguments.
- This method returns a writable file-like object; data written to
- it is added to the Collection.
-
- You may only have one file object from the Collection open at a time,
- so be sure to close the object when you're done. Using the object in
- a with statement makes that easy::
-
- with cwriter.open('./doc/page1.txt') as outfile:
- outfile.write(page1_data)
- with cwriter.open('./doc/page2.txt') as outfile:
- outfile.write(page2_data)
- """
- if filename is None:
- streampath, filename = split(streampath)
- if self._last_open and not self._last_open.closed:
- raise errors.AssertionError(
- u"can't open '{}' when '{}' is still open".format(
- filename, self._last_open.name))
- if streampath != self.current_stream_name():
- self.start_new_stream(streampath)
- self.set_current_file_name(filename)
- self._last_open = _WriterFile(self, filename)
- return self._last_open
-
- def flush_data(self):
- data_buffer = b''.join(self._data_buffer)
- if data_buffer:
- self._current_stream_locators.append(
- self._my_keep().put(
- data_buffer[0:config.KEEP_BLOCK_SIZE],
- copies=self.replication))
- self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
- self._data_buffer_len = len(self._data_buffer[0])
-
- def start_new_file(self, newfilename=None):
- self.finish_current_file()
- self.set_current_file_name(newfilename)
-
- def set_current_file_name(self, newfilename):
- if re.search(r'[\t\n]', newfilename):
- raise errors.AssertionError(
- "Manifest filenames cannot contain tab or newline characters: %s" %
- newfilename)
- elif re.search(r'\x00', newfilename):
- raise errors.AssertionError(
- "Manifest filenames cannot contain NUL characters: %s" %
- newfilename)
- self._current_file_name = newfilename
-
- def current_file_name(self):
- return self._current_file_name
-
- def finish_current_file(self):
- if self._current_file_name is None:
- if self._current_file_pos == self._current_stream_length:
- return
- raise errors.AssertionError(
- "Cannot finish an unnamed file " +
- "(%d bytes at offset %d in '%s' stream)" %
- (self._current_stream_length - self._current_file_pos,
- self._current_file_pos,
- self._current_stream_name))
- self._current_stream_files.append([
- self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name])
- self._current_file_pos = self._current_stream_length
- self._current_file_name = None
-
- def start_new_stream(self, newstreamname='.'):
- self.finish_current_stream()
- self.set_current_stream_name(newstreamname)
-
- def set_current_stream_name(self, newstreamname):
- if re.search(r'[\t\n]', newstreamname):
- raise errors.AssertionError(
- "Manifest stream names cannot contain tab or newline characters: '%s'" %
- (newstreamname))
- self._current_stream_name = '.' if newstreamname == '' else newstreamname
-
- def current_stream_name(self):
- return self._current_stream_name
-
- def finish_current_stream(self):
- self.finish_current_file()
- self.flush_data()
- if not self._current_stream_files:
- pass
- elif self._current_stream_name is None:
- raise errors.AssertionError(
- "Cannot finish an unnamed stream (%d bytes in %d files)" %
- (self._current_stream_length, len(self._current_stream_files)))
- else:
- if not self._current_stream_locators:
- self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
- self._finished_streams.append([self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files])
- self._current_stream_files = []
- self._current_stream_length = 0
- self._current_stream_locators = []
- self._current_stream_name = None
- self._current_file_pos = 0
- self._current_file_name = None
-
- def finish(self):
- """Store the manifest in Keep and return its locator.
-
- This is useful for storing manifest fragments (task outputs)
- temporarily in Keep during a Crunch job.
-
- In other cases you should make a collection instead, by
- sending manifest_text() to the API server's "create
- collection" endpoint.
- """
- return self._my_keep().put(self.manifest_text().encode(),
- copies=self.replication)
-
- def portable_data_hash(self):
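- """Return the portable data hash of the current manifest.
-
- This is the MD5 hex digest of the stripped manifest text, plus '+'
- and the manifest's length in bytes; for example, an empty manifest
- hashes to 'd41d8cd98f00b204e9800998ecf8427e+0'.
- """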
- stripped = self.stripped_manifest().encode()
- return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
-
- def manifest_text(self):
- self.finish_current_stream()
- manifest = ''
-
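- # Each stream renders as one manifest line: the stream name, its
- # block locators, then pos:size:name tokens for each file, e.g.
- # (using the empty-block locator for illustration):
- # ./docs d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt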
- for stream in self._finished_streams:
- if not re.search(r'^\.(/.*)?$', stream[0]):
- manifest += './'
- manifest += stream[0].replace(' ', '\\040')
- manifest += ' ' + ' '.join(stream[1])
- manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
- manifest += "\n"
-
- return manifest
-
- def data_locators(self):
- ret = []
- for name, locators, files in self._finished_streams:
- ret += locators
- return ret
-
- def save_new(self, name=None):
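- """Create and return a new collection record from this manifest.
-
- A sketch (the name is illustrative)::
-
- record = cwriter.save_new(name='processed results')
- """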
- return self._api_client.collections().create(
- ensure_unique_name=True,
- body={
- 'name': name,
- 'manifest_text': self.manifest_text(),
- }).execute(num_retries=self.num_retries)
-
-
-class ResumableCollectionWriter(CollectionWriter):
- """Deprecated, use Collection instead."""
-
- STATE_PROPS = ['_current_stream_files', '_current_stream_length',
- '_current_stream_locators', '_current_stream_name',
- '_current_file_name', '_current_file_pos', '_close_file',
- '_data_buffer', '_dependencies', '_finished_streams',
- '_queued_dirents', '_queued_trees']
-
- def __init__(self, api_client=None, **kwargs):
- self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
-
- @classmethod
- def from_state(cls, state, *init_args, **init_kwargs):
- # Try to build a new writer from scratch with the given state.
- # If the state is not suitable to resume (because files have changed,
- # been deleted, aren't predictable, etc.), raise a
- # StaleWriterStateError. Otherwise, return the initialized writer.
- # The caller is responsible for calling writer.do_queued_work()
- # appropriately after it's returned.
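- #
- # A checkpoint/resume sketch (names are illustrative; a deep copy
- # keeps the dumped state independent of the live writer):
- #
- # state = writer.dump_state(copy.deepcopy)
- # ...the process restarts...
- # writer = ResumableCollectionWriter.from_state(state)
- # writer.do_queued_work()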
- writer = cls(*init_args, **init_kwargs)
- for attr_name in cls.STATE_PROPS:
- attr_value = state[attr_name]
- attr_class = getattr(writer, attr_name).__class__
- # Coerce the value into the same type as the initial value, if
- # needed.
- if attr_class not in (type(None), attr_value.__class__):
- attr_value = attr_class(attr_value)
- setattr(writer, attr_name, attr_value)
- # Check dependencies before we try to resume anything.
- if any(KeepLocator(ls).permission_expired()
- for ls in writer._current_stream_locators):
- raise errors.StaleWriterStateError(
- "locators include expired permission hint")
- writer.check_dependencies()
- if state['_current_file'] is not None:
- path, pos = state['_current_file']
- try:
- writer._queued_file = open(path, 'rb')
- writer._queued_file.seek(pos)
- except IOError as error:
- raise errors.StaleWriterStateError(
- u"failed to reopen active file {}: {}".format(path, error))
- return writer
-
- def check_dependencies(self):
- for path, orig_stat in listitems(self._dependencies):
- if not S_ISREG(orig_stat[ST_MODE]):
- raise errors.StaleWriterStateError(u"{} not file".format(path))
- try:
- now_stat = tuple(os.stat(path))
- except OSError as error:
- raise errors.StaleWriterStateError(
- u"failed to stat {}: {}".format(path, error))
- if ((not S_ISREG(now_stat[ST_MODE])) or
- (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
- (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
- raise errors.StaleWriterStateError(u"{} changed".format(path))
-
- def dump_state(self, copy_func=lambda x: x):
- state = {attr: copy_func(getattr(self, attr))
- for attr in self.STATE_PROPS}
- if self._queued_file is None:
- state['_current_file'] = None
- else:
- state['_current_file'] = (os.path.realpath(self._queued_file.name),
- self._queued_file.tell())
- return state
-
- def _queue_file(self, source, filename=None):
- try:
- src_path = os.path.realpath(source)
- except Exception:
- raise errors.AssertionError(u"{} not a file path".format(source))
- stat_error = None
- try:
- path_stat = os.stat(src_path)
- except OSError as error:
- # Remember the error for later reporting: in Python 3 the name
- # bound by `except ... as` is unbound when the block ends, so it
- # can't be referenced below without saving it first.
- path_stat = None
- stat_error = error
- super(ResumableCollectionWriter, self)._queue_file(source, filename)
- fd_stat = os.fstat(self._queued_file.fileno())
- if not S_ISREG(fd_stat.st_mode):
- # We won't be able to resume from this cache anyway, so don't
- # worry about further checks.
- self._dependencies[source] = tuple(fd_stat)
- elif path_stat is None:
- raise errors.AssertionError(
- u"could not stat {}: {}".format(source, stat_error))
- elif path_stat.st_ino != fd_stat.st_ino:
- raise errors.AssertionError(
- u"{} changed between open and stat calls".format(source))
- else:
- self._dependencies[src_path] = tuple(fd_stat)
-
- def write(self, data):
- if self._queued_file is None:
- raise errors.AssertionError(
- "resumable writer can't accept unsourced data")
- return super(ResumableCollectionWriter, self).write(data)
-
-
-ADD = "add"
-DEL = "del"
-MOD = "mod"
-TOK = "tok"
-FILE = "file"
-COLLECTION = "collection"
-