-import gflags
+import functools
import logging
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
from collections import deque
from stat import *
+from .arvfile import ArvadosFileBase
from keep import *
-from stream import *
+from .stream import StreamReader, split
import config
import errors
import util
else:
raise errors.ArgumentError(
"Argument to CollectionReader must be a manifest or a collection UUID")
+ self._api_response = None
self._streams = None
def _populate_from_api_server(self):
if self._api_client is None:
self._api_client = arvados.api('v1')
self._keep_client = None # Make a new one with the new api.
- c = self._api_client.collections().get(
+ self._api_response = self._api_client.collections().get(
uuid=self._manifest_locator).execute(
num_retries=self.num_retries)
- self._manifest_text = c['manifest_text']
+ self._manifest_text = self._api_response['manifest_text']
return None
except Exception as e:
return e
return e
def _populate(self):
- if self._streams is not None:
- return
error_via_api = None
error_via_keep = None
should_try_keep = ((self._manifest_text is None) and
for sline in self._manifest_text.split("\n")
if sline]
- def normalize(self):
- self._populate()
+    def _populate_first(orig_func):
+        """Decorator: make sure the Collection is populated before orig_func.
+
+        The returned wrapper calls self._populate() when self._streams is
+        still None, then calls orig_func with the same arguments and returns
+        its result.  functools.wraps keeps orig_func's name and docstring on
+        the wrapper.
+        """
+        # Decorator for methods that read actual Collection data.
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            # _streams is initialised to None, so None is used here as the
+            # "not loaded yet" marker (presumably _populate() fills it in).
+            if self._streams is None:
+                self._populate()
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    @_populate_first
+    def api_response(self):
+        """api_response() -> dict or None
+
+        Returns information about this Collection fetched from the API server.
+        If the Collection exists in Keep but not the API server, currently
+        returns None.  Future versions may provide a synthetic response.
+
+        The value returned is the record that _populate_from_api_server()
+        saved in self._api_response; the _populate_first decorator loads the
+        Collection first when its streams have not been read yet.
+        """
+        return self._api_response
+ @_populate_first
+ def normalize(self):
# Rearrange streams
streams = {}
for s in self.all_streams():
for f in s.all_files():
- filestream = s.name() + "/" + f.name()
- r = filestream.rindex("/")
- streamname = filestream[:r]
- filename = filestream[r+1:]
+ streamname, filename = split(s.name() + "/" + f.name())
if streamname not in streams:
streams[streamname] = {}
if filename not in streams[streamname]:
[StreamReader(stream, keep=self._my_keep()).manifest_text()
for stream in self._streams])
+    @_populate_first
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+
+        Raises ValueError if no stream in the Collection is named streampath,
+        or if the matching stream has no file named filename.
+        """
+        if filename is None:
+            # Single-argument form: separate the stream name from the
+            # file name.
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        # Build a reader for each stream in turn and stop at the first one
+        # whose name matches; `stream` is then used after the loop.
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            # for/else: reached only when the loop ended without a break,
+            # i.e. no stream matched (including an empty Collection).
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            # Report a missing file with the same exception type as a
+            # missing stream.
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
+
+ @_populate_first
def all_streams(self):
- self._populate()
return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
for s in self._streams]
for f in s.all_files():
yield f
+ @_populate_first
def manifest_text(self, strip=False, normalize=False):
if normalize:
cr = CollectionReader(self.manifest_text())
elif strip:
return self.stripped_manifest()
else:
- self._populate()
return self._manifest_text
+class _WriterFile(ArvadosFileBase):
+    """Write-only file-like object returned by CollectionWriter.open().
+
+    Every operation is forwarded to the collection writer passed in as
+    coll_writer; this class holds no data of its own.
+    """
+
+    def __init__(self, coll_writer, name):
+        # The base class is initialised with the file name and mode 'wb'.
+        super(_WriterFile, self).__init__(name, 'wb')
+        # The writer that receives everything written to this file.
+        self.dest = coll_writer
+
+    def close(self):
+        # Run the base-class close() first, then tell the writer that the
+        # current file is complete.
+        # NOTE(review): calling close() twice would call
+        # finish_current_file() twice -- confirm that is harmless.
+        super(_WriterFile, self).close()
+        self.dest.finish_current_file()
+
+    # NOTE(review): _before_close presumably rejects calls made after
+    # close(); confirm against ArvadosFileBase.
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        # Hand the data straight to the collection writer.
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        # Write each item of seq in order through write().
+        for data in seq:
+            self.write(data)
+
+    @ArvadosFileBase._before_close
+    def flush(self):
+        # Ask the collection writer to flush its buffered data.
+        self.dest.flush_data()
+
+
class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, num_retries=0, replication=None):
"""Instantiate a CollectionWriter.
CollectionWriter lets you build a new Arvados Collection from scratch.
service requests. Default 0. You may change this value
after instantiation, but note those changes may not
propagate to related objects like the Keep client.
+ * replication: The number of copies of each block to store.
+ If this argument is None or not supplied, replication
+ defaults to 2.
"""
self._api_client = api_client
self.num_retries = num_retries
+ self.replication = (2 if replication is None else replication)
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
self._queued_file = None
self._queued_dirents = deque()
self._queued_trees = deque()
+ self._last_open = None
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
self.flush_data()
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done. Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+
+        Raises errors.AssertionError if the file object returned by an
+        earlier open() call has not been closed yet.
+        """
+        if filename is None:
+            # Single-argument form: separate the stream name from the
+            # file name.
+            streampath, filename = split(streampath)
+        # Only one file may be open at a time: refuse while the object
+        # handed out by the previous call is still unclosed.
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        # Start a new stream only when the requested stream differs from
+        # the one currently being written.
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        # Remember the file object so the next open() can check that it
+        # was closed.
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
if data_buffer:
self._current_stream_locators.append(
- self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(
+ data_buffer[0:self.KEEP_BLOCK_SIZE],
+ copies=self.replication))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
raise errors.AssertionError(
"Manifest filenames cannot contain whitespace: %s" %
newfilename)
+ elif re.search(r'\x00', newfilename):
+ raise errors.AssertionError(
+ "Manifest filenames cannot contain NUL characters: %s" %
+ newfilename)
self._current_file_name = newfilename
def current_file_name(self):
self._current_file_name = None
def finish(self):
- # Store the manifest in Keep and return its locator.
- return self._my_keep().put(self.manifest_text())
+ """Store the manifest in Keep and return its locator.
+
+ This is useful for storing manifest fragments (task outputs)
+ temporarily in Keep during a Crunch job.
+
+ In other cases you should make a collection instead, by
+ sending manifest_text() to the API server's "create
+ collection" endpoint.
+ """
+ return self._my_keep().put(self.manifest_text(), copies=self.replication)
def portable_data_hash(self):
stripped = self.stripped_manifest()
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None, num_retries=0):
+ def __init__(self, api_client=None, **kwargs):
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(
- api_client, num_retries=num_retries)
+ super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):