import errors
import util
+_logger = logging.getLogger('arvados.collection')
+
def normalize_stream(s, stream):
stream_tokens = [s]
sortedfiles = list(stream.keys())
class CollectionReader(object):
- def __init__(self, manifest_locator_or_text):
- if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+ def __init__(self, manifest_locator_or_text, api_client=None):
+ self._api_client = api_client
+ self._keep_client = None
+ if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+ self._manifest_locator = manifest_locator_or_text
+ self._manifest_text = None
+ elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
- elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+ elif re.match(r'((\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
self._manifest_text = manifest_locator_or_text
self._manifest_locator = None
else:
pass
def _populate(self):
- if self._streams != None:
+ if self._streams is not None:
return
if not self._manifest_text:
try:
- c = arvados.api('v1').collections().get(
+ # As in KeepClient itself, we must wait until the last possible
+ # moment to instantiate an API client, in order to avoid
+ # tripping up clients that don't have access to an API server.
+ # If we do build one, make sure our Keep client uses it.
+ # If instantiation fails, we'll fall back to the except clause,
+ # just like any other Collection lookup failure.
+ if self._api_client is None:
+ self._api_client = arvados.api('v1')
+ self._keep_client = KeepClient(api_client=self._api_client)
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client)
+ c = self._api_client.collections().get(
uuid=self._manifest_locator).execute()
self._manifest_text = c['manifest_text']
except Exception as e:
- logging.warning("API lookup failed for collection %s (%s: %s)" %
- (self._manifest_locator, type(e), str(e)))
- self._manifest_text = Keep.get(self._manifest_locator)
+ if not util.portable_data_hash_pattern.match(
+ self._manifest_locator):
+ raise
+ _logger.warning("API lookup failed for collection %s (%s: %s)",
+ self._manifest_locator, type(e), str(e))
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client)
+ self._manifest_text = self._keep_client.get(self._manifest_locator)
self._streams = []
for stream_line in self._manifest_text.split("\n"):
if stream_line != '':
class CollectionWriter(object):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self):
+ def __init__(self, api_client=None):
+ self._api_client = api_client
+ self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
self._current_stream_files = []
def __exit__(self):
self.finish()
+ def _prep_keep_client(self):
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client)
+
def do_queued_work(self):
# The work queue consists of three pieces:
# * _queued_file: The file object we're currently writing to the
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
if data_buffer != '':
- self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+ self._prep_keep_client()
+ self._current_stream_locators.append(
+ self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
self._current_file_pos,
self._current_stream_name))
self._current_stream_files += [[self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name]]
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name]]
self._current_file_pos = self._current_stream_length
def start_new_stream(self, newstreamname='.'):
if len(self._current_stream_locators) == 0:
self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
self._finished_streams += [[self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files]]
+ self._current_stream_locators,
+ self._current_stream_files]]
self._current_stream_files = []
self._current_stream_length = 0
self._current_stream_locators = []
self._current_file_name = None
def finish(self):
- # Send the stripped manifest to Keep, to ensure that we use the
- # same UUID regardless of what hints are used on the collection.
- return Keep.put(self.stripped_manifest())
+ # Store the manifest in Keep and return its locator.
+ self._prep_keep_client()
+ return self._keep_client.put(self.manifest_text())
def stripped_manifest(self):
"""
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self):
+ def __init__(self, api_client=None):
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__()
+ super(ResumableCollectionWriter, self).__init__(api_client)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):