# normalize_stream: rebuild one manifest stream in normalized form.
# `s` is the stream name; `stream` maps file name -> list of segment
# descriptors indexed by arvados.LOCATOR / OFFSET / SEGMENTSIZE / BLOCKSIZE.
# NOTE(review): this listing elides several original lines (28, 30-35,
# 40-42, 48, 51, 54, 57, and the tail past 59): the initialization of
# stream_tokens / blocks / streamoffset / current_span, the per-file and
# per-block loop headers, the else branches, and the final return are
# not visible here — confirm against the full file.
27 def normalize_stream(s, stream):
29 sortedfiles = list(stream.keys())
# First pass (loop header elided): register each distinct block locator
# once, recording its byte offset within the concatenated stream data.
36 if b[arvados.LOCATOR] not in blocks:
37 stream_tokens.append(b[arvados.LOCATOR])
38 blocks[b[arvados.LOCATOR]] = streamoffset
39 streamoffset += b[arvados.BLOCKSIZE]
# Manifest file-name tokens escape spaces as \040.
43 fout = f.replace(' ', '\\040')
44 for segment in stream[f]:
# Absolute offset of this segment inside the normalized stream: the
# block's stream offset plus the segment's offset within the block.
45 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
46 if current_span == None:
47 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Adjacent segments coalesce into a single start:length:name token.
49 if segmentoffset == current_span[1]:
50 current_span[1] += segment[arvados.SEGMENTSIZE]
# Non-adjacent segment (else branch header elided): flush the current
# span as a file token and start a new span.
52 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
53 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Flush the last open span for this file, if any.
55 if current_span != None:
56 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# A zero-length file still needs an explicit 0:0:name token so it
# appears in the manifest.
58 if len(stream[f]) == 0:
59 stream_tokens.append("0:0:{0}".format(fout))
# normalize: regroup every file in `collection` by its stream (directory)
# name, then emit one normalized token list per stream via
# normalize_stream(), with streams visited in sorted order.
# NOTE(review): original lines 64, 75, 77 and 80 are elided from this
# listing (the `streams = {}` initialization and the sort of
# sortedstreams are not visible) — confirm against the full file.
63 def normalize(collection):
65 for s in collection.all_streams():
66 for f in s.all_files():
# Full path of the file, split at the last "/" into the stream
# (directory) part and the bare file name.
67 filestream = s.name() + "/" + f.name()
68 r = filestream.rindex("/")
69 streamname = filestream[:r]
70 filename = filestream[r+1:]
71 if streamname not in streams:
72 streams[streamname] = {}
73 if filename not in streams[streamname]:
74 streams[streamname][filename] = []
# NOTE(review): BUG — `r` is the int returned by rindex() on line 68,
# so r[0] / r[1] raises TypeError at runtime. This call presumably
# should pass the file's extent within the stream, e.g.
# s.locators_and_ranges(f.stream_offset(), f.size()) — confirm against
# the StreamReader / StreamFileReader API before changing.
76 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
78 normalized_streams = []
79 sortedstreams = list(streams.keys())
81 for s in sortedstreams:
82 normalized_streams.append(normalize_stream(s, streams[s]))
83 return normalized_streams
# CollectionReader: read-access wrapper around a collection, constructed
# from either a Keep/collection locator or a literal manifest text.
# NOTE(review): lines 94, 97-105, 107, 109, 117, 123, 129-130, 132-133,
# 136-138, 141-142 and 144 are elided from this listing — the `else:`
# before the ArgumentError, the `try:` around the API call, the method
# headers between __init__ and all_streams (presumably a populate/
# refresh method) and parts of all_streams/all_files are not visible.
86 class CollectionReader(object):
87 def __init__(self, manifest_locator_or_text):
# A bare 32-hex-digit md5 (optionally +size and other hints) is treated
# as a collection locator to be fetched later.
88 if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
89 self._manifest_locator = manifest_locator_or_text
90 self._manifest_text = None
# Otherwise, text that looks like at least one manifest stream line
# (stream name, block locators, file segment tokens, newline) is kept
# as literal manifest text.
91 elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
92 self._manifest_text = manifest_locator_or_text
93 self._manifest_locator = None
# (else: branch header elided) Anything else is rejected.
95 raise errors.ArgumentError(
96 "Argument to CollectionReader must be a manifest or a collection UUID")
# --- body below belongs to an elided method header (populate/refresh):
# already populated, nothing to do.
106 if self._streams != None:
108 if not self._manifest_text:
# (try: header elided) Prefer the API server for the manifest text...
110 c = arvados.api('v1').collections().get(
111 uuid=self._manifest_locator).execute()
112 self._manifest_text = c['manifest_text']
113 except Exception as e:
# ...but fall back to fetching the locator directly from Keep when the
# API lookup fails; the failure is logged, not raised.
114 logging.warning("API lookup failed for collection %s (%s: %s)" %
115 (self._manifest_locator, type(e), str(e)))
116 self._manifest_text = Keep.get(self._manifest_locator)
# Tokenize each non-empty manifest line into a stream token list, then
# normalize the whole collection.
118 for stream_line in self._manifest_text.split("\n"):
119 if stream_line != '':
120 stream_tokens = stream_line.split()
121 self._streams += [stream_tokens]
122 self._streams = normalize(self)
124 # now regenerate the manifest text based on the normalized stream
126 #print "normalizing", self._manifest_text
127 self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
128 #print "result", self._manifest_text
# Wrap each normalized stream token list in a StreamReader.
131 def all_streams(self):
134 for s in self._streams:
135 resp.append(StreamReader(s))
# --- lines below belong to an elided all_files() header: iterate every
# file of every stream.
139 for s in self.all_streams():
140 for f in s.all_files():
# Return the (normalized) manifest text; populate step elided above.
143 def manifest_text(self):
145 return self._manifest_text
# CollectionWriter: incrementally build a collection manifest, buffering
# written data and flushing it to Keep in KEEP_BLOCK_SIZE chunks.
147 class CollectionWriter(object):
# 64 MiB — the maximum Keep block size used when flushing buffered data.
148 KEEP_BLOCK_SIZE = 2**26
# --- fields below belong to the elided __init__ header (line 150):
# raw data chunks not yet flushed to Keep, and their total length.
151 self._data_buffer = []
152 self._data_buffer_len = 0
# Per-stream state: [pos, size, name] file entries, total bytes written
# to the stream, block locators already stored, and the stream name
# (default top-level stream ".").
153 self._current_stream_files = []
154 self._current_stream_length = 0
155 self._current_stream_locators = []
156 self._current_stream_name = '.'
# Per-file state: name of the file being written and its start offset
# within the current stream.
157 self._current_file_name = None
158 self._current_file_pos = 0
# Streams already closed by finish_current_stream().
159 self._finished_streams = []
# Recursively add a local directory tree as streams/files of the
# collection. max_manifest_depth == 0 flattens everything below `path`
# into one stream (recursive listing); otherwise subdirectories are
# queued in `todo` and written as their own streams afterwards.
# NOTE(review): lines 170, 173, 178, 181 and 184-188 are elided — the
# `todo = []` initialization, the `else:` headers, the `todo += [[target,`
# append and the file-copy loop body are not visible in this listing.
167 def write_directory_tree(self,
168 path, stream_name='.', max_manifest_depth=-1):
169 self.start_new_stream(stream_name)
171 if max_manifest_depth == 0:
172 dirents = sorted(util.listdir_recursive(path))
# (else: elided) non-flattened case: immediate children only.
174 dirents = sorted(os.listdir(path))
175 for dirent in dirents:
176 target = os.path.join(path, dirent)
177 if os.path.isdir(target):
# (start of the elided todo append) queue the subdirectory with a
# decremented depth budget.
179 os.path.join(stream_name, dirent),
180 max_manifest_depth-1]]
# (else: elided) regular file: copy its bytes into the current stream.
182 self.start_new_file(dirent)
183 with open(target, 'rb') as f:
189 self.finish_current_stream()
# NOTE(review): under Python 3, map() is lazy, so these recursive calls
# would never execute — this needs a plain for loop (or list(map(...))).
# Harmless under Python 2, which this file appears to target.
190 map(lambda x: self.write_directory_tree(*x), todo)
# Append data to the current file, flushing full Keep blocks as the
# buffer fills. NOTE(review): lines 194-196 (the iterable branch body,
# presumably writing each element recursively and returning) and 201
# (the loop body, presumably self.flush_data()) are elided.
192 def write(self, newdata):
193 if hasattr(newdata, '__iter__'):
197 self._data_buffer += [newdata]
198 self._data_buffer_len += len(newdata)
199 self._current_stream_length += len(newdata)
# Keep the buffer below one Keep block; flush repeatedly if a single
# write exceeded several block sizes.
200 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
# Store up to one Keep block of buffered data and keep the remainder
# (if any) buffered. Joining first lets a partial chunk carry over.
203 def flush_data(self):
204 data_buffer = ''.join(self._data_buffer)
205 if data_buffer != '':
206 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
# Remainder beyond one block stays in the buffer for the next flush.
207 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
208 self._data_buffer_len = len(self._data_buffer[0])
# Close the file currently being written (if any) and begin a new one.
210 def start_new_file(self, newfilename=None):
211 self.finish_current_file()
212 self.set_current_file_name(newfilename)
# NOTE(review): only tabs and newlines are rejected; plain spaces are
# allowed (the manifest escapes them as \040), so the error message
# "cannot contain whitespace" slightly overstates the rule.
214 def set_current_file_name(self, newfilename):
215 if re.search(r'[\t\n]', newfilename):
216 raise errors.AssertionError(
217 "Manifest filenames cannot contain whitespace: %s" %
# Record the (validated) name; the format argument line 218 is elided.
219 self._current_file_name = newfilename
221 def current_file_name(self):
222 return self._current_file_name
# Record the current file as a [pos, size, name] entry in the current
# stream and advance the file-start position.
# NOTE(review): line 227 is elided — presumably a `return` for the
# benign "no name, no data" case guarded on line 226.
224 def finish_current_file(self):
225 if self._current_file_name == None:
# No data was written since the last file boundary: nothing to record.
226 if self._current_file_pos == self._current_stream_length:
# Data was written but the file was never named — refuse to lose it.
228 raise errors.AssertionError(
229 "Cannot finish an unnamed file " +
230 "(%d bytes at offset %d in '%s' stream)" %
231 (self._current_stream_length - self._current_file_pos,
232 self._current_file_pos,
233 self._current_stream_name))
234 self._current_stream_files += [[self._current_file_pos,
235 self._current_stream_length - self._current_file_pos,
236 self._current_file_name]]
# Next file starts where this one ended.
237 self._current_file_pos = self._current_stream_length
# Close the current stream (if any) and begin a new one.
239 def start_new_stream(self, newstreamname='.'):
240 self.finish_current_stream()
241 self.set_current_stream_name(newstreamname)
# Stream names, like file names, may not contain tabs or newlines
# (spaces are escaped in the manifest).
243 def set_current_stream_name(self, newstreamname):
244 if re.search(r'[\t\n]', newstreamname):
245 raise errors.AssertionError(
246 "Manifest stream names cannot contain whitespace")
# Empty name maps to the top-level stream ".".
247 self._current_stream_name = '.' if newstreamname=='' else newstreamname
249 def current_stream_name(self):
250 return self._current_stream_name
# Seal the current stream: flush pending data, record the stream's
# [name, locators, files] triple in _finished_streams, and reset all
# per-stream/per-file state.
# NOTE(review): lines 254, 256 and 261 are elided — presumably the
# flush_data() call, the early return/pass for the empty-stream case,
# and the `else:` before line 262.
252 def finish_current_stream(self):
253 self.finish_current_file()
# A stream with no files at all is simply dropped.
255 if len(self._current_stream_files) == 0:
257 elif self._current_stream_name == None:
258 raise errors.AssertionError(
259 "Cannot finish an unnamed stream (%d bytes in %d files)" %
260 (self._current_stream_length, len(self._current_stream_files)))
# A stream must reference at least one block; use the canonical empty
# block locator when no data was ever flushed.
262 if len(self._current_stream_locators) == 0:
263 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
264 self._finished_streams += [[self._current_stream_name,
265 self._current_stream_locators,
266 self._current_stream_files]]
# Reset state for the next stream.
267 self._current_stream_files = []
268 self._current_stream_length = 0
269 self._current_stream_locators = []
270 self._current_stream_name = None
271 self._current_file_pos = 0
272 self._current_file_name = None
# --- line below belongs to an elided method header (line 274,
# presumably `def finish(self):`): store the manifest itself in Keep
# and return its locator.
275 return Keep.put(self.manifest_text())
# Render all finished streams as manifest text.
# NOTE(review): lines 279-280, 283 and 287-288 are elided — presumably
# the `manifest = ''` initialization, the handling (raise?) for a stream
# name that is not "." or "./...", and the trailing "\n" per stream.
277 def manifest_text(self):
278 self.finish_current_stream()
281 for stream in self._finished_streams:
# Normalized stream names must be "." or start with "./".
282 if not re.search(r'^\.(/.*)?$', stream[0]):
284 manifest += stream[0].replace(' ', '\\040')
285 manifest += ' ' + ' '.join(stream[1])
286 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
289 #print 'writer',manifest
290 #print 'after reader',CollectionReader(manifest).manifest_text()
# Round-trip through CollectionReader so the returned text is in
# normalized form.
292 return CollectionReader(manifest).manifest_text()
294 def data_locators(self):
296 for name, locators, files in self._finished_streams: