def normalize(collection):
    """Return a normalized list of manifest stream token lists for *collection*.

    Files are regrouped by (stream name, file name), then re-emitted with
    block locators assigned cumulative offsets and adjacent chunks coalesced
    into single pos:size:name file tokens.

    NOTE(review): several statements appear to be missing from this excerpt
    (the initialization of `streams`, the sort calls, the loop headers that
    bind `stream`, `stream_tokens`, `blocks`, `streamoffset`, `f` and `b`,
    and the `else:` branches); comments below describe the visible logic only.
    """
    # Group every file's block ranges under streams[streamname][filename].
    for s in collection.all_streams():
        for f in s.all_files():
            # Recombine and re-split the path so any '/' inside f.name()
            # migrates into the stream-name part.
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            # NOTE(review): `streams` is not initialized in this excerpt;
            # presumably `streams = {}` precedes this loop — confirm.
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            streams[streamname][filename].extend(s.locators_and_ranges(f.stream_offset(), f.size()))

    # Re-emit each stream as a token list: name, locators, file tokens.
    normalized_streams = []
    sortedstreams = list(streams.keys())
    # NOTE(review): a sort of `sortedstreams` appears to be missing here.
    for s in sortedstreams:
        # NOTE(review): the bindings of `stream` and `stream_tokens`
        # appear to be missing from this excerpt.
        sortedfiles = list(stream.keys())

        # Assign each distinct locator a cumulative byte offset within the
        # stream's concatenated block data, emitting it once.
        # NOTE(review): the enclosing loop headers binding `f` and `b`
        # appear to be missing from this excerpt.
        if b[StreamReader.LOCATOR] not in blocks:
            stream_tokens.append(b[StreamReader.LOCATOR])
            blocks[b[StreamReader.LOCATOR]] = streamoffset
            streamoffset += b[StreamReader.BLOCKSIZE]

        # Escape spaces in the file name per the manifest encoding.
        fout = f.replace(' ', '\\040')
        for chunk in stream[f]:
            # Absolute position of this chunk within the stream's data.
            chunkoffset = blocks[chunk[StreamReader.LOCATOR]] + chunk[StreamReader.OFFSET]
            if current_span == None:
                # Open a new span for this file.
                current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]
            if chunkoffset == current_span[1]:
                # Chunk is contiguous with the open span: extend it.
                current_span[1] += chunk[StreamReader.CHUNKSIZE]
            # Emit the closed span as a pos:size:name file token.
            # NOTE(review): `else:` branches appear to be missing around
            # these two statements in this excerpt.
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
            current_span = [chunkoffset, chunkoffset + chunk[StreamReader.CHUNKSIZE]]

        # Flush the final open span, if any.
        if current_span != None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        normalized_streams.append(stream_tokens)
    return normalized_streams
class CollectionReader(object):
    """Read access to a Keep collection, constructed from either a manifest
    locator or literal manifest text.

    NOTE(review): this excerpt is missing several lines (an `else:`, a
    `try:`, the header of the lazy populate method, `resp`/`return`
    statements, and an inner `for` header); comments describe visible
    code only.
    """

    def __init__(self, manifest_locator_or_text):
        # Heuristic: input that already looks like manifest text (a stream
        # name, one or more hex block locators, one or more pos:size:name
        # file tokens, trailing newline) is kept as text; otherwise it is
        # treated as a locator to be fetched later.
        if re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # NOTE(review): an `else:` appears to be missing here — the two
        # assignments below presumably form the alternative branch.
        self._manifest_locator = manifest_locator_or_text
        self._manifest_text = None

        # NOTE(review): the header of the lazy populate method appears to
        # be missing from this excerpt; the lines below fetch, parse, and
        # normalize the manifest on first use.
        # Already populated — NOTE(review): the early-return body of this
        # branch is missing from this excerpt.
        if self._streams != None:
        if not self._manifest_text:
            # Prefer the API server lookup; on failure, fall back to
            # fetching the manifest block directly from Keep.
            # NOTE(review): the `try:` introducing this handler appears
            # to be missing from this excerpt.
            c = arvados.api('v1').collections().get(
                uuid=self._manifest_locator).execute()
            self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)

        # Tokenize each non-empty manifest line into a stream token list.
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized stream
        #print "normalizing", self._manifest_text
        self._manifest_text = ''
        for stream in self._streams:
            self._manifest_text += stream[0].replace(' ', '\\040')
            # NOTE(review): the `for t in ...` header over the remaining
            # stream tokens appears to be missing from this excerpt.
            self._manifest_text += (" " + t.replace(' ', '\\040'))
            self._manifest_text += "\n"
        #print "result ", self._manifest_text

    def all_streams(self):
        # Wrap each normalized token list in a StreamReader.
        # NOTE(review): the populate call, the `resp = []` initialization,
        # and the `return resp` appear to be missing from this excerpt.
        for s in self._streams:
            resp.append(StreamReader(s))

    # NOTE(review): the `def all_files(self):` header and its yield/return
    # body appear to be missing from this excerpt; the loop below visits
    # every file of every stream.
    for s in self.all_streams():
        for f in s.all_files():

    def manifest_text(self):
        # Return the (normalized) manifest text.
        # NOTE(review): a populate call appears to be missing here.
        return self._manifest_text
