-import gflags
-import httplib
-import httplib2
import logging
import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
from collections import deque
from stat import *
fout = f.replace(' ', '\\040')
for segment in stream[f]:
segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
- if current_span == None:
+ if current_span is None:
current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
else:
if segmentoffset == current_span[1]:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
- if current_span != None:
+ if current_span is not None:
stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
- if len(stream[f]) == 0:
+ if not stream[f]:
stream_tokens.append("0:0:{0}".format(fout))
return stream_tokens
-def normalize(collection):
- streams = {}
- for s in collection.all_streams():
- for f in s.all_files():
- filestream = s.name() + "/" + f.name()
- r = filestream.rindex("/")
- streamname = filestream[:r]
- filename = filestream[r+1:]
- if streamname not in streams:
- streams[streamname] = {}
- if filename not in streams[streamname]:
- streams[streamname][filename] = []
- for r in f.segments:
- streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
-
- normalized_streams = []
- sortedstreams = list(streams.keys())
- sortedstreams.sort()
- for s in sortedstreams:
- normalized_streams.append(normalize_stream(s, streams[s]))
- return normalized_streams
-
-
-class CollectionReader(object):
- def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
+
+class CollectionBase(object):
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ pass
+
+ def _my_keep(self):
+ if self._keep_client is None:
+ self._keep_client = KeepClient(api_client=self._api_client,
+ num_retries=self.num_retries)
+ return self._keep_client
+
+ def stripped_manifest(self):
+ """
+ Return the manifest for the current collection with all
+ non-portable hints (i.e., permission signatures and other
+ hints other than size hints) removed from the locators.
+ """
+ raw = self.manifest_text()
+ clean = []
+ for line in raw.split("\n"):
+ fields = line.split()
+ if fields:
+ clean_fields = fields[:1] + [
+ (re.sub(r'\+[^\d][^\+]*', '', x)
+ if re.match(util.keep_locator_pattern, x)
+ else x)
+ for x in fields[1:]]
+ clean += [' '.join(clean_fields), "\n"]
+ return ''.join(clean)
+
+
+class CollectionReader(CollectionBase):
+ def __init__(self, manifest_locator_or_text, api_client=None,
+ keep_client=None, num_retries=0):
+ """Instantiate a CollectionReader.
+
+ This class parses Collection manifests to provide a simple interface
+ to read its underlying files.
+
+ Arguments:
+ * manifest_locator_or_text: One of a Collection UUID, portable data
+ hash, or full manifest text.
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionReader will build one from available Arvados
+ configuration.
+ * keep_client: The KeepClient to use to download Collection data.
+ If not provided, CollectionReader will build one from available
+ Arvados configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ """
self._api_client = api_client
self._keep_client = keep_client
- if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+ self.num_retries = num_retries
+ if re.match(util.keep_locator_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
- elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
+ elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
self._manifest_locator = manifest_locator_or_text
self._manifest_text = None
- elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
+ elif re.match(util.manifest_pattern, manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
self._manifest_locator = None
else:
"Argument to CollectionReader must be a manifest or a collection UUID")
self._streams = None
- def __enter__(self):
- pass
-
- def __exit__(self):
- pass
+ def _populate_from_api_server(self):
+ # As in KeepClient itself, we must wait until the last
+ # possible moment to instantiate an API client, in order to
+ # avoid tripping up clients that don't have access to an API
+ # server. If we do build one, make sure our Keep client uses
+ # it. If instantiation fails, we'll fall back to the except
+ # clause, just like any other Collection lookup
+ # failure. Return an exception, or None if successful.
+ try:
+ if self._api_client is None:
+ self._api_client = arvados.api('v1')
+ self._keep_client = None # Make a new one with the new api.
+ c = self._api_client.collections().get(
+ uuid=self._manifest_locator).execute(
+ num_retries=self.num_retries)
+ self._manifest_text = c['manifest_text']
+ return None
+ except Exception as e:
+ return e
+
+ def _populate_from_keep(self):
+ # Retrieve a manifest directly from Keep. This has a chance of
+ # working if [a] the locator includes a permission signature
+ # or [b] the Keep services are operating in world-readable
+ # mode. Return an exception, or None if successful.
+ try:
+ self._manifest_text = self._my_keep().get(
+ self._manifest_locator, num_retries=self.num_retries)
+ except Exception as e:
+ return e
def _populate(self):
if self._streams is not None:
return
- if not self._manifest_text:
- try:
- # As in KeepClient itself, we must wait until the last possible
- # moment to instantiate an API client, in order to avoid
- # tripping up clients that don't have access to an API server.
- # If we do build one, make sure our Keep client uses it.
- # If instantiation fails, we'll fall back to the except clause,
- # just like any other Collection lookup failure.
- if self._api_client is None:
- self._api_client = arvados.api('v1')
- self._keep_client = KeepClient(api_client=self._api_client)
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
- c = self._api_client.collections().get(
- uuid=self._manifest_locator).execute()
- self._manifest_text = c['manifest_text']
- except Exception as e:
- if not util.portable_data_hash_pattern.match(
- self._manifest_locator):
- raise
- _logger.warning("API lookup failed for collection %s (%s: %s)",
- self._manifest_locator, type(e), str(e))
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
- self._manifest_text = self._keep_client.get(self._manifest_locator)
- self._streams = []
- for stream_line in self._manifest_text.split("\n"):
- if stream_line != '':
- stream_tokens = stream_line.split()
- self._streams += [stream_tokens]
- self._streams = normalize(self)
-
- # now regenerate the manifest text based on the normalized stream
-
- #print "normalizing", self._manifest_text
- self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
- #print "result", self._manifest_text
+ error_via_api = None
+ error_via_keep = None
+ should_try_keep = ((self._manifest_text is None) and
+ util.keep_locator_pattern.match(
+ self._manifest_locator))
+ if ((self._manifest_text is None) and
+ util.signed_locator_pattern.match(self._manifest_locator)):
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ error_via_api = self._populate_from_api_server()
+ if error_via_api is not None and not should_try_keep:
+ raise error_via_api
+ if ((self._manifest_text is None) and
+ not error_via_keep and
+ should_try_keep):
+ # Looks like a keep locator, and we didn't already try keep above
+ error_via_keep = self._populate_from_keep()
+ if self._manifest_text is None:
+ # Nothing worked!
+ raise arvados.errors.NotFoundError(
+ ("Failed to retrieve collection '{}' " +
+ "from either API server ({}) or Keep ({})."
+ ).format(
+ self._manifest_locator,
+ error_via_api,
+ error_via_keep))
+ self._streams = [sline.split()
+ for sline in self._manifest_text.split("\n")
+ if sline]
+
+ def normalize(self):
+ self._populate()
+ # Rearrange streams
+ streams = {}
+ for s in self.all_streams():
+ for f in s.all_files():
+ filestream = s.name() + "/" + f.name()
+ r = filestream.rindex("/")
+ streamname = filestream[:r]
+ filename = filestream[r+1:]
+ if streamname not in streams:
+ streams[streamname] = {}
+ if filename not in streams[streamname]:
+ streams[streamname][filename] = []
+ for r in f.segments:
+ streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
+
+ self._streams = [normalize_stream(s, streams[s])
+ for s in sorted(streams)]
+
+ # Regenerate the manifest text based on the normalized streams
+ self._manifest_text = ''.join(
+ [StreamReader(stream, keep=self._my_keep()).manifest_text()
+ for stream in self._streams])
def all_streams(self):
self._populate()
- resp = []
- for s in self._streams:
- resp.append(StreamReader(s, keep=self._keep_client))
- return resp
+ return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+ for s in self._streams]
def all_files(self):
for s in self.all_streams():
for f in s.all_files():
yield f
- def manifest_text(self, strip=False):
- self._populate()
- if strip:
- m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
- return m
+ def manifest_text(self, strip=False, normalize=False):
+ if normalize:
+ cr = CollectionReader(self.manifest_text())
+ cr.normalize()
+ return cr.manifest_text(strip=strip, normalize=False)
+ elif strip:
+ return self.stripped_manifest()
else:
+ self._populate()
return self._manifest_text
-class CollectionWriter(object):
+
+class CollectionWriter(CollectionBase):
KEEP_BLOCK_SIZE = 2**26
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
+ """Instantiate a CollectionWriter.
+
+ CollectionWriter lets you build a new Arvados Collection from scratch.
+ Write files to it. The CollectionWriter will upload data to Keep as
+ appropriate, and provide you with the Collection manifest text when
+ you're finished.
+
+ Arguments:
+ * api_client: The API client to use to look up Collections. If not
+ provided, CollectionReader will build one from available Arvados
+ configuration.
+ * num_retries: The default number of times to retry failed
+ service requests. Default 0. You may change this value
+ after instantiation, but note those changes may not
+ propagate to related objects like the Keep client.
+ """
self._api_client = api_client
+ self.num_retries = num_retries
self._keep_client = None
self._data_buffer = []
self._data_buffer_len = 0
self._queued_dirents = deque()
self._queued_trees = deque()
- def __enter__(self):
- pass
-
- def __exit__(self):
- self.finish()
-
- def _prep_keep_client(self):
- if self._keep_client is None:
- self._keep_client = KeepClient(api_client=self._api_client)
+ def __exit__(self, exc_type, exc_value, traceback):
+ if exc_type is None:
+ self.finish()
def do_queued_work(self):
# The work queue consists of three pieces:
def _work_trees(self):
path, stream_name, max_manifest_depth = self._queued_trees[0]
- make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
- else os.listdir)
- d = make_dirents(path)
- if len(d) > 0:
+ d = util.listdir_recursive(
+ path, max_depth = (None if max_manifest_depth == 0 else 0))
+ if d:
self._queue_dirents(stream_name, d)
else:
self._queued_trees.popleft()
for s in newdata:
self.write(s)
return
- self._data_buffer += [newdata]
+ self._data_buffer.append(newdata)
self._data_buffer_len += len(newdata)
self._current_stream_length += len(newdata)
while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
def flush_data(self):
data_buffer = ''.join(self._data_buffer)
- if data_buffer != '':
- self._prep_keep_client()
+ if data_buffer:
self._current_stream_locators.append(
- self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+ self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
self._data_buffer_len = len(self._data_buffer[0])
return self._current_file_name
def finish_current_file(self):
- if self._current_file_name == None:
+ if self._current_file_name is None:
if self._current_file_pos == self._current_stream_length:
return
raise errors.AssertionError(
(self._current_stream_length - self._current_file_pos,
self._current_file_pos,
self._current_stream_name))
- self._current_stream_files += [[self._current_file_pos,
- self._current_stream_length - self._current_file_pos,
- self._current_file_name]]
+ self._current_stream_files.append([
+ self._current_file_pos,
+ self._current_stream_length - self._current_file_pos,
+ self._current_file_name])
self._current_file_pos = self._current_stream_length
+ self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
self.finish_current_stream()
def finish_current_stream(self):
self.finish_current_file()
self.flush_data()
- if len(self._current_stream_files) == 0:
+ if not self._current_stream_files:
pass
- elif self._current_stream_name == None:
+ elif self._current_stream_name is None:
raise errors.AssertionError(
"Cannot finish an unnamed stream (%d bytes in %d files)" %
(self._current_stream_length, len(self._current_stream_files)))
else:
- if len(self._current_stream_locators) == 0:
- self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
- self._finished_streams += [[self._current_stream_name,
- self._current_stream_locators,
- self._current_stream_files]]
+ if not self._current_stream_locators:
+ self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+ self._finished_streams.append([self._current_stream_name,
+ self._current_stream_locators,
+ self._current_stream_files])
self._current_stream_files = []
self._current_stream_length = 0
self._current_stream_locators = []
def finish(self):
# Store the manifest in Keep and return its locator.
- self._prep_keep_client()
- return self._keep_client.put(self.manifest_text())
+ return self._my_keep().put(self.manifest_text())
- def stripped_manifest(self):
- """
- Return the manifest for the current collection with all permission
- hints removed from the locators in the manifest.
- """
- raw = self.manifest_text()
- clean = ''
- for line in raw.split("\n"):
- fields = line.split()
- if len(fields) > 0:
- locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
- for x in fields[1:-1] ]
- clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
- return clean
+ def portable_data_hash(self):
+ stripped = self.stripped_manifest()
+ return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
def manifest_text(self):
self.finish_current_stream()
manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
- if len(manifest) > 0:
- return CollectionReader(manifest).manifest_text()
- else:
- return ""
+ return manifest
def data_locators(self):
ret = []
'_data_buffer', '_dependencies', '_finished_streams',
'_queued_dirents', '_queued_trees']
- def __init__(self, api_client=None):
+ def __init__(self, api_client=None, num_retries=0):
self._dependencies = {}
- super(ResumableCollectionWriter, self).__init__(api_client)
+ super(ResumableCollectionWriter, self).__init__(
+ api_client, num_retries=num_retries)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):