21 from collections import deque
30 _logger = logging.getLogger('arvados.collection')
32 def normalize_stream(s, stream):
# Build the normalized token list for the manifest stream named `s`.
# `stream` maps each file name to a list of segment tuples indexed by the
# arvados.LOCATOR / BLOCKSIZE / OFFSET / SEGMENTSIZE constants.
# NOTE(review): this listing is an excerpt — the loop headers that iterate
# over sortedfiles and over each file's data blocks (and the `else:` lines)
# are not shown here.
34 sortedfiles = list(stream.keys())
# Record each data block the first time it is seen; `blocks` maps a block
# locator to that block's byte offset within the concatenated stream.
41 if b[arvados.LOCATOR] not in blocks:
42 stream_tokens.append(b[arvados.LOCATOR])
43 blocks[b[arvados.LOCATOR]] = streamoffset
44 streamoffset += b[arvados.BLOCKSIZE]
# A manifest stream needs at least one block token; if no data blocks were
# appended (only the stream name is present), use the empty-block locator.
46 if len(stream_tokens) == 1:
47 stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
# Manifest file names escape literal spaces as the octal sequence \040.
51 fout = f.replace(' ', '\\040')
52 for segment in stream[f]:
# Translate the segment's block-relative offset into a stream-wide offset.
53 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
54 if current_span == None:
55 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Coalesce segments that are contiguous in the stream into one span;
# otherwise flush the finished span as a "start:length:name" file token.
57 if segmentoffset == current_span[1]:
58 current_span[1] += segment[arvados.SEGMENTSIZE]
60 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
61 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Flush the last open span for this file, if any.
63 if current_span != None:
64 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# Zero-length files still get a token so they appear in the manifest.
66 if len(stream[f]) == 0:
67 stream_tokens.append("0:0:{0}".format(fout))
71 def normalize(collection):
# Group every file in `collection` by its stream (directory) name and file
# name, then normalize each stream with normalize_stream().
# NOTE(review): excerpt — the initialization of `streams` and the loop
# header between original lines 82 and 84 are not shown here.
73 for s in collection.all_streams():
74 for f in s.all_files():
# Split "stream/name/filename" at the last slash into stream + file parts.
75 filestream = s.name() + "/" + f.name()
76 r = filestream.rindex("/")
77 streamname = filestream[:r]
78 filename = filestream[r+1:]
79 if streamname not in streams:
80 streams[streamname] = {}
81 if filename not in streams[streamname]:
82 streams[streamname][filename] = []
# NOTE(review): at this point `r` is indexed as a pair (r[0], r[1]) even
# though it was last bound to an int by rindex() above — the omitted
# original line 83 presumably rebinds `r` to a (position, size) segment
# in an inner loop; confirm against the full source before editing.
84 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
86 normalized_streams = []
87 sortedstreams = list(streams.keys())
# Emit one normalized token list per stream, in sorted-stream order.
89 for s in sortedstreams:
90 normalized_streams.append(normalize_stream(s, streams[s]))
91 return normalized_streams
94 class CollectionBase(object):
# Shared base class for collection readers and writers.
# NOTE(review): excerpt — original lines 95-101 (the start of the class
# body and the enclosing method definition) are not shown here.
# Lazily construct and cache a KeepClient bound to this object's API
# client and retry count; subsequent calls reuse the cached client.
102 if self._keep_client is None:
103 self._keep_client = KeepClient(api_client=self._api_client,
104 num_retries=self.num_retries)
105 return self._keep_client
108 class CollectionReader(CollectionBase):
# Read-only access to an Arvados collection identified by UUID, portable
# data hash, or literal manifest text.
# NOTE(review): excerpt — several original lines (try:/else:/return lines,
# parts of the docstrings) are not shown here.
109 def __init__(self, manifest_locator_or_text, api_client=None,
110 keep_client=None, num_retries=0):
111 """Instantiate a CollectionReader.
113 This class parses Collection manifests to provide a simple interface
114 to read its underlying files.
117 * manifest_locator_or_text: One of a Collection UUID, portable data
118 hash, or full manifest text.
119 * api_client: The API client to use to look up Collections. If not
120 provided, CollectionReader will build one from available Arvados
122 * keep_client: The KeepClient to use to download Collection data.
123 If not provided, CollectionReader will build one from available
124 Arvados configuration.
125 * num_retries: The default number of times to retry failed
126 service requests. Default 0. You may change this value
127 after instantiation, but note those changes may not
128 propagate to related objects like the Keep client.
130 self._api_client = api_client
131 self._keep_client = keep_client
132 self.num_retries = num_retries
# Classify the argument: a 32-hex-digit portable data hash (optionally
# followed by +size and other hints)...
133 if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
134 self._manifest_locator = manifest_locator_or_text
135 self._manifest_text = None
# ...an Arvados UUID (5-5-15 base36 groups)...
136 elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
137 self._manifest_locator = manifest_locator_or_text
138 self._manifest_text = None
# ...or literal manifest text (one or more stream lines, each with a
# stream name, block locators, and pos:len:name file tokens).
139 elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
140 self._manifest_text = manifest_locator_or_text
141 self._manifest_locator = None
# Anything else is rejected outright.
143 raise errors.ArgumentError(
144 "Argument to CollectionReader must be a manifest or a collection UUID")
# Populate lazily: once _streams is set there is nothing to do.
148 if self._streams is not None:
151 error_via_keep = None
# Keep is only a viable fallback when the locator looks like a
# content-addressed Keep locator (not a UUID).
152 should_try_keep = (not self._manifest_text and
153 util.keep_locator_pattern.match(
154 self._manifest_locator))
# A signed locator can be fetched from Keep directly, before trying
# the API server at all.
155 if (not self._manifest_text and
156 util.signed_locator_pattern.match(self._manifest_locator)):
158 self._populate_from_keep()
161 if not self._manifest_text:
163 self._populate_from_api_server()
164 except Exception as e:
# Only swallow the API error when Keep remains as a fallback.
165 if not should_try_keep:
168 if (not self._manifest_text and
169 not error_via_keep and
171 # Looks like a keep locator, and we didn't already try keep above
173 self._populate_from_keep()
174 except Exception as e:
176 if not self._manifest_text:
# Both sources failed: raise with the error seen from each.
178 raise arvados.errors.NotFoundError(
179 ("Failed to retrieve collection '{}' " +
180 "from either API server ({}) or Keep ({})."
182 self._manifest_locator,
# Tokenize each manifest line into a stream token list, normalize the
# streams, then regenerate canonical manifest text from the result.
185 self._streams = [sline.split()
186 for sline in self._manifest_text.split("\n")
188 self._streams = normalize(self)
190 # now regenerate the manifest text based on the normalized stream
192 #print "normalizing", self._manifest_text
193 self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
194 #print "result", self._manifest_text
# Wrap each normalized stream in a StreamReader backed by this reader's
# Keep client and retry setting.
197 def all_streams(self):
199 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
200 for s in self._streams]
# Iterate every file of every stream (body of the generator is omitted
# from this excerpt).
203 for s in self.all_streams():
204 for f in s.all_files():
# Return the collection's manifest text; with strip=True, rebuild it with
# permission hints stripped from the block locators.
207 def manifest_text(self, strip=False):
210 m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
213 return self._manifest_text
216 class CollectionWriter(CollectionBase):
# Incrementally build a new collection: buffer written data, upload it to
# Keep in fixed-size blocks, and assemble the manifest text.
# NOTE(review): excerpt — several original lines (try:/else:/return lines,
# parts of docstrings and method headers) are not shown here.
# Data is uploaded to Keep in blocks of 2**26 bytes (64 MiB).
217 KEEP_BLOCK_SIZE = 2**26
219 def __init__(self, api_client=None, num_retries=0):
220 """Instantiate a CollectionWriter.
222 CollectionWriter lets you build a new Arvados Collection from scratch.
223 Write files to it. The CollectionWriter will upload data to Keep as
224 appropriate, and provide you with the Collection manifest text when
228 * api_client: The API client to use to look up Collections. If not
229 provided, CollectionReader will build one from available Arvados
231 * num_retries: The default number of times to retry failed
232 service requests. Default 0. You may change this value
233 after instantiation, but note those changes may not
234 propagate to related objects like the Keep client.
236 self._api_client = api_client
237 self.num_retries = num_retries
238 self._keep_client = None
# Buffered data not yet uploaded, and bookkeeping for the stream and
# file currently being written.
239 self._data_buffer = []
240 self._data_buffer_len = 0
241 self._current_stream_files = []
242 self._current_stream_length = 0
243 self._current_stream_locators = []
244 self._current_stream_name = '.'
245 self._current_file_name = None
246 self._current_file_pos = 0
247 self._finished_streams = []
# Work-queue state used by write_file/write_directory_tree.
248 self._close_file = None
249 self._queued_file = None
250 self._queued_dirents = deque()
251 self._queued_trees = deque()
256 def do_queued_work(self):
257 # The work queue consists of three pieces:
258 # * _queued_file: The file object we're currently writing to the
260 # * _queued_dirents: Entries under the current directory
261 # (_queued_trees[0]) that we want to write or recurse through.
262 # This may contain files from subdirectories if
263 # max_manifest_depth == 0 for this directory.
264 # * _queued_trees: Directories that should be written as separate
265 # streams to the Collection.
266 # This function handles the smallest piece of work currently queued
267 # (current file, then current directory, then next directory) until
268 # no work remains. The _work_THING methods each do a unit of work on
269 # THING. _queue_THING methods add a THING to the work queue.
271 if self._queued_file:
273 elif self._queued_dirents:
275 elif self._queued_trees:
# Write the queued file's contents in KEEP_BLOCK_SIZE chunks, then
# finish the file and close it if this writer opened it.
280 def _work_file(self):
282 buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
286 self.finish_current_file()
288 self._queued_file.close()
289 self._close_file = None
290 self._queued_file = None
# Process the queued directory entries for the tree at the head of
# _queued_trees: recurse into subdirectories, queue regular files.
292 def _work_dirents(self):
293 path, stream_name, max_manifest_depth = self._queued_trees[0]
294 if stream_name != self.current_stream_name():
295 self.start_new_stream(stream_name)
296 while self._queued_dirents:
297 dirent = self._queued_dirents.popleft()
298 target = os.path.join(path, dirent)
299 if os.path.isdir(target):
# Subdirectories become their own queued trees, one manifest
# depth level down.
300 self._queue_tree(target,
301 os.path.join(stream_name, dirent),
302 max_manifest_depth - 1)
304 self._queue_file(target, dirent)
# This tree is finished once all of its dirents are consumed.
306 if not self._queued_dirents:
307 self._queued_trees.popleft()
# List the head tree's entries (recursively when max_manifest_depth is 0,
# so the whole subtree lands in one stream) and queue them.
309 def _work_trees(self):
310 path, stream_name, max_manifest_depth = self._queued_trees[0]
311 make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
313 d = make_dirents(path)
315 self._queue_dirents(stream_name, d)
317 self._queued_trees.popleft()
319 def _queue_file(self, source, filename=None):
# Only one file may be in flight at a time.
320 assert (self._queued_file is None), "tried to queue more than one file"
# Accept either a path or an already-open file-like object; remember
# whether we opened it so _work_file knows whether to close it.
321 if not hasattr(source, 'read'):
322 source = open(source, 'rb')
323 self._close_file = True
325 self._close_file = False
327 filename = os.path.basename(source.name)
328 self.start_new_file(filename)
329 self._queued_file = source
331 def _queue_dirents(self, stream_name, dirents):
332 assert (not self._queued_dirents), "tried to queue more than one tree"
# Sort entries so the resulting manifest is deterministic.
333 self._queued_dirents = deque(sorted(dirents))
335 def _queue_tree(self, path, stream_name, max_manifest_depth):
336 self._queued_trees.append((path, stream_name, max_manifest_depth))
# Queue a single file (path or file object) and process it to completion.
338 def write_file(self, source, filename=None):
339 self._queue_file(source, filename)
340 self.do_queued_work()
# Queue a directory tree and process it to completion.
342 def write_directory_tree(self,
343 path, stream_name='.', max_manifest_depth=-1):
344 self._queue_tree(path, stream_name, max_manifest_depth)
345 self.do_queued_work()
347 def write(self, newdata):
# Iterables of chunks are handled by the omitted branch (original
# lines 349-351, not shown in this excerpt).
348 if hasattr(newdata, '__iter__'):
# Buffer the data; flush whole Keep blocks as they fill up.
352 self._data_buffer.append(newdata)
353 self._data_buffer_len += len(newdata)
354 self._current_stream_length += len(newdata)
355 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
# Upload up to one Keep block of buffered data and keep the remainder
# buffered; the returned locator is appended to the current stream.
358 def flush_data(self):
359 data_buffer = ''.join(self._data_buffer)
361 self._current_stream_locators.append(
362 self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
363 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
364 self._data_buffer_len = len(self._data_buffer[0])
366 def start_new_file(self, newfilename=None):
367 self.finish_current_file()
368 self.set_current_file_name(newfilename)
370 def set_current_file_name(self, newfilename):
# Manifest file tokens are whitespace-delimited, so tabs/newlines in
# names would corrupt the manifest.
371 if re.search(r'[\t\n]', newfilename):
372 raise errors.AssertionError(
373 "Manifest filenames cannot contain whitespace: %s" %
375 self._current_file_name = newfilename
377 def current_file_name(self):
378 return self._current_file_name
380 def finish_current_file(self):
# An unnamed "file" is only acceptable if no bytes were written to it.
381 if self._current_file_name == None:
382 if self._current_file_pos == self._current_stream_length:
384 raise errors.AssertionError(
385 "Cannot finish an unnamed file " +
386 "(%d bytes at offset %d in '%s' stream)" %
387 (self._current_stream_length - self._current_file_pos,
388 self._current_file_pos,
389 self._current_stream_name))
# Record the file as [start offset, length, name] within the stream.
390 self._current_stream_files.append([
391 self._current_file_pos,
392 self._current_stream_length - self._current_file_pos,
393 self._current_file_name])
394 self._current_file_pos = self._current_stream_length
395 self._current_file_name = None
397 def start_new_stream(self, newstreamname='.'):
398 self.finish_current_stream()
399 self.set_current_stream_name(newstreamname)
401 def set_current_stream_name(self, newstreamname):
402 if re.search(r'[\t\n]', newstreamname):
403 raise errors.AssertionError(
404 "Manifest stream names cannot contain whitespace")
# An empty stream name means the collection root, spelled '.'.
405 self._current_stream_name = '.' if newstreamname=='' else newstreamname
407 def current_stream_name(self):
408 return self._current_stream_name
410 def finish_current_stream(self):
411 self.finish_current_file()
# An empty stream is simply dropped; a stream with files must be named.
413 if not self._current_stream_files:
415 elif self._current_stream_name is None:
416 raise errors.AssertionError(
417 "Cannot finish an unnamed stream (%d bytes in %d files)" %
418 (self._current_stream_length, len(self._current_stream_files)))
# Streams need at least one block locator; use the empty-block
# locator when no data was uploaded.
420 if not self._current_stream_locators:
421 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
422 self._finished_streams.append([self._current_stream_name,
423 self._current_stream_locators,
424 self._current_stream_files])
# Reset all per-stream and per-file state for the next stream.
425 self._current_stream_files = []
426 self._current_stream_length = 0
427 self._current_stream_locators = []
428 self._current_stream_name = None
429 self._current_file_pos = 0
430 self._current_file_name = None
433 # Store the manifest in Keep and return its locator.
434 return self._my_keep().put(self.manifest_text())
436 def stripped_manifest(self):
438 Return the manifest for the current collection with all permission
439 hints removed from the locators in the manifest.
441 raw = self.manifest_text()
443 for line in raw.split("\n"):
444 fields = line.split()
# Strip the +A... permission hint from every locator token (all
# fields between the stream name and the file tokens).
446 locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
447 for x in fields[1:-1] ]
448 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
451 def manifest_text(self):
452 self.finish_current_stream()
# Assemble one manifest line per finished stream: name, locators, and
# pos:len:name file tokens, with spaces escaped as \040.
455 for stream in self._finished_streams:
456 if not re.search(r'^\.(/.*)?$', stream[0]):
458 manifest += stream[0].replace(' ', '\\040')
459 manifest += ' ' + ' '.join(stream[1])
460 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
# Round-trip through CollectionReader to return normalized text.
464 return CollectionReader(manifest, self._api_client).manifest_text()
# Collect the block locators of every finished stream (body of the loop
# is omitted from this excerpt).
468 def data_locators(self):
470 for name, locators, files in self._finished_streams:
475 class ResumableCollectionWriter(CollectionWriter):
# A CollectionWriter whose progress can be checkpointed (dump_state) and
# later resumed (from_state), with dependency checks that detect source
# files changing between checkpoint and resume.
# NOTE(review): excerpt — several original lines (try:/else:/return lines)
# are not shown here.
# The attributes that make up a checkpoint.
476 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
477 '_current_stream_locators', '_current_stream_name',
478 '_current_file_name', '_current_file_pos', '_close_file',
479 '_data_buffer', '_dependencies', '_finished_streams',
480 '_queued_dirents', '_queued_trees']
482 def __init__(self, api_client=None, num_retries=0):
# _dependencies maps source paths to their os.stat tuples at queue time.
483 self._dependencies = {}
484 super(ResumableCollectionWriter, self).__init__(
485 api_client, num_retries=num_retries)
488 def from_state(cls, state, *init_args, **init_kwargs):
489 # Try to build a new writer from scratch with the given state.
490 # If the state is not suitable to resume (because files have changed,
491 # been deleted, aren't predictable, etc.), raise a
492 # StaleWriterStateError. Otherwise, return the initialized writer.
493 # The caller is responsible for calling writer.do_queued_work()
494 # appropriately after it's returned.
495 writer = cls(*init_args, **init_kwargs)
496 for attr_name in cls.STATE_PROPS:
497 attr_value = state[attr_name]
498 attr_class = getattr(writer, attr_name).__class__
499 # Coerce the value into the same type as the initial value, if
501 if attr_class not in (type(None), attr_value.__class__):
502 attr_value = attr_class(attr_value)
503 setattr(writer, attr_name, attr_value)
504 # Check dependencies before we try to resume anything.
# Refuse to resume if any buffered block locator's permission
# signature has expired — Keep would reject reads/writes with it.
505 if any(KeepLocator(ls).permission_expired()
506 for ls in writer._current_stream_locators):
507 raise errors.StaleWriterStateError(
508 "locators include expired permission hint")
509 writer.check_dependencies()
# Reopen the in-progress file and seek to the checkpointed position.
510 if state['_current_file'] is not None:
511 path, pos = state['_current_file']
513 writer._queued_file = open(path, 'rb')
514 writer._queued_file.seek(pos)
515 except IOError as error:
516 raise errors.StaleWriterStateError(
517 "failed to reopen active file {}: {}".format(path, error))
520 def check_dependencies(self):
# Verify each recorded dependency is still a regular file with the
# same mtime and size as when it was queued.
521 for path, orig_stat in self._dependencies.items():
522 if not S_ISREG(orig_stat[ST_MODE]):
523 raise errors.StaleWriterStateError("{} not file".format(path))
525 now_stat = tuple(os.stat(path))
526 except OSError as error:
527 raise errors.StaleWriterStateError(
528 "failed to stat {}: {}".format(path, error))
529 if ((not S_ISREG(now_stat[ST_MODE])) or
530 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
531 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
532 raise errors.StaleWriterStateError("{} changed".format(path))
534 def dump_state(self, copy_func=lambda x: x):
# Snapshot every STATE_PROPS attribute (optionally deep-copied via
# copy_func) plus the path/position of the file being written.
535 state = {attr: copy_func(getattr(self, attr))
536 for attr in self.STATE_PROPS}
537 if self._queued_file is None:
538 state['_current_file'] = None
540 state['_current_file'] = (os.path.realpath(self._queued_file.name),
541 self._queued_file.tell())
544 def _queue_file(self, source, filename=None):
# Resumable writers only accept path sources (not arbitrary streams),
# so the dependency can be re-checked on resume.
546 src_path = os.path.realpath(source)
548 raise errors.AssertionError("{} not a file path".format(source))
550 path_stat = os.stat(src_path)
551 except OSError as stat_error:
553 super(ResumableCollectionWriter, self)._queue_file(source, filename)
# Compare the stat of the opened descriptor against the stat of the
# path to catch the file being swapped between open and stat.
554 fd_stat = os.fstat(self._queued_file.fileno())
555 if not S_ISREG(fd_stat.st_mode):
556 # We won't be able to resume from this cache anyway, so don't
557 # worry about further checks.
558 self._dependencies[source] = tuple(fd_stat)
559 elif path_stat is None:
560 raise errors.AssertionError(
561 "could not stat {}: {}".format(source, stat_error))
562 elif path_stat.st_ino != fd_stat.st_ino:
563 raise errors.AssertionError(
564 "{} changed between open and stat calls".format(source))
566 self._dependencies[src_path] = tuple(fd_stat)
568 def write(self, data):
# All data must come from a queued file so a checkpoint can reproduce
# it; free-form writes cannot be resumed.
569 if self._queued_file is None:
570 raise errors.AssertionError(
571 "resumable writer can't accept unsourced data")
572 return super(ResumableCollectionWriter, self).write(data)