class CollectionWriter(object):
    """Build a new collection incrementally, buffering written data into
    Keep-sized blocks and accumulating per-stream locator/file metadata.
    """

    # Data is uploaded to Keep in blocks of at most this many bytes (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    # NOTE(review): the `def __init__(self):` header appears to be missing
    # from this excerpt; the assignments below initialize writer state.
        self._data_buffer = []               # pending chunks awaiting flush
        self._data_buffer_len = 0            # total bytes buffered
        self._current_stream_files = []      # [pos, size, name] per finished file
        self._current_stream_length = 0      # bytes written to current stream
        self._current_stream_locators = []   # Keep locators of flushed blocks
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0           # stream offset where current file began
        self._finished_streams = []
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a local directory tree into the collection as stream(s).

        NOTE(review): several lines are missing from this excerpt (the
        `todo` list setup, two `else:` branches, and the read/write loop
        that copies file contents); comments describe visible code only.
        """
        self.start_new_stream(stream_name)
        if max_manifest_depth == 0:
            # Depth exhausted: flatten the whole subtree into this stream.
            dirents = sorted(util.listdir_recursive(path))
        # NOTE(review): an `else:` appears to be missing here.
        dirents = sorted(os.listdir(path))
        for dirent in dirents:
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Defer subdirectories (as their own streams) until the
                # current stream is finished.
                os.path.join(stream_name, dirent),
                max_manifest_depth-1]]
            self.start_new_file(dirent)
            with open(target, 'rb') as f:
        self.finish_current_stream()
        # Recurse into each deferred subdirectory.
        map(lambda x: self.write_directory_tree(*x), todo)
    def write(self, newdata):
        """Append *newdata* to the file currently being written.

        NOTE(review): the body of the iterable branch (presumably writing
        each element and returning) and the flush call inside the `while`
        loop are missing from this excerpt.
        """
        if hasattr(newdata, '__iter__'):
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whenever at least one full Keep block has accumulated.
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
198 def flush_data(self):
199 data_buffer = ''.join(self._data_buffer)
200 if data_buffer != '':
201 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
202 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
203 self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        """Finish the file in progress, then begin a new one named
        *newfilename* (may be None until a name is assigned).
        """
        # Order matters: the old file must be recorded under its old name
        # before the name is replaced.
        self.finish_current_file()
        self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        """Set the name recorded for data written from now on.

        Tabs and newlines are rejected because they would corrupt the
        manifest encoding (spaces are escaped elsewhere as \\040).
        NOTE(review): the final argument line of the raise statement
        appears to be missing from this excerpt.
        """
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        self._current_file_name = newfilename
    def current_file_name(self):
        """Return the name of the file currently being written (or None)."""
        return self._current_file_name
    def finish_current_file(self):
        """Record the data written since the last finish as a
        [pos, size, name] file entry in the current stream.

        NOTE(review): the body of the nothing-written branch appears to be
        missing from this excerpt (presumably an early return).
        """
        if self._current_file_name == None:
            # No name set: only acceptable when no bytes were written.
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        # The next file starts where this one ended.
        self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        """Finish the stream in progress, then begin a new one named
        *newstreamname* (default '.').
        """
        # Order matters: flush/record the old stream before renaming.
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
238 def set_current_stream_name(self, newstreamname):
239 if re.search(r'[\t\n]', newstreamname):
240 raise errors.AssertionError(
241 "Manifest stream names cannot contain whitespace")
242 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name
    def finish_current_stream(self):
        """Close the current stream: finish the open file, record the
        stream's name/locators/files, and reset per-stream state.

        NOTE(review): the body of the empty-stream branch appears to be
        missing from this excerpt.
        """
        self.finish_current_file()
        if len(self._current_stream_files) == 0:
        elif self._current_stream_name == None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # A manifest stream must carry at least one locator token, even
        # when it contains only zero-length files.
        if len(self._current_stream_locators) == 0:
            self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
        self._finished_streams += [[self._current_stream_name,
                                    self._current_stream_locators,
                                    self._current_stream_files]]
        # Reset state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        # NOTE(review): the method header (presumably `def finish(self):`)
        # is missing from this excerpt; this uploads the final manifest
        # text to Keep and returns its locator.
        return Keep.put(self.manifest_text())
    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams.

        NOTE(review): the initialization of `manifest` and the body of the
        './'-prefix branch appear to be missing from this excerpt.
        """
        self.finish_current_stream()
        for stream in self._finished_streams:
            # Stream names that are not already '.'-rooted need a './'
            # prefix in the manifest.
            if not re.search(r'^\.(/.*)?$', stream[0]):
            manifest += stream[0].replace(' ', '\\040')
            for locator in stream[1]:
                manifest += " %s" % locator
            for sfile in stream[2]:
                manifest += " %d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
        # Round-trip through CollectionReader to normalize the result.
        return CollectionReader(manifest).manifest_text()
287 def data_locators(self):
289 for name, locators, files in self._finished_streams: