21 from collections import deque
# Build the manifest token list for a single stream: [stream name,
# block locators..., "pos:size:filename" file tokens...], coalescing
# adjacent file segments into single spans.
# NOTE(review): the embedded line numbers jump (30, 32, 39, ...) — the
# initializations of stream_tokens/blocks/streamoffset/current_span and the
# loop headers over sortedfiles/stream[f] appear elided; confirm against the
# full source before editing.
30 def normalize_stream(s, stream):
32 sortedfiles = list(stream.keys())
# Record each distinct block locator once and remember its byte offset
# within the concatenated stream data.
39 if b[arvados.LOCATOR] not in blocks:
40 stream_tokens.append(b[arvados.LOCATOR])
41 blocks[b[arvados.LOCATOR]] = streamoffset
42 streamoffset += b[arvados.BLOCKSIZE]
# A manifest stream line must contain at least one locator; use the
# canonical empty-block locator when no data blocks were referenced.
44 if len(stream_tokens) == 1:
45 stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
# Manifest format escapes spaces in file names as the octal sequence \040.
49 fout = f.replace(' ', '\\040')
50 for segment in stream[f]:
# Translate the segment's block-relative offset into a stream-relative one.
51 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
52 if current_span == None:
53 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Segment starts exactly where the open span ends: extend the span
# instead of emitting a new file token.
55 if segmentoffset == current_span[1]:
56 current_span[1] += segment[arvados.SEGMENTSIZE]
# Non-contiguous segment: flush the finished span as "pos:size:name"
# and start a new one.
58 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
59 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Flush the last open span for this file, if any.
61 if current_span != None:
62 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# Zero-length files still need a file token so they appear in the manifest.
64 if len(stream[f]) == 0:
65 stream_tokens.append("0:0:{0}".format(fout))
# Group every file of a collection under its stream (directory) name and
# return a list of normalized stream token lists, one per stream.
69 def normalize(collection):
71 for s in collection.all_streams():
72 for f in s.all_files():
# Split "streamname/filename" at the last slash so files in
# subdirectory streams land under the right stream name.
73 filestream = s.name() + "/" + f.name()
74 r = filestream.rindex("/")
75 streamname = filestream[:r]
76 filename = filestream[r+1:]
77 if streamname not in streams:
78 streams[streamname] = {}
79 if filename not in streams[streamname]:
80 streams[streamname][filename] = []
# NOTE(review): 'r' is indexed here although it was bound to an int by
# rindex() above — the numbering jump (80 -> 82) suggests an elided loop
# (presumably over the file's segments) rebinds 'r' to a (locator, range)
# pair before this line; confirm against the full source.
82 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
84 normalized_streams = []
85 sortedstreams = list(streams.keys())
# NOTE(review): a sort of sortedstreams appears elided here (line 86).
87 for s in sortedstreams:
88 normalized_streams.append(normalize_stream(s, streams[s]))
89 return normalized_streams
# Read-only access to a collection, constructed from either a Keep locator
# (content hash) or raw manifest text; the manifest is fetched lazily and
# normalized before use.
92 class CollectionReader(object):
93 def __init__(self, manifest_locator_or_text):
# A bare md5 hash, optionally followed by +size and other +hints, is
# treated as a collection locator to be fetched later.
94 if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
95 self._manifest_locator = manifest_locator_or_text
96 self._manifest_text = None
# Text matching the manifest grammar (stream name, locators, at least
# one pos:size:name token, newline) is stored as-is.
97 elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
98 self._manifest_text = manifest_locator_or_text
99 self._manifest_locator = None
# NOTE(review): the 'else:' introducing this raise (line 100) is elided.
101 raise errors.ArgumentError(
102 "Argument to CollectionReader must be a manifest or a collection UUID")
# Lazy population of self._streams; already-populated state short-circuits.
# NOTE(review): lines 103-111 (including this method's 'def' line) are
# elided from this excerpt.
112 if self._streams != None:
114 if not self._manifest_text:
# NOTE(review): the 'try:' guarding this API call (line 115) is elided.
116 c = arvados.api('v1').collections().get(
117 uuid=self._manifest_locator).execute()
118 self._manifest_text = c['manifest_text']
119 except Exception as e:
# API lookup failed; log and fall back to fetching the manifest
# directly from Keep by its content hash.
120 logging.warning("API lookup failed for collection %s (%s: %s)" %
121 (self._manifest_locator, type(e), str(e)))
122 self._manifest_text = Keep.get(self._manifest_locator)
# Tokenize each non-empty manifest line into one stream token list.
124 for stream_line in self._manifest_text.split("\n"):
125 if stream_line != '':
126 stream_tokens = stream_line.split()
127 self._streams += [stream_tokens]
128 self._streams = normalize(self)
130 # now regenerate the manifest text based on the normalized stream
132 #print "normalizing", self._manifest_text
133 self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
134 #print "result", self._manifest_text
# Return a StreamReader for each normalized stream.
137 def all_streams(self):
140 for s in self._streams:
141 resp.append(StreamReader(s))
# Iterate every file across every stream.
# NOTE(review): this method's 'def' line (circa 143-144) is elided.
145 for s in self.all_streams():
146 for f in s.all_files():
# Return the manifest text; with strip=True, rebuild it with permission
# hints removed from each stream's locators.
149 def manifest_text(self, strip=False):
152 m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
155 return self._manifest_text
# Incrementally build a collection: buffered writes are flushed to Keep in
# fixed-size blocks while stream/file bookkeeping accumulates the manifest.
# Also supports queueing whole files and directory trees for writing.
157 class CollectionWriter(object):
# 2**26 = 64 MiB per Keep block.
158 KEEP_BLOCK_SIZE = 2**26
# NOTE(review): the '__init__' def line (circa 160) is elided here.
161 self._data_buffer = []
162 self._data_buffer_len = 0
163 self._current_stream_files = []
164 self._current_stream_length = 0
165 self._current_stream_locators = []
166 self._current_stream_name = '.'
167 self._current_file_name = None
168 self._current_file_pos = 0
169 self._finished_streams = []
170 self._close_file = None
171 self._queued_file = None
172 self._queued_dirents = deque()
173 self._queued_trees = deque()
181 def do_queued_work(self):
182 # The work queue consists of three pieces:
183 # * _queued_file: The file object we're currently writing to the
185 # * _queued_dirents: Entries under the current directory
186 # (_queued_trees[0]) that we want to write or recurse through.
187 # This may contain files from subdirectories if
188 # max_manifest_depth == 0 for this directory.
189 # * _queued_trees: Directories that should be written as separate
190 # streams to the Collection.
191 # This function handles the smallest piece of work currently queued
192 # (current file, then current directory, then next directory) until
193 # no work remains. The _work_THING methods each do a unit of work on
194 # THING. _queue_THING methods add a THING to the work queue.
# NOTE(review): the loop header and the _work_* call lines (195, 197,
# 199, 201) are elided from this excerpt.
196 if self._queued_file:
198 elif self._queued_dirents:
200 elif self._queued_trees:
# Copy the queued file into the collection one Keep-block-sized chunk at
# a time, then finish the file and release the handle.
205 def _work_file(self):
207 buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
211 self.finish_current_file()
# Only close handles we opened ourselves (_close_file set by _queue_file).
213 self._queued_file.close()
214 self._close_file = None
215 self._queued_file = None
# Drain the dirent queue for the directory at the head of _queued_trees:
# subdirectories are re-queued as trees, plain entries as files.
217 def _work_dirents(self):
218 path, stream_name, max_manifest_depth = self._queued_trees[0]
219 if stream_name != self.current_stream_name():
220 self.start_new_stream(stream_name)
221 while self._queued_dirents:
222 dirent = self._queued_dirents.popleft()
223 target = os.path.join(path, dirent)
224 if os.path.isdir(target):
225 self._queue_tree(target,
226 os.path.join(stream_name, dirent),
227 max_manifest_depth - 1)
229 self._queue_file(target, dirent)
# Directory fully processed: pop it from the tree queue.
231 if not self._queued_dirents:
232 self._queued_trees.popleft()
# List the head directory (recursively when depth is exhausted) and queue
# its entries.
234 def _work_trees(self):
235 path, stream_name, max_manifest_depth = self._queued_trees[0]
# NOTE(review): the 'else' arm of this conditional expression (line 237)
# is elided.
236 make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
238 self._queue_dirents(stream_name, make_dirents(path))
# Queue a single file: accepts either a path (opened here, and closed by
# _work_file) or an already-open file-like object.
240 def _queue_file(self, source, filename=None):
241 assert (self._queued_file is None), "tried to queue more than one file"
242 if not hasattr(source, 'read'):
243 source = open(source, 'rb')
244 self._close_file = True
246 self._close_file = False
# NOTE(review): the condition guarding this default (circa line 247,
# presumably 'if filename is None:') is elided.
248 filename = os.path.basename(source.name)
249 self.start_new_file(filename)
250 self._queued_file = source
# Queue the (sorted) entries of one directory for processing.
252 def _queue_dirents(self, stream_name, dirents):
253 assert (not self._queued_dirents), "tried to queue more than one tree"
254 self._queued_dirents = deque(sorted(dirents))
256 def _queue_tree(self, path, stream_name, max_manifest_depth):
257 self._queued_trees.append((path, stream_name, max_manifest_depth))
# Convenience wrapper: queue one file and process the queue to completion.
259 def write_file(self, source, filename=None):
260 self._queue_file(source, filename)
261 self.do_queued_work()
# Convenience wrapper: queue a directory tree and process it to completion.
263 def write_directory_tree(self,
264 path, stream_name='.', max_manifest_depth=-1):
265 self._queue_tree(path, stream_name, max_manifest_depth)
266 self.do_queued_work()
# Append data to the buffer; flush full Keep blocks as they accumulate.
# NOTE(review): the body of the '__iter__' branch (lines 270-272) is
# elided — presumably it iterates and recurses; confirm in full source.
268 def write(self, newdata):
269 if hasattr(newdata, '__iter__'):
273 self._data_buffer += [newdata]
274 self._data_buffer_len += len(newdata)
275 self._current_stream_length += len(newdata)
276 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
# Store one block's worth of buffered data in Keep and keep the remainder
# buffered.
279 def flush_data(self):
280 data_buffer = ''.join(self._data_buffer)
281 if data_buffer != '':
282 self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
283 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
284 self._data_buffer_len = len(self._data_buffer[0])
286 def start_new_file(self, newfilename=None):
287 self.finish_current_file()
288 self.set_current_file_name(newfilename)
# Reject names the manifest format cannot represent (tab/newline).
290 def set_current_file_name(self, newfilename):
291 if re.search(r'[\t\n]', newfilename):
292 raise errors.AssertionError(
293 "Manifest filenames cannot contain whitespace: %s" %
295 self._current_file_name = newfilename
297 def current_file_name(self):
298 return self._current_file_name
# Record the current file as a [pos, size, name] entry in the stream.
# An unnamed file is only acceptable if it contains no data.
300 def finish_current_file(self):
301 if self._current_file_name == None:
302 if self._current_file_pos == self._current_stream_length:
304 raise errors.AssertionError(
305 "Cannot finish an unnamed file " +
306 "(%d bytes at offset %d in '%s' stream)" %
307 (self._current_stream_length - self._current_file_pos,
308 self._current_file_pos,
309 self._current_stream_name))
310 self._current_stream_files += [[self._current_file_pos,
311 self._current_stream_length - self._current_file_pos,
312 self._current_file_name]]
313 self._current_file_pos = self._current_stream_length
315 def start_new_stream(self, newstreamname='.'):
316 self.finish_current_stream()
317 self.set_current_stream_name(newstreamname)
# Reject stream names the manifest format cannot represent; '' maps to '.'.
319 def set_current_stream_name(self, newstreamname):
320 if re.search(r'[\t\n]', newstreamname):
321 raise errors.AssertionError(
322 "Manifest stream names cannot contain whitespace")
323 self._current_stream_name = '.' if newstreamname=='' else newstreamname
325 def current_stream_name(self):
326 return self._current_stream_name
# Flush buffered data, record the finished stream as
# [name, locators, files], and reset per-stream state.
328 def finish_current_stream(self):
329 self.finish_current_file()
331 if len(self._current_stream_files) == 0:
333 elif self._current_stream_name == None:
334 raise errors.AssertionError(
335 "Cannot finish an unnamed stream (%d bytes in %d files)" %
336 (self._current_stream_length, len(self._current_stream_files)))
# A stream line needs at least one locator; substitute the empty block.
338 if len(self._current_stream_locators) == 0:
339 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
340 self._finished_streams += [[self._current_stream_name,
341 self._current_stream_locators,
342 self._current_stream_files]]
343 self._current_stream_files = []
344 self._current_stream_length = 0
345 self._current_stream_locators = []
346 self._current_stream_name = None
347 self._current_file_pos = 0
348 self._current_file_name = None
# NOTE(review): the enclosing 'def' for the next two comment lines and the
# return (circa line 350, presumably 'def finish(self):') is elided.
351 # Send the stripped manifest to Keep, to ensure that we use the
352 # same UUID regardless of what hints are used on the collection.
353 return Keep.put(self.stripped_manifest())
# NOTE(review): docstring quotes around lines 357-358 appear elided.
355 def stripped_manifest(self):
357 Return the manifest for the current collection with all permission
358 hints removed from the locators in the manifest.
360 raw = self.manifest_text()
362 for line in raw.split("\n"):
363 fields = line.split()
# Strip +A... permission-hint suffixes from every locator field.
365 locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
366 for x in fields[1:-1] ]
367 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
# Assemble manifest text from the finished streams, escaping spaces, then
# round-trip through CollectionReader to return the normalized form.
370 def manifest_text(self):
371 self.finish_current_stream()
# NOTE(review): the check body for non-'.'-rooted stream names (line 376)
# is elided.
374 for stream in self._finished_streams:
375 if not re.search(r'^\.(/.*)?$', stream[0]):
377 manifest += stream[0].replace(' ', '\\040')
378 manifest += ' ' + ' '.join(stream[1])
379 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
382 #print 'writer',manifest
383 #print 'after reader',CollectionReader(manifest).manifest_text()
385 return CollectionReader(manifest).manifest_text()
# Collect the locators of every finished stream.
387 def data_locators(self):
389 for name, locators, files in self._finished_streams:
# A CollectionWriter whose progress can be serialized (dump_state) and later
# restored (from_state), with dependency checks so stale on-disk state is
# detected instead of silently producing a wrong collection.
394 class ResumableCollectionWriter(CollectionWriter):
# Attributes that fully describe resumable writer state.
395 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
396 '_current_stream_locators', '_current_stream_name',
397 '_current_file_name', '_current_file_pos', '_close_file',
398 '_data_buffer', '_dependencies', '_finished_streams',
399 '_queued_dirents', '_queued_trees']
# NOTE(review): the '__init__' def line (circa 401) is elided here.
402 self._dependencies = {}
403 super(ResumableCollectionWriter, self).__init__()
# NOTE(review): a '@classmethod' decorator (circa line 405) is elided.
406 def from_state(cls, state, *init_args, **init_kwargs):
407 # Try to build a new writer from scratch with the given state.
408 # If the state is not suitable to resume (because files have changed,
409 # been deleted, aren't predictable, etc.), raise a
410 # StaleWriterStateError. Otherwise, return the initialized writer.
411 # The caller is responsible for calling writer.do_queued_work()
412 # appropriately after it's returned.
413 writer = cls(*init_args, **init_kwargs)
414 for attr_name in cls.STATE_PROPS:
415 attr_value = state[attr_name]
416 attr_class = getattr(writer, attr_name).__class__
417 # Coerce the value into the same type as the initial value, if
419 if attr_class not in (type(None), attr_value.__class__):
420 attr_value = attr_class(attr_value)
421 setattr(writer, attr_name, attr_value)
422 # Check dependencies before we try to resume anything.
# Refuse to resume if any saved locator's permission hint has expired.
423 if any(KeepLocator(ls).permission_expired()
424 for ls in writer._current_stream_locators):
425 raise errors.StaleWriterStateError(
426 "locators include expired permission hint")
427 writer.check_dependencies()
# Reopen the file that was mid-write and seek back to the saved position.
# NOTE(review): the 'try:' guarding the reopen (line 430) is elided.
428 if state['_current_file'] is not None:
429 path, pos = state['_current_file']
431 writer._queued_file = open(path, 'rb')
432 writer._queued_file.seek(pos)
433 except IOError as error:
434 raise errors.StaleWriterStateError(
435 "failed to reopen active file {}: {}".format(path, error))
# Verify every recorded dependency is still a regular file with unchanged
# mtime and size; raise StaleWriterStateError otherwise.
438 def check_dependencies(self):
439 for path, orig_stat in self._dependencies.items():
440 if not S_ISREG(orig_stat[ST_MODE]):
441 raise errors.StaleWriterStateError("{} not file".format(path))
# NOTE(review): the 'try:' guarding this stat (line 442) is elided.
443 now_stat = tuple(os.stat(path))
444 except OSError as error:
445 raise errors.StaleWriterStateError(
446 "failed to stat {}: {}".format(path, error))
447 if ((not S_ISREG(now_stat[ST_MODE])) or
448 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
449 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
450 raise errors.StaleWriterStateError("{} changed".format(path))
# Snapshot STATE_PROPS (optionally deep-copied via copy_func) plus the
# path/offset of any file currently being written.
452 def dump_state(self, copy_func=lambda x: x):
453 state = {attr: copy_func(getattr(self, attr))
454 for attr in self.STATE_PROPS}
455 if self._queued_file is None:
456 state['_current_file'] = None
458 state['_current_file'] = (os.path.realpath(self._queued_file.name),
459 self._queued_file.tell())
# Override: only real file paths may be queued (so state can be resumed),
# and each queued file's stat is recorded as a dependency.
462 def _queue_file(self, source, filename=None):
# NOTE(review): the 'try:'/'except' structure around realpath/stat
# (lines 463, 465, 467) is partially elided here.
464 src_path = os.path.realpath(source)
466 raise errors.AssertionError("{} not a file path".format(source))
468 path_stat = os.stat(src_path)
469 except OSError as stat_error:
471 super(ResumableCollectionWriter, self)._queue_file(source, filename)
# Compare the stat of the opened descriptor against the path stat to
# detect races between open() and stat().
472 fd_stat = os.fstat(self._queued_file.fileno())
473 if not S_ISREG(fd_stat.st_mode):
474 # We won't be able to resume from this cache anyway, so don't
475 # worry about further checks.
476 self._dependencies[source] = tuple(fd_stat)
477 elif path_stat is None:
478 raise errors.AssertionError(
479 "could not stat {}: {}".format(source, stat_error))
480 elif path_stat.st_ino != fd_stat.st_ino:
481 raise errors.AssertionError(
482 "{} changed between open and stat calls".format(source))
484 self._dependencies[src_path] = tuple(fd_stat)
# Override: raw writes are rejected — resumable state can only track data
# that came from a queued source file.
486 def write(self, data):
487 if self._queued_file is None:
488 raise errors.AssertionError(
489 "resumable writer can't accept unsourced data")
490 return super(ResumableCollectionWriter, self).write(data)