6 from collections import deque
9 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
11 from .stream import StreamReader, normalize_stream, locator_block_size
12 from .ranges import Range, LocatorAndRange
17 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    # Shared base for collection readers/writers: lazy API/Keep client
    # plumbing and manifest stripping.
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): the lazy-KeepClient logic below reads like a
        # `_my_keep()` accessor whose `def` line is elided from this view;
        # confirm against the full source before relying on this grouping.
        if self._keep_client is None:
            # Build a KeepClient on first use, bound to our API client.
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        for line in raw.split("\n"):
            # Keep field 0 (the stream name); strip non-numeric hints
            # (e.g. +Asignature) from every field that is a Keep locator.
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument by pattern: Keep locator (portable data
        # hash), collection UUID, or literal manifest text.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
            # NOTE(review): an `else:` line appears to be elided from this
            # view before the raise below.
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        # NOTE(review): the `try:` / `return None` / `return e` lines of
        # this method are elided from this view.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:

        # (Body of _populate below: its `def` line is elided from this view.)
        # Strategy: try Keep directly for signed locators, then the API
        # server, then unsigned Keep as a last resort.
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Give up: report both failure modes to the caller.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                self._manifest_locator,
        # Parse the manifest into one token list per stream line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                # (self._populate() call elided from this view.)
            return orig_func(self, *args, **kwargs)

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

        # (Body of normalize() below: its `def` line is elided from this
        # view.)  Group every file's extents by (stream, file) so the
        # manifest can be rewritten with sorted streams and files.
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
        # No stream matched the requested path.
        raise ValueError("stream '{}' not found in Collection".
        return stream.files()[filename]
        raise ValueError("file '{}' not found in Collection stream '{}'".
                         format(filename, streampath))

    def all_streams(self):
        # Wrap every parsed manifest stream line in a StreamReader.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

        # (Body of all_files() below: yields every file in every stream;
        # its `def` line is elided from this view.)
        for s in self.all_streams():
            for f in s.all_files():

    def manifest_text(self, strip=False, normalize=False):
        # Normalizing round-trips the manifest through a fresh reader;
        # otherwise return the stripped or raw cached text.
            cr = CollectionReader(self.manifest_text())
            return cr.manifest_text(strip=strip, normalize=False)
            return self.stripped_manifest()
            return self._manifest_text
class _WriterFile(ArvadosFileBase):
    # File-like handle returned by CollectionWriter.open(); forwards all
    # writes to the owning CollectionWriter.
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # (The `def close(self):` line is elided from this view.)
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):

    @ArvadosFileBase._before_close
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered but not yet flushed to a Keep block.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream and file currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work queue state (see do_queued_work).
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        # (Body elided from this view.)

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        # (The loop header and the _work_* calls are elided from this view.)
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:

    def _work_file(self):
        # Copy the queued file into the collection one Keep block at a time.
        # (The surrounding read loop is elided from this view.)
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Recurse: queue the subdirectory as its own stream.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # NOTE(review): an `else:` appears to be elided from this
                # view before the file case below.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 means flatten everything under this
        # directory into one stream; otherwise list one level only.
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # Got a path: open it ourselves and remember to close it later.
            source = open(source, 'rb')
            self._close_file = True
        # NOTE(review): an `else:` branch line is elided from this view.
        self._close_file = False
        # NOTE(review): an `if filename is None:` guard is elided here.
        filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
379 def _queue_dirents(self, stream_name, dirents):
380 assert (not self._queued_dirents), "tried to queue more than one tree"
381 self._queued_dirents = deque(sorted(dirents))
383 def _queue_tree(self, path, stream_name, max_manifest_depth):
384 self._queued_trees.append((path, stream_name, max_manifest_depth))
386 def write_file(self, source, filename=None):
387 self._queue_file(source, filename)
388 self.do_queued_work()
390 def write_directory_tree(self,
391 path, stream_name='.', max_manifest_depth=-1):
392 self._queue_tree(path, stream_name, max_manifest_depth)
393 self.do_queued_work()
    def write(self, newdata):
        # Non-string iterables are written element by element (the loop and
        # early return are elided from this view); strings fall through to
        # the buffer below.
        if hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as soon as we have them.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        # Send the first KEEP_BLOCK_SIZE bytes of buffered data to Keep and
        # keep any remainder buffered.  (The empty-buffer guard is elided
        # from this view.)
        data_buffer = ''.join(self._data_buffer)
        self._current_stream_locators.append(
            self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
443 def start_new_file(self, newfilename=None):
444 self.finish_current_file()
445 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # Manifests are whitespace-delimited and NUL-free; reject names that
        # would corrupt the format.  (The `%` arguments of both messages are
        # elided from this view.)
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename
458 def current_file_name(self):
459 return self._current_file_name
    def finish_current_file(self):
        # Record [file_pos, stream_length) as a finished file entry.
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file ended; nothing to do.
                # (The `return` here is elided from this view.)
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        # Reset for the next file in this stream.
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
478 def start_new_stream(self, newstreamname='.'):
479 self.finish_current_stream()
480 self.set_current_stream_name(newstreamname)
482 def set_current_stream_name(self, newstreamname):
483 if re.search(r'[\t\n]', newstreamname):
484 raise errors.AssertionError(
485 "Manifest stream names cannot contain whitespace")
486 self._current_stream_name = '.' if newstreamname=='' else newstreamname
488 def current_stream_name(self):
489 return self._current_stream_name
    def finish_current_stream(self):
        # Close the current file, then archive this stream's locators and
        # file entries into _finished_streams.  (The flush_data() call and
        # the empty-stream `pass` are elided from this view.)
        self.finish_current_file()
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        if not self._current_stream_locators:
            # A stream must reference at least one block; use the canonical
            # empty block locator.
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        # (The `def finish(self):` line is elided from this view.)
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())
517 def portable_data_hash(self):
518 stripped = self.stripped_manifest()
519 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        # Flush all pending state, then render each finished stream as one
        # manifest line.  (The accumulator initialization and the './'
        # prefix line are elided from this view.)
        self.finish_current_stream()
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Stream name, block locators, then pos:size:name file tokens;
            # spaces in names are escaped as \040.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
    def data_locators(self):
        # Collect every block locator from all finished streams.  (The
        # accumulator and return lines are elided from this view.)
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    # Attributes that fully capture writer progress for dump_state/from_state.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Map of source path -> stat tuple, used to detect changed inputs.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    def from_state(cls, state, *init_args, **init_kwargs):
        # (The @classmethod decorator line is elided from this view.)
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible (e.g. lists back into deques).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # (The `try:` line is elided from this view.)
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))

    def check_dependencies(self):
        # Raise StaleWriterStateError if any recorded source file changed
        # (type, mtime, or size) since it was queued.
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # (The `try:` line is elided from this view.)
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # Snapshot STATE_PROPS (optionally via copy_func, e.g. deepcopy)
        # plus the open file's path and position.  (The `else:` and
        # `return state` lines are elided from this view.)
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                  self._queued_file.tell())

    def _queue_file(self, source, filename=None):
        # Resumable writers only accept real file paths, so progress can be
        # verified and resumed later.  (Several `try:`/`else:` lines of this
        # method are elided from this view.)
        src_path = os.path.realpath(source)
        raise errors.AssertionError("{} not a file path".format(source))
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Data must originate from a queued file so it can be resumed.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
class Collection(CollectionBase):
    def __init__(self, manifest_locator_or_text=None, api_client=None,
                 keep_client=None, num_retries=0, block_manager=None):
        # Read/write collection backed by a BlockManager; the manifest is
        # parsed lazily on first data access.
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        # Share a BlockManager if given one; otherwise create our own.
        if block_manager is None:
            self.block_manager = BlockManager(keep_client)

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                # NOTE(review): an `else:` appears elided here, and the
                # message says "CollectionReader" though this is Collection
                # -- likely copied; confirm upstream.
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        # (The `try:`/`return` lines are elided from this view.)
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:

        # (Body of _populate below: its `def` line is elided from this view.
        # Same retrieval strategy as CollectionReader._populate, ending with
        # import_manifest to build the item tree.)
        if self._manifest_locator is None and self._manifest_text is None:
            # Nothing to populate (the `return` is elided from this view).
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                self._manifest_locator,
        import_manifest(self._manifest_text, self)

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._items is None:
                # (self._populate() call elided from this view.)
            return orig_func(self, *args, **kwargs)

    def __exit__(self, exc_type, exc_value, traceback):
        # (Body elided from this view.)

    def find(self, path, create=False):
        # Walk the item tree by path component; optionally create missing
        # files/subcollections.  (Path-splitting lines are elided; `p` is
        # the split path.)
        item = self._items.get(p[0])
        # item must be a file
        if item is None and create:
            item = ArvadosFile(self.block_manager, keep=self._keep_client)
            self._items[p[0]] = item
        if item is None and create:
            # create new collection
            item = Collection(api_client=self._api_client, keep=self._keep_client, num_retries=self.num_retries, block_manager=self.block_manager)
            self._items[p[0]] = item
        # Recurse into the subcollection with the remaining path.
        return item.find("/".join(p), create=create)

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    def open(self, path, mode):
        # Normalize the mode: the binary flag is meaningless here.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        f = self.find(path, create=create)
        raise ArgumentError("File not found")
        if not isinstance(f, ArvadosFile):
            raise ArgumentError("Path must refer to a file.")
        # Reader for "r", writer otherwise (branch lines elided from view).
        return ArvadosFileReader(f, path, mode)
        return ArvadosFileWriter(f, path, mode)

        # (Fragments below belong to modified()/set_unmodified()/iterator
        # and mapping dunders; several `def` lines are elided from this
        # view.)
        for k,v in self._items.items():

    def set_unmodified(self):
        for k,v in self._items.items():
            self._items.iterkeys()
            self._items.iterkeys()

    def __getitem__(self, k):

    def __contains__(self, k):
        return self.find(k) is not None
        return len(self._items)

    def __delitem__(self, p):
        item = self._items.get(p[0])
        raise NotFoundError()
        del self._items[p[0]]
        del item["/".join(p)]
        raise NotFoundError()
        return self._items.keys()
        return self._items.values()
        return self._items.items()

    def manifest_text(self, strip=False, normalize=False):
        # Regenerate when dirty or normalization is requested; otherwise
        # serve the stripped or raw cached text.
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
            return self.stripped_manifest()
            return self._manifest_text
