+
+
class CollectionWriter(CollectionBase):
    """Create a new collection from scratch

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  It is
          stored as given; `save_new()` uses it directly.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, this class uses 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data written but not yet uploaded, as a list of bytes chunks, and
        # the total length of those chunks.
        self._data_buffer = []
        self._data_buffer_len = 0
        # Bookkeeping for the stream currently being built.  Each entry in
        # _current_stream_files is [offset, length, filename].
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # Completed streams, each as [name, locators, files].
        self._finished_streams = []
        # Work queue state; see do_queued_work().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # Most recent file object returned by open().
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        """Call finish() when the `with` block ends without an exception.

        Returns None, so an exception raised inside the block propagates.
        """
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process the work queue until nothing is left in it."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Copy the queued file into the collection, then dequeue it.

        The file is deliberately left open and queued if a write fails, so
        the caller can still inspect it (e.g. its current position).
        """
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close files that _queue_file() opened itself.
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Handle queued directory entries up to and including the next file.

        Subdirectories are queued as new trees; the first regular entry is
        queued as the current file and ends this unit of work.  When no
        entries remain, the current tree is removed from the queue.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the first queued tree and queue its entries.

        A tree with no entries is simply dropped from the queue.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_depth=None is requested once max_manifest_depth reaches 0;
        # presumably that makes listdir_recursive descend without limit --
        # verify against arvados.util.
        listing = arvados.util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if listing:
            self._queue_dirents(stream_name, listing)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Make `source` the file that _work_file() will copy next.

        Arguments:
        * source: Either an object with a `read` method, or a path that is
          opened here in binary mode (and closed again by _work_file()).
        * filename: Name to record in the manifest.  Defaults to the
          basename of `source.name`.
        """
        assert (self._queued_file is None), "tried to queue more than one file"
        opened_here = not hasattr(source, 'read')
        if opened_here:
            source = open(source, 'rb')
        try:
            if filename is None:
                filename = os.path.basename(source.name)
            self.start_new_file(filename)
        except Exception:
            # Don't leak a file handle we opened ourselves when the name is
            # rejected or the previous file can't be finished.
            if opened_here:
                source.close()
            raise
        self._close_file = opened_here
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue the sorted entries of the current tree.

        `stream_name` is accepted for interface compatibility but unused.
        """
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Append a directory to the queue of trees to write."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a readable object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the directory at `path` to the collection.

        Arguments:
        * path: Directory to read.
        * stream_name: Name of the stream that receives the directory's
          files.
        * max_manifest_depth: Decremented for each level of subdirectory.
          A directory reached with the value 0 is listed with
          max_depth=None instead of being split into one stream per
          subdirectory.  The default -1 never reaches 0.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        Arguments:
        * newdata: bytes, bytearray, memoryview, str (encoded with the
          default codec), or an iterable of any of these.

        Raises TypeError, before touching the buffer, for any other type.
        """
        if isinstance(newdata, (bytearray, memoryview)):
            # These iterate as ints, so they must not reach the iterable
            # branch below.
            newdata = bytes(newdata)
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif not isinstance(newdata, bytes):
            if not hasattr(newdata, '__iter__'):
                raise TypeError(
                    "CollectionWriter.write() cannot accept data of type %s" %
                    type(newdata).__name__)
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                u"can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        """Upload at most one block of buffered data to Keep.

        The first KEEP_BLOCK_SIZE bytes of the buffer are stored and the
        returned locator is added to the current stream; anything beyond
        that stays buffered.  Does nothing when the buffer is empty.
        """
        data_buffer = b''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and set the name of the next one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the name of the file currently being written.

        None means "no name yet" and is stored without validation, so
        that start_new_file() works with its default argument.  Raises
        errors.AssertionError if the name contains a tab, newline or NUL.
        """
        if newfilename is not None:
            if re.search(r'[\t\n]', newfilename):
                raise errors.AssertionError(
                    "Manifest filenames cannot contain whitespace: %s" %
                    newfilename)
            elif re.search(r'\x00', newfilename):
                raise errors.AssertionError(
                    "Manifest filenames cannot contain NUL characters: %s" %
                    newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the data written since the last file boundary as a file.

        With no current file name this is a no-op if nothing was written
        since the last boundary; otherwise errors.AssertionError is raised.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            # `%` binds tighter than `+`, so the tuple formats only the
            # second string literal, which holds all three placeholders.
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one with the given name."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream's name; an empty name becomes '.'.

        Raises errors.AssertionError if the name contains a tab or newline.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
                (newstreamname))
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the current stream's name (None right after a stream is
        finished)."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Close out the current stream and reset all per-stream state.

        The current file is finished and buffered data is flushed first.
        A stream with no files is discarded.  Raises errors.AssertionError
        if the stream has files but no name.
        """
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A stream whose files are all empty has no uploaded blocks, so
            # give it the empty block's locator.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)

    def portable_data_hash(self):
        """Return '<md5 hex digest>+<length>' of the encoded stripped
        manifest."""
        stripped = self.stripped_manifest().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the manifest text.

        Each finished stream becomes one line: the stream name (prefixed
        with './' unless it is '.' or starts with './'), its block
        locators, then one 'offset:length:name' token per file.  Spaces in
        stream and file names are written as '\\040'.
        """
        self.finish_current_stream()
        lines = []
        for stream_name, locators, files in self._finished_streams:
            prefix = '' if re.search(r'^\.(/.*)?$', stream_name) else './'
            file_tokens = ' '.join(
                "%d:%d:%s" % (pos, size, fname.replace(' ', '\\040'))
                for pos, size, fname in files)
            lines.append(' '.join([
                prefix + stream_name.replace(' ', '\\040'),
                ' '.join(locators),
                file_tokens]) + "\n")
        return ''.join(lines)

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for _name, locators, _files in self._finished_streams:
            ret.extend(locators)
        return ret

    def save_new(self, name=None):
        """Create a collection record from the manifest and return the API
        response.

        NOTE(review): this uses the api_client passed to the constructor
        directly, so it fails if none was supplied -- confirm whether a
        lazily-built client should be used instead.
        """
        return self._api_client.collections().create(
            ensure_unique_name=True,
            body={
                'name': name,
                'manifest_text': self.manifest_text(),
            }).execute(num_retries=self.num_retries)
