import errors
import util
+def normalize_stream(s, stream):
+    """Build the token list for one normalized manifest stream.
+
+    s is the stream name; it becomes the first token unchanged.
+    stream maps each file name to a list of segment records, each indexed
+    by arvados.LOCATOR, arvados.BLOCKSIZE, arvados.OFFSET and
+    arvados.SEGMENTSIZE.
+
+    Returns a list: the stream name, then every distinct block locator
+    once (in order of first use, visiting files in sorted name order),
+    then the "position:size:name" tokens of each file in the same order.
+    """
+    stream_tokens = [s]
+    sortedfiles = sorted(stream)
+
+    # Map each distinct locator to the offset at which its block starts
+    # in the concatenation of all blocks of this stream.
+    blocks = {}
+    # Plain 0 rather than the Python-2-only literal 0L: the formatted
+    # output is the same and the code also parses under Python 3.
+    streamoffset = 0
+    for f in sortedfiles:
+        for b in stream[f]:
+            locator = b[arvados.LOCATOR]
+            if locator not in blocks:
+                stream_tokens.append(locator)
+                blocks[locator] = streamoffset
+                streamoffset += b[arvados.BLOCKSIZE]
+
+    for f in sortedfiles:
+        current_span = None
+        # A space would split the token, so it is written as \040.
+        fout = f.replace(' ', '\\040')
+        for segment in stream[f]:
+            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
+            segmentend = segmentoffset + segment[arvados.SEGMENTSIZE]
+            if current_span is None:
+                current_span = [segmentoffset, segmentend]
+            elif segmentoffset == current_span[1]:
+                # Starts exactly where the open span ends: merge them.
+                current_span[1] = segmentend
+            else:
+                # Gap or jump backwards: emit the open span, start anew.
+                stream_tokens.append("{0}:{1}:{2}".format(
+                    current_span[0], current_span[1] - current_span[0], fout))
+                current_span = [segmentoffset, segmentend]
+
+        if current_span is not None:
+            stream_tokens.append("{0}:{1}:{2}".format(
+                current_span[0], current_span[1] - current_span[0], fout))
+
+        # A file without segments still needs one token to be listed.
+        if not stream[f]:
+            stream_tokens.append("0:0:{0}".format(fout))
+
+    return stream_tokens
+
+def normalize(collection):
+    """Regroup the files of a collection by stream and normalize each group.
+
+    Every file is keyed by the full path "<stream name>/<file name>",
+    split at its last "/", so a file name that itself contains "/" ends
+    up in the stream named by its directory part.
+
+    Returns one normalize_stream() token list per stream, ordered by
+    stream name.
+    """
+    streams = {}
+    for reader in collection.all_streams():
+        for f in reader.all_files():
+            fullpath = reader.name() + "/" + f.name()
+            streamname, _, filename = fullpath.rpartition("/")
+            # Create the per-stream and per-file entries on first sight.
+            ranges = streams.setdefault(streamname, {}).setdefault(filename, [])
+            for seg in f.segments:
+                ranges.extend(reader.locators_and_ranges(seg[0], seg[1]))
+
+    return [normalize_stream(name, streams[name]) for name in sorted(streams)]
+
+
class CollectionReader(object):
def __init__(self, manifest_locator_or_text):
+ # Accepts either a locator or literal manifest text; anything else raises.
+ # Locator form: 32 lowercase hex digits, then optional "+<digits>" and
+ # any number of further "+<hint>" suffixes. The manifest text is left
+ # unset in that case.
+ # NOTE(review): the error message below says "collection UUID" while the
+ # pattern only accepts the 32-hex-digit form - confirm that is intended.
- if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+ if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+ self._manifest_locator = manifest_locator_or_text
+ self._manifest_text = None
+ # Manifest form: a stream name, zero or more block locators, one or more
+ # "position:size:name" tokens, then a newline.
+ elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
self._manifest_text = manifest_locator_or_text
self._manifest_locator = None
else:
- self._manifest_locator = manifest_locator_or_text
- self._manifest_text = None
+ # Unrecognized input is rejected instead of being treated as a locator.
+ raise errors.ArgumentError(
+ "Argument to CollectionReader must be a manifest or a collection UUID")
self._streams = None
def __enter__(self):
+ # NOTE(review): all_streams() calls self._populate(), which is not defined
+ # in the visible part of this patch - confirm that the statements below
+ # belong to __enter__ and that no hunk boundary was lost here.
if self._streams != None:
return
if not self._manifest_text:
- self._manifest_text = Keep.get(self._manifest_locator)
+ # Ask the API server for the collection record first; on any exception,
+ # log a warning and fall back to reading the manifest from Keep.
+ try:
+ c = arvados.api('v1').collections().get(
+ uuid=self._manifest_locator).execute()
+ self._manifest_text = c['manifest_text']
+ except Exception as e:
+ logging.warning("API lookup failed for collection %s (%s: %s)" %
+ (self._manifest_locator, type(e), str(e)))
+ self._manifest_text = Keep.get(self._manifest_locator)
self._streams = []
for stream_line in self._manifest_text.split("\n"):
if stream_line != '':
stream_tokens = stream_line.split()
self._streams += [stream_tokens]
+ # normalize() reads the raw token lists collected above back through
+ # all_streams(); its result, one token list per stream, replaces them.
+ self._streams = normalize(self)
+
+ # now regenerate the manifest text based on the normalized stream
+
+ #print "normalizing", self._manifest_text
+ self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+ #print "result", self._manifest_text
+
def all_streams(self):
+ # Return a new list holding one StreamReader per token list in
+ # self._streams. self._populate() runs first; it is defined outside the
+ # visible part of this patch.
self._populate()
resp = []
for s in self._streams:
- resp += [StreamReader(s)]
+ resp.append(StreamReader(s))
return resp
def all_files(self):
def manifest_text(self):
+ """Return the manifest text of all finished streams, normalized by CollectionReader."""
self.finish_current_stream()
manifest = ''
+
for stream in self._finished_streams:
if not re.search(r'^\.(/.*)?$', stream[0]):
manifest += './'
manifest += stream[0].replace(' ', '\\040')
- for locator in stream[1]:
- manifest += " %s" % locator
- for sfile in stream[2]:
- manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
+ # Each token carries its own leading space, so an empty locator or file
+ # list adds nothing. ' ' + ' '.join(...) would leave a stray space there,
+ # and the manifest pattern checked by CollectionReader rejects that.
+ manifest += ''.join(" %s" % locator for locator in stream[1])
+ manifest += ''.join(" %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
manifest += "\n"
- return manifest
+
+ # CollectionReader raises ArgumentError for an empty string, so a writer
+ # with no finished streams returns '' directly, as the removed code did.
+ return CollectionReader(manifest).manifest_text() if manifest else manifest
def data_locators(self):
ret = []