878 def portable_data_hash(self):
879 stripped = self.manifest_text(strip=True)
880 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def commit_bufferblocks(self):
        # (Body elided from this view.)

        # (The `def save(self):` line is elided from this view: upload the
        # stripped manifest to Keep, then update the existing collection
        # record.)
        self._my_keep().put(self.manifest_text(strip=True))
        if re.match(util.collection_uuid_pattern, self._manifest_locator):
            self._api_response = self._api_client.collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': self.manifest_text(strip=False)}
                num_retries=self.num_retries)
            raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
        self.set_unmodified()

    def save_as(self, name, owner_uuid=None):
        # Upload data blocks, then create a brand-new collection record.
        # (The 'name' key and the `if owner_uuid:` guard are elided from
        # this view.)
        self._my_keep().put(self.manifest_text(strip=True))
        body = {"manifest_text": self.manifest_text(strip=False),
            body["owner_uuid"] = owner_uuid
        self._api_response = self._api_client.collections().create(body=body).execute(num_retries=self.num_retries)
        self._manifest_locator = self._api_response["uuid"]
        self.set_unmodified()
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
    # Parse manifest_text into a Collection tree (a provided empty
    # collection, or a new one).  Token state machine per manifest line:
    # STREAM_NAME -> BLOCKS -> SEGMENTS.  (State constants, state
    # transitions, and the final return are elided from this view.)
    if into_collection is not None:
        if len(into_collection) > 0:
            raise ArgumentError("Can only import manifest into an empty collection")
    c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)

    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            # Block token: md5+size(+hints); track its offset in the stream.
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            blocksize = long(s.group(1))
            blocks.append(Range(tok, streamoffset, blocksize))
            streamoffset += blocksize
        if state == SEGMENTS:
            # File token: position:size:escaped-name within the stream.
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            pos = long(s.group(1))
            size = long(s.group(2))
            name = s.group(3).replace('\\040', ' ')
            f = c.find("%s/%s" % (stream_name, name), create=True)
            f.add_segment(blocks, pos, size)
    raise errors.SyntaxError("Invalid manifest format")
def export_manifest(item, stream_name=".", portable_locators=False):
    # Render `item` (a Collection tree or a single ArvadosFile) as manifest
    # text.  NOTE(review): this definition appears to run past the end of
    # this view (accumulator setup and the final return are not visible).
    if isinstance(item, Collection):
        sorted_keys = sorted(item.keys())
        # Files in this stream first, in sorted order.
        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            for s in v._segments:
                if loc.startswith("bufferblock"):
                    # Unflushed data: substitute the buffer block's locator.
                    loc = v.bbm._bufferblocks[loc].locator()
                st.append(LocatorAndRange(loc, locator_block_size(loc),
                                          s.segment_offset, s.range_size))
        buf += ' '.join(normalize_stream(stream_name, stream))
        # Then recurse into subcollections as their own streams.
        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k))
    elif isinstance(item, ArvadosFile):
        for s in item._segments:
            if loc.startswith("bufferblock"):
                loc = item._bufferblocks[loc].calculate_locator()
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
        stream[stream_name] = st
        buf += ' '.join(normalize_stream(stream_name, stream))