+
+
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can serialize internal state to disk

    .. WARNING:: Deprecated
       This class is deprecated. Prefer `arvados.collection.Collection`
       instead.
    """

    # Attributes copied by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    @arvados.util._deprecated('3.0', 'arvados.collection.Collection')
    def __init__(self, api_client=None, **kwargs):
        """Instantiate a ResumableCollectionWriter.

        Arguments are passed through to CollectionWriter.
        """
        # Maps each source path to the stat tuple seen when it was queued.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a dict produced by dump_state().

        Arguments:
        * state: Mapping with one entry per STATE_PROPS name plus
          '_current_file' (None, or a (path, position) pair).
        * init_args, init_kwargs: Passed to the class constructor.

        Raises errors.StaleWriterStateError if the state can't be resumed.
        """
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # _data_buffer_len is not part of the saved state; recompute it so
        # write() sees the restored data when deciding whether to flush.
        writer._data_buffer_len = sum(
            len(chunk) for chunk in writer._data_buffer)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                reopened = open(path, 'rb')
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
            try:
                reopened.seek(pos)
            except IOError as error:
                # Don't leak the handle when the saved position is unusable.
                reopened.close()
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))
            writer._queued_file = reopened
        return writer

    def check_dependencies(self):
        """Verify that every recorded source file is unchanged.

        Raises errors.StaleWriterStateError if a dependency was not a
        regular file when recorded, can't be stat'ed now, is no longer a
        regular file, or has a different modification time or size.
        """
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict describing the writer's current state.

        Arguments:
        * copy_func: Applied to each STATE_PROPS value before it is stored.
          The default returns the value itself, so the result shares
          mutable objects with the writer.

        The '_current_file' entry is None, or the real path and current
        position of the file being written.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file by path and record it as a dependency.

        Raises errors.AssertionError if `source` is not usable as a path,
        if a regular file could not be stat'ed by path, or if the path and
        the opened file refer to different inodes.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(u"{} not a file path".format(source))
        # Keep the error in a name of our own: the `as` target of an except
        # clause is unbound again as soon as the clause ends.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Append data that is being read from the queued file.

        Raises errors.AssertionError when no file is queued, because data
        without a source file can't be replayed on resume.
        """
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)