21 from collections import deque
def normalize_stream(s, stream):
    """Serialize one stream into manifest tokens.

    s is the stream name; stream maps each file name to a list of
    segment tuples indexed by arvados.LOCATOR / OFFSET / SEGMENTSIZE /
    BLOCKSIZE.  NOTE(review): interior lines of this function are
    missing from this view; comments cover only the visible code.
    """
    sortedfiles = list(stream.keys())
    # [elided: sort file names; init stream_tokens/blocks/streamoffset;
    #  outer loop over files and their segments begins]
    # First pass (visible fragment): record each distinct block locator
    # once, remembering its byte offset within the concatenated stream.
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]
    # Second pass: emit one "offset:length:name" token per contiguous
    # span of each file's segments.
        fout = f.replace(' ', '\\040')  # manifest encoding for spaces
        for segment in stream[f]:
            # Absolute position of this segment within the stream.
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span == None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            # [elided: else branch]
                if segmentoffset == current_span[1]:
                    # Adjacent segment: extend the current span.
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                # [elided: else — flush the span and start a new one]
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
        # Flush the final span, if any.
        if current_span != None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
        # Zero-length files still need a file token.
        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))
def normalize(collection):
    """Group a collection's files by stream name and return the list of
    normalized stream token lists.

    NOTE(review): interior lines are missing from this view.
    """
    # [elided: streams = {}]
    for s in collection.all_streams():
        for f in s.all_files():
            # Split "streamname/filename" at the last slash.
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            # NOTE(review): bug — r is an int (result of rindex), so
            # r[0]/r[1] will raise TypeError here.  The arguments were
            # presumably meant to be the file's offset and size within
            # the stream; confirm against s.locators_and_ranges().
            streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    # [elided: sortedstreams.sort()]
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams
class CollectionReader(object):
    """Read and normalize an Arvados collection, given either a
    collection locator/UUID or literal manifest text.

    NOTE(review): several lines of this class are missing from this
    view (an else branch, try lines, method headers); comments cover
    only the visible code.
    """

    def __init__(self, manifest_locator_or_text):
        # A bare content locator: md5 hash, optional +size, optional hints.
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        # Literal manifest text: stream name, locators, file tokens.
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # [elided: else]
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")

    # [elided: method header — presumably a _populate()-style lazy loader]
        # Already populated; nothing to do.
        if self._streams != None:
        # [elided: return]
        if not self._manifest_text:
            # [elided: try]
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # API lookup failed: fall back to fetching the manifest
                # directly from Keep by locator.
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        # Tokenize each non-empty manifest line into one stream.
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized stream
        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text

    def all_streams(self):
        # Wrap each normalized stream token list in a StreamReader.
        # [elided: populate call and resp list init]
        for s in self._streams:
            resp.append(StreamReader(s))
        # [elided: return resp]

    # [elided: def all_files(self): header]
        # Generator over every file in every stream.
        for s in self.all_streams():
            for f in s.all_files():
            # [elided: yield f]

    def manifest_text(self):
        # [elided: populate call]
        return self._manifest_text
class CollectionWriter(object):
    """Build an Arvados collection by writing files/directories into
    streams and flushing data blocks to Keep.

    NOTE(review): some lines of this class are missing from this view.
    """
    # Maximum size of one data block stored in Keep: 64 MiB.
    KEEP_BLOCK_SIZE = 2**26

    # [elided: def __init__(self): header]
        self._data_buffer = []              # pending chunks not yet flushed to Keep
        self._data_buffer_len = 0           # total bytes currently buffered
        self._current_stream_files = []     # [pos, size, name] per finished file
        self._current_stream_length = 0     # bytes written to the current stream
        self._current_stream_locators = []  # Keep locators for the current stream
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0          # stream offset where current file began
        self._finished_streams = []
        self._close_file = None             # True when we opened _queued_file ourselves
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
    def _do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        # NOTE(review): loop/dispatch lines are elided in this view.
        if self._queued_file:
        # [elided: self._work_file()]
        elif self._queued_dirents:
        # [elided: self._work_dirents()]
        elif self._queued_trees:
        # [elided: self._work_trees(); else: break]
        self.checkpoint_state()

    def checkpoint_state(self):
        # Subclasses can implement this method to, e.g., report or record state.
        # [elided: pass]
    def _work_file(self):
        # Stream the queued file into the collection in KEEP_BLOCK_SIZE
        # chunks, then finish the file and clear the queue slot.
        # [elided: while True:]
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            # [elided: if not buf: break; self.write(buf)]
        self.finish_current_file()
        # Close the file only if we opened it ourselves.
        # [elided: if self._close_file:]
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        # Process the entries queued under the directory at the head of
        # _queued_trees: queue subdirectories as their own trees, queue
        # plain files for writing.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectory: queue as its own tree, one level deeper.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            # [elided: else]
                self._queue_file(target, dirent)
            # [elided line — presumably break, so the queued item is
            #  handled before continuing; TODO confirm]
        # All entries for this directory are done; drop it from the queue.
        if not self._queued_dirents:
            self._queued_trees.popleft()
    def _work_trees(self):
        # Start listing the next queued directory tree.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # At depth 0 the whole subtree is flattened into one stream via a
        # recursive listing; otherwise only one level is listed.
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        # [elided: else branch of this conditional]
        self._queue_dirents(stream_name, make_dirents(path))
    def _queue_file(self, source, filename=None):
        # Queue a single file (path or file-like object) for writing.
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # Got a path: open it ourselves and remember to close it later.
            source = open(source, 'rb')
            self._close_file = True
        # [elided: else]
            self._close_file = False
        # [elided: if filename is None:]
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
250 def _queue_dirents(self, stream_name, dirents):
251 assert (not self._queued_dirents), "tried to queue more than one tree"
252 self._queued_dirents = deque(sorted(dirents))
254 def _queue_tree(self, path, stream_name, max_manifest_depth):
255 self._queued_trees.append((path, stream_name, max_manifest_depth))
257 def write_file(self, source, filename=None):
258 self._queue_file(source, filename)
259 self._do_queued_work()
261 def write_directory_tree(self,
262 path, stream_name='.', max_manifest_depth=-1):
263 self._queue_tree(path, stream_name, max_manifest_depth)
264 self._do_queued_work()
    def write(self, newdata):
        # Accept either an iterable of chunks or a single chunk of data.
        if hasattr(newdata, '__iter__'):
            # [elided: recurse once per chunk, then return]
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as soon as enough data is buffered.
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            # [elided: self.flush_data()]
277 def flush_data(self):
278 data_buffer = ''.join(self._data_buffer)
279 if data_buffer != '':
280 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
281 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
282 self._data_buffer_len = len(self._data_buffer[0])
283 self.checkpoint_state()
285 def start_new_file(self, newfilename=None):
286 self.finish_current_file()
287 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # Tabs and newlines are manifest field delimiters, so they can
        # never appear in a file name.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                # [elided: newfilename)]
        self._current_file_name = newfilename

    def current_file_name(self):
        # Name of the file currently being written, or None.
        return self._current_file_name
    def finish_current_file(self):
        # Record everything written since _current_file_pos as one file
        # entry.  A nameless file is only acceptable if it holds no data.
        if self._current_file_name == None:
            if self._current_file_pos == self._current_stream_length:
            # [elided: return — nothing written, nothing to record]
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        # The next file starts where this one ended.
        self._current_file_pos = self._current_stream_length
314 def start_new_stream(self, newstreamname='.'):
315 self.finish_current_stream()
316 self.set_current_stream_name(newstreamname)
318 def set_current_stream_name(self, newstreamname):
319 if re.search(r'[\t\n]', newstreamname):
320 raise errors.AssertionError(
321 "Manifest stream names cannot contain whitespace")
322 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        # Name of the stream currently being written (None between streams).
        return self._current_stream_name
    def finish_current_stream(self):
        # Flush pending data, record the finished stream's name/locators/
        # file list, then reset all per-stream state.
        self.finish_current_file()
        # [elided line — presumably self.flush_data(); TODO confirm]
        if len(self._current_stream_files) == 0:
        # [elided: pass — an empty stream produces no manifest entry]
        elif self._current_stream_name == None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # [elided: else]
            # Streams with no stored data still need one locator token.
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    # [elided: def finish(self): header]
        # Store the manifest itself in Keep and return its locator.
        return Keep.put(self.manifest_text())

    def manifest_text(self):
        # Build manifest text from the finished streams, then round-trip
        # it through CollectionReader to normalize it.
        self.finish_current_stream()
        # [elided: manifest = '']
        for stream in self._finished_streams:
            # Stream names in a manifest are rooted at '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # [elided line — presumably prefixes './'; TODO confirm]
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            # [elided: manifest += "\n"]

        #print 'writer',manifest
        #print 'after reader',CollectionReader(manifest).manifest_text()

        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        # Flatten the locators of every finished stream into one list.
        # [elided: result list init]
        for name, locators, files in self._finished_streams:
            # [elided: accumulate locators; return]
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose state can be dumped and restored, so an
    interrupted upload can be resumed later.

    NOTE(review): some lines of this class are missing from this view.
    """
    # Attributes that fully describe resumable writer state.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    # [elided: def __init__(self): header]
        # Map of source path -> os.stat() tuple, used to detect source
        # files that changed between dump and resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()
    # [elided line — presumably @classmethod; TODO confirm]
    def from_state(cls, state):
        # Rebuild a writer from a state dict produced by dump_state().
        # [elided: construct writer instance]
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one (e.g. deques serialized as plain lists).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        writer.check_dependencies()
        # Reopen and reposition the file that was being written, if any.
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # [elided: try]
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        writer._do_queued_work()
        # [elided: return writer]
    def check_dependencies(self):
        # Verify every recorded source file is still a regular file with
        # the same mtime and size; raise StaleWriterStateError otherwise.
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # [elided: try]
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        # Snapshot all resumable state.  copy_func lets callers deep-copy
        # mutable values before the writer mutates them further.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        # [elided: else]
            # Record the in-progress file's real path and current offset.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        # [elided: return state]
    def _queue_file(self, source, filename=None):
        # Only real file paths can be resumed, so resolve the path and
        # record its stat as a dependency; reject anything else.
        # [elided: try]
            src_path = os.path.realpath(source)
        # [elided: except clause]
            raise errors.AssertionError("{} not a file path".format(source))
        # [elided: try]
            path_stat = os.stat(src_path)
        except OSError as error:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, error))
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        # Guard against the file being replaced between stat and open.
        fd_stat = os.fstat(self._queued_file.fileno())
        if path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        self._dependencies[src_path] = tuple(fd_stat)
452 def write(self, data):
453 if self._queued_file is None:
454 raise errors.AssertionError(
455 "resumable writer can't accept unsourced data")
456 return super(ResumableCollectionWriter, self).write(data)