7 from collections import deque
10 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
12 from .stream import StreamReader, normalize_stream, locator_block_size
13 from .ranges import Range, LocatorAndRange
18 _logger = logging.getLogger('arvados.collection')
# Shared base class for the collection reader/writer classes below.
# Provides lazy construction of a KeepClient and manifest stripping.
# NOTE(review): this excerpt is a numbered listing with elided lines (gaps
# in the embedded numbering); e.g. the `def _my_keep(self):` header that
# should precede line 28, and `__enter__`, are not visible here.
20 class CollectionBase(object):
# Context-manager exit hook (body elided in this excerpt).
24 def __exit__(self, exc_type, exc_value, traceback):
# Lazily build a KeepClient bound to this object's API client and retry
# count, caching it on first use.
28 if self._keep_client is None:
29 self._keep_client = KeepClient(api_client=self._api_client,
30 num_retries=self.num_retries)
31 return self._keep_client
33 def stripped_manifest(self):
35 Return the manifest for the current collection with all
36 non-portable hints (i.e., permission signatures and other
37 hints other than size hints) removed from the locators.
# Rebuild each manifest line: for tokens that look like Keep locators,
# strip every "+hint" whose first character is not a digit (so size
# hints survive); all other tokens pass through unchanged.
# NOTE(review): the list initialization, the `else x` arm of the
# conditional expression, and the final join/return are elided here.
39 raw = self.manifest_text()
41 for line in raw.split("\n"):
44 clean_fields = fields[:1] + [
45 (re.sub(r'\+[^\d][^\+]*', '', x)
46 if re.match(util.keep_locator_pattern, x)
49 clean += [' '.join(clean_fields), "\n"]
# Read-only interface to an existing Arvados collection, identified by
# UUID, portable data hash, or literal manifest text.
# NOTE(review): numbered excerpt with elided lines; several `try:`,
# `else:`, `return`, and `def` lines are missing from view below.
53 class CollectionReader(CollectionBase):
54 def __init__(self, manifest_locator_or_text, api_client=None,
55 keep_client=None, num_retries=0):
56 """Instantiate a CollectionReader.
58 This class parses Collection manifests to provide a simple interface
59 to read its underlying files.
62 * manifest_locator_or_text: One of a Collection UUID, portable data
63 hash, or full manifest text.
64 * api_client: The API client to use to look up Collections. If not
65 provided, CollectionReader will build one from available Arvados
67 * keep_client: The KeepClient to use to download Collection data.
68 If not provided, CollectionReader will build one from available
69 Arvados configuration.
70 * num_retries: The default number of times to retry failed
71 service requests. Default 0. You may change this value
72 after instantiation, but note those changes may not
73 propagate to related objects like the Keep client.
75 self._api_client = api_client
76 self._keep_client = keep_client
77 self.num_retries = num_retries
# Classify the argument: a Keep locator or collection UUID is stored as
# a locator to be resolved lazily; manifest text is stored directly.
78 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
79 self._manifest_locator = manifest_locator_or_text
80 self._manifest_text = None
81 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
82 self._manifest_locator = manifest_locator_or_text
83 self._manifest_text = None
84 elif re.match(util.manifest_pattern, manifest_locator_or_text):
85 self._manifest_text = manifest_locator_or_text
86 self._manifest_locator = None
# (The `else:` line introducing this raise is elided in this excerpt.)
88 raise errors.ArgumentError(
89 "Argument to CollectionReader must be a manifest or a collection UUID")
90 self._api_response = None
93 def _populate_from_api_server(self):
94 # As in KeepClient itself, we must wait until the last
95 # possible moment to instantiate an API client, in order to
96 # avoid tripping up clients that don't have access to an API
97 # server. If we do build one, make sure our Keep client uses
98 # it. If instantiation fails, we'll fall back to the except
99 # clause, just like any other Collection lookup
100 # failure. Return an exception, or None if successful.
# (The enclosing `try:` line is elided in this excerpt.)
102 if self._api_client is None:
103 self._api_client = arvados.api('v1')
104 self._keep_client = None  # Make a new one with the new api.
105 self._api_response = self._api_client.collections().get(
106 uuid=self._manifest_locator).execute(
107 num_retries=self.num_retries)
108 self._manifest_text = self._api_response['manifest_text']
110 except Exception as e:
113 def _populate_from_keep(self):
114 # Retrieve a manifest directly from Keep. This has a chance of
115 # working if [a] the locator includes a permission signature
116 # or [b] the Keep services are operating in world-readable
117 # mode. Return an exception, or None if successful.
# (The enclosing `try:` line is elided in this excerpt.)
119 self._manifest_text = self._my_keep().get(
120 self._manifest_locator, num_retries=self.num_retries)
121 except Exception as e:
# Populate self._manifest_text / self._streams, trying Keep for signed
# locators first, then the API server, then Keep again for unsigned
# Keep locators. (The `def _populate(self):` header is elided here.)
126 error_via_keep = None
127 should_try_keep = ((self._manifest_text is None) and
128 util.keep_locator_pattern.match(
129 self._manifest_locator))
130 if ((self._manifest_text is None) and
131 util.signed_locator_pattern.match(self._manifest_locator)):
132 error_via_keep = self._populate_from_keep()
133 if self._manifest_text is None:
134 error_via_api = self._populate_from_api_server()
135 if error_via_api is not None and not should_try_keep:
137 if ((self._manifest_text is None) and
138 not error_via_keep and
140 # Looks like a keep locator, and we didn't already try keep above
141 error_via_keep = self._populate_from_keep()
142 if self._manifest_text is None:
# Neither source produced a manifest: report both failures.
144 raise arvados.errors.NotFoundError(
145 ("Failed to retrieve collection '{}' " +
146 "from either API server ({}) or Keep ({})."
148 self._manifest_locator,
# Tokenize each manifest line into a stream description.
151 self._streams = [sline.split()
152 for sline in self._manifest_text.split("\n")
155 def _populate_first(orig_func):
156 # Decorator for methods that read actual Collection data.
157 @functools.wraps(orig_func)
158 def wrapper(self, *args, **kwargs):
# Lazily populate on first data access (the _populate() call line is
# elided in this excerpt).
159 if self._streams is None:
161 return orig_func(self, *args, **kwargs)
165 def api_response(self):
166 """api_response() -> dict or None
168 Returns information about this Collection fetched from the API server.
169 If the Collection exists in Keep but not the API server, currently
170 returns None. Future versions may provide a synthetic response.
172 return self._api_response
# Normalize the collection: regroup file data by stream and file name,
# then rebuild self._streams and the manifest text in normalized form.
# (The `def normalize(self):` header and `streams = {}` init are elided.)
178 for s in self.all_streams():
179 for f in s.all_files():
180 streamname, filename = split(s.name() + "/" + f.name())
181 if streamname not in streams:
182 streams[streamname] = {}
183 if filename not in streams[streamname]:
184 streams[streamname][filename] = []
# (The `for r in f.segments:`-style loop header for `r` is elided.)
186 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
188 self._streams = [normalize_stream(s, streams[s])
189 for s in sorted(streams)]
191 # Regenerate the manifest text based on the normalized streams
192 self._manifest_text = ''.join(
193 [StreamReader(stream, keep=self._my_keep()).manifest_text()
194 for stream in self._streams])
197 def open(self, streampath, filename=None):
198 """open(streampath[, filename]) -> file-like object
200 Pass in the path of a file to read from the Collection, either as a
201 single string or as two separate stream name and file name arguments.
202 This method returns a file-like object to read that file.
# Split a single path argument into stream and file components; then
# scan streams for a name match. (The `if filename is None:` guard and
# `break`/`else` lines are elided in this excerpt.)
205 streampath, filename = split(streampath)
206 keep_client = self._my_keep()
207 for stream_s in self._streams:
208 stream = StreamReader(stream_s, keep_client,
209 num_retries=self.num_retries)
210 if stream.name() == streampath:
213 raise ValueError("stream '{}' not found in Collection".
# Return the named file from the matched stream, or fail with a
# descriptive error. (The membership test line is elided.)
216 return stream.files()[filename]
218 raise ValueError("file '{}' not found in Collection stream '{}'".
219 format(filename, streampath))
222 def all_streams(self):
223 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
224 for s in self._streams]
# Yield every file across every stream (generator body partially elided).
227 for s in self.all_streams():
228 for f in s.all_files():
232 def manifest_text(self, strip=False, normalize=False):
# When normalizing, round-trip through a fresh reader so the returned
# text is in normalized form; otherwise return stripped or raw text.
# (The `if normalize:` / `elif strip:` / `else:` lines are elided.)
234 cr = CollectionReader(self.manifest_text())
236 return cr.manifest_text(strip=strip, normalize=False)
238 return self.stripped_manifest()
240 return self._manifest_text
# File-like handle returned by CollectionWriter.open(); forwards writes
# to the owning CollectionWriter and finalizes the file on close.
# NOTE(review): numbered excerpt; the `def close` and `def flush`
# headers and the `writelines` loop body are elided from view.
243 class _WriterFile(ArvadosFileBase):
244 def __init__(self, coll_writer, name):
245 super(_WriterFile, self).__init__(name, 'wb')
# The destination CollectionWriter that receives all data.
246 self.dest = coll_writer
# close(): mark closed via the base class, then tell the writer the
# current file is complete. (The `def close` line is elided.)
249 super(_WriterFile, self).close()
250 self.dest.finish_current_file()
252 @ArvadosFileBase._before_close
253 def write(self, data):
254 self.dest.write(data)
256 @ArvadosFileBase._before_close
257 def writelines(self, seq):
# flush(): push buffered data to Keep via the writer. (The `def flush`
# line is elided.)
261 @ArvadosFileBase._before_close
263 self.dest.flush_data()
# Builds a new Arvados collection from scratch: buffers written data,
# uploads KEEP_BLOCK_SIZE blocks to Keep, and tracks per-stream /
# per-file bookkeeping to emit manifest text.
# NOTE(review): numbered excerpt with elided lines; several `else:`,
# `try:`, `return`, and `def` lines are missing from view below.
266 class CollectionWriter(CollectionBase):
267 def __init__(self, api_client=None, num_retries=0):
268 """Instantiate a CollectionWriter.
270 CollectionWriter lets you build a new Arvados Collection from scratch.
271 Write files to it. The CollectionWriter will upload data to Keep as
272 appropriate, and provide you with the Collection manifest text when
276 * api_client: The API client to use to look up Collections. If not
277 provided, CollectionReader will build one from available Arvados
279 * num_retries: The default number of times to retry failed
280 service requests. Default 0. You may change this value
281 after instantiation, but note those changes may not
282 propagate to related objects like the Keep client.
284 self._api_client = api_client
285 self.num_retries = num_retries
286 self._keep_client = None
# Pending data not yet committed to Keep, plus its total length.
287 self._data_buffer = []
288 self._data_buffer_len = 0
# Bookkeeping for the stream currently being written.
289 self._current_stream_files = []
290 self._current_stream_length = 0
291 self._current_stream_locators = []
292 self._current_stream_name = '.'
293 self._current_file_name = None
294 self._current_file_pos = 0
295 self._finished_streams = []
# Work-queue state (see do_queued_work below).
296 self._close_file = None
297 self._queued_file = None
298 self._queued_dirents = deque()
299 self._queued_trees = deque()
300 self._last_open = None
# Context-manager exit hook (body elided in this excerpt).
302 def __exit__(self, exc_type, exc_value, traceback):
306 def do_queued_work(self):
307 # The work queue consists of three pieces:
308 # * _queued_file: The file object we're currently writing to the
310 # * _queued_dirents: Entries under the current directory
311 # (_queued_trees[0]) that we want to write or recurse through.
312 # This may contain files from subdirectories if
313 # max_manifest_depth == 0 for this directory.
314 # * _queued_trees: Directories that should be written as separate
315 # streams to the Collection.
316 # This function handles the smallest piece of work currently queued
317 # (current file, then current directory, then next directory) until
318 # no work remains. The _work_THING methods each do a unit of work on
319 # THING. _queue_THING methods add a THING to the work queue.
# (The enclosing `while` line and `_work_*` call lines are elided.)
321 if self._queued_file:
323 elif self._queued_dirents:
325 elif self._queued_trees:
330 def _work_file(self):
# Drain the queued file in KEEP_BLOCK_SIZE chunks. (The loop header,
# empty-read break, and write call are elided in this excerpt.)
332 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
336 self.finish_current_file()
# Only close the source if we opened it ourselves (see _queue_file).
338 self._queued_file.close()
339 self._close_file = None
340 self._queued_file = None
342 def _work_dirents(self):
# Process entries of the directory at the head of _queued_trees:
# recurse into subdirectories (as separate streams) and queue files.
343 path, stream_name, max_manifest_depth = self._queued_trees[0]
344 if stream_name != self.current_stream_name():
345 self.start_new_stream(stream_name)
346 while self._queued_dirents:
347 dirent = self._queued_dirents.popleft()
348 target = os.path.join(path, dirent)
349 if os.path.isdir(target):
350 self._queue_tree(target,
351 os.path.join(stream_name, dirent),
352 max_manifest_depth - 1)
# (The `else:` line introducing the file branch is elided.)
354 self._queue_file(target, dirent)
356 if not self._queued_dirents:
357 self._queued_trees.popleft()
359 def _work_trees(self):
# Expand the next queued directory into dirents; max_depth=None means
# listdir_recursive flattens the whole subtree into this one stream.
360 path, stream_name, max_manifest_depth = self._queued_trees[0]
361 d = util.listdir_recursive(
362 path, max_depth = (None if max_manifest_depth == 0 else 0))
# (The `if d:` / `else:` branch lines are elided in this excerpt.)
364 self._queue_dirents(stream_name, d)
366 self._queued_trees.popleft()
368 def _queue_file(self, source, filename=None):
369 assert (self._queued_file is None), "tried to queue more than one file"
# Accept either a path (which we open and must later close) or an
# already-open file-like object (which the caller owns).
370 if not hasattr(source, 'read'):
371 source = open(source, 'rb')
372 self._close_file = True
# (The `else:` line is elided in this excerpt.)
374 self._close_file = False
# (The `if filename is None:` guard line is elided.)
376 filename = os.path.basename(source.name)
377 self.start_new_file(filename)
378 self._queued_file = source
380 def _queue_dirents(self, stream_name, dirents):
381 assert (not self._queued_dirents), "tried to queue more than one tree"
# Sorted for deterministic manifest ordering.
382 self._queued_dirents = deque(sorted(dirents))
384 def _queue_tree(self, path, stream_name, max_manifest_depth):
385 self._queued_trees.append((path, stream_name, max_manifest_depth))
387 def write_file(self, source, filename=None):
388 self._queue_file(source, filename)
389 self.do_queued_work()
391 def write_directory_tree(self,
392 path, stream_name='.', max_manifest_depth=-1):
393 self._queue_tree(path, stream_name, max_manifest_depth)
394 self.do_queued_work()
396 def write(self, newdata):
# If given an iterable of chunks (but not a plain string), recurse per
# chunk. (The recursion and `return` lines are elided in this excerpt.)
397 if hasattr(newdata, '__iter__'):
401 self._data_buffer.append(newdata)
402 self._data_buffer_len += len(newdata)
403 self._current_stream_length += len(newdata)
# Commit full blocks to Keep as soon as the buffer reaches block size.
404 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
407 def open(self, streampath, filename=None):
408 """open(streampath[, filename]) -> file-like object
410 Pass in the path of a file to write to the Collection, either as a
411 single string or as two separate stream name and file name arguments.
412 This method returns a file-like object you can write to add it to the
415 You may only have one file object from the Collection open at a time,
416 so be sure to close the object when you're done. Using the object in
417 a with statement makes that easy::
419 with cwriter.open('./doc/page1.txt') as outfile:
420 outfile.write(page1_data)
421 with cwriter.open('./doc/page2.txt') as outfile:
422 outfile.write(page2_data)
# (The `if filename is None:` guard line is elided in this excerpt.)
425 streampath, filename = split(streampath)
# Enforce the one-open-file-at-a-time contract documented above.
426 if self._last_open and not self._last_open.closed:
427 raise errors.AssertionError(
428 "can't open '{}' when '{}' is still open".format(
429 filename, self._last_open.name))
430 if streampath != self.current_stream_name():
431 self.start_new_stream(streampath)
432 self.set_current_file_name(filename)
433 self._last_open = _WriterFile(self, filename)
434 return self._last_open
436 def flush_data(self):
# Upload the first block-sized chunk of buffered data to Keep and keep
# the remainder buffered. (The `if data_buffer:` guard is elided.)
437 data_buffer = ''.join(self._data_buffer)
439 self._current_stream_locators.append(
440 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
441 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
442 self._data_buffer_len = len(self._data_buffer[0])
444 def start_new_file(self, newfilename=None):
445 self.finish_current_file()
446 self.set_current_file_name(newfilename)
448 def set_current_file_name(self, newfilename):
# Manifest format reserves tab/newline as field separators and cannot
# encode NUL; reject such names up front.
449 if re.search(r'[\t\n]', newfilename):
450 raise errors.AssertionError(
451 "Manifest filenames cannot contain whitespace: %s" %
453 elif re.search(r'\x00', newfilename):
454 raise errors.AssertionError(
455 "Manifest filenames cannot contain NUL characters: %s" %
457 self._current_file_name = newfilename
459 def current_file_name(self):
460 return self._current_file_name
462 def finish_current_file(self):
# Record the (offset, length, name) of the file just written. A
# nameless file is only acceptable when it contains no data.
463 if self._current_file_name is None:
464 if self._current_file_pos == self._current_stream_length:
466 raise errors.AssertionError(
467 "Cannot finish an unnamed file " +
468 "(%d bytes at offset %d in '%s' stream)" %
469 (self._current_stream_length - self._current_file_pos,
470 self._current_file_pos,
471 self._current_stream_name))
472 self._current_stream_files.append([
473 self._current_file_pos,
474 self._current_stream_length - self._current_file_pos,
475 self._current_file_name])
476 self._current_file_pos = self._current_stream_length
477 self._current_file_name = None
479 def start_new_stream(self, newstreamname='.'):
480 self.finish_current_stream()
481 self.set_current_stream_name(newstreamname)
483 def set_current_stream_name(self, newstreamname):
484 if re.search(r'[\t\n]', newstreamname):
485 raise errors.AssertionError(
486 "Manifest stream names cannot contain whitespace")
# An empty stream name means the root stream '.'.
487 self._current_stream_name = '.' if newstreamname=='' else newstreamname
489 def current_stream_name(self):
490 return self._current_stream_name
492 def finish_current_stream(self):
# Flush remaining data, then archive this stream's bookkeeping into
# _finished_streams and reset the per-stream state.
493 self.finish_current_file()
495 if not self._current_stream_files:
497 elif self._current_stream_name is None:
498 raise errors.AssertionError(
499 "Cannot finish an unnamed stream (%d bytes in %d files)" %
500 (self._current_stream_length, len(self._current_stream_files)))
# (The `else:` line introducing this branch is elided in this excerpt.)
502 if not self._current_stream_locators:
503 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
504 self._finished_streams.append([self._current_stream_name,
505 self._current_stream_locators,
506 self._current_stream_files])
507 self._current_stream_files = []
508 self._current_stream_length = 0
509 self._current_stream_locators = []
510 self._current_stream_name = None
511 self._current_file_pos = 0
512 self._current_file_name = None
# finish(): store the manifest itself in Keep. (The `def finish` line
# is elided in this excerpt.)
515 # Store the manifest in Keep and return its locator.
516 return self._my_keep().put(self.manifest_text())
518 def portable_data_hash(self):
# The portable data hash is md5(stripped manifest) + "+" + its length.
519 stripped = self.stripped_manifest()
520 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
522 def manifest_text(self):
523 self.finish_current_stream()
# Assemble one manifest line per finished stream: name, locators, then
# pos:len:name file tokens, with spaces escaped as \040.
# (The `manifest = ''` init and final return are elided.)
526 for stream in self._finished_streams:
527 if not re.search(r'^\.(/.*)?$', stream[0]):
529 manifest += stream[0].replace(' ', '\\040')
530 manifest += ' ' + ' '.join(stream[1])
531 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
536 def data_locators(self):
# Collect locators from all finished streams. (The accumulator init,
# extend call, and return are elided in this excerpt.)
538 for name, locators, files in self._finished_streams:
# A CollectionWriter whose progress can be checkpointed (dump_state) and
# later resumed (from_state), with dependency checks to detect source
# files that changed between runs.
# NOTE(review): numbered excerpt; several `try:`, `return`, and
# decorator lines are elided from view below.
543 class ResumableCollectionWriter(CollectionWriter):
# Attributes captured by dump_state / restored by from_state.
544 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
545 '_current_stream_locators', '_current_stream_name',
546 '_current_file_name', '_current_file_pos', '_close_file',
547 '_data_buffer', '_dependencies', '_finished_streams',
548 '_queued_dirents', '_queued_trees']
550 def __init__(self, api_client=None, num_retries=0):
# Maps source path -> os.stat tuple recorded when the file was queued.
551 self._dependencies = {}
552 super(ResumableCollectionWriter, self).__init__(
553 api_client, num_retries=num_retries)
# (The @classmethod decorator line is elided in this excerpt.)
556 def from_state(cls, state, *init_args, **init_kwargs):
557 # Try to build a new writer from scratch with the given state.
558 # If the state is not suitable to resume (because files have changed,
559 # been deleted, aren't predictable, etc.), raise a
560 # StaleWriterStateError. Otherwise, return the initialized writer.
561 # The caller is responsible for calling writer.do_queued_work()
562 # appropriately after it's returned.
563 writer = cls(*init_args, **init_kwargs)
564 for attr_name in cls.STATE_PROPS:
565 attr_value = state[attr_name]
566 attr_class = getattr(writer, attr_name).__class__
567 # Coerce the value into the same type as the initial value, if
# e.g. restore deques that were serialized as plain lists.
569 if attr_class not in (type(None), attr_value.__class__):
570 attr_value = attr_class(attr_value)
571 setattr(writer, attr_name, attr_value)
572 # Check dependencies before we try to resume anything.
# Signed locators from the previous run are useless once expired.
573 if any(KeepLocator(ls).permission_expired()
574 for ls in writer._current_stream_locators):
575 raise errors.StaleWriterStateError(
576 "locators include expired permission hint")
577 writer.check_dependencies()
578 if state['_current_file'] is not None:
579 path, pos = state['_current_file']
# Reopen the in-progress source file and seek to where we stopped.
# (The enclosing `try:` line and final `return writer` are elided.)
581 writer._queued_file = open(path, 'rb')
582 writer._queued_file.seek(pos)
583 except IOError as error:
584 raise errors.StaleWriterStateError(
585 "failed to reopen active file {}: {}".format(path, error))
588 def check_dependencies(self):
# Verify every recorded source file is still a regular file with the
# same mtime and size; otherwise resuming would corrupt the output.
589 for path, orig_stat in self._dependencies.items():
590 if not S_ISREG(orig_stat[ST_MODE]):
591 raise errors.StaleWriterStateError("{} not file".format(path))
# (The enclosing `try:` line is elided in this excerpt.)
593 now_stat = tuple(os.stat(path))
594 except OSError as error:
595 raise errors.StaleWriterStateError(
596 "failed to stat {}: {}".format(path, error))
597 if ((not S_ISREG(now_stat[ST_MODE])) or
598 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
599 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
600 raise errors.StaleWriterStateError("{} changed".format(path))
602 def dump_state(self, copy_func=lambda x: x):
# Snapshot all resumable state; copy_func lets callers deep-copy
# mutable values if they intend to keep writing afterwards.
603 state = {attr: copy_func(getattr(self, attr))
604 for attr in self.STATE_PROPS}
605 if self._queued_file is None:
606 state['_current_file'] = None
# (The `else:` line and final `return state` are elided.)
608 state['_current_file'] = (os.path.realpath(self._queued_file.name),
609 self._queued_file.tell())
612 def _queue_file(self, source, filename=None):
# Only path-named sources can be resumed; record their stat info as a
# dependency. (The `try:` and path-type check lines are elided.)
614 src_path = os.path.realpath(source)
616 raise errors.AssertionError("{} not a file path".format(source))
618 path_stat = os.stat(src_path)
619 except OSError as stat_error:
# (The `path_stat = None` fallback line is elided in this excerpt.)
621 super(ResumableCollectionWriter, self)._queue_file(source, filename)
622 fd_stat = os.fstat(self._queued_file.fileno())
623 if not S_ISREG(fd_stat.st_mode):
624 # We won't be able to resume from this cache anyway, so don't
625 # worry about further checks.
626 self._dependencies[source] = tuple(fd_stat)
627 elif path_stat is None:
628 raise errors.AssertionError(
629 "could not stat {}: {}".format(source, stat_error))
630 elif path_stat.st_ino != fd_stat.st_ino:
631 raise errors.AssertionError(
632 "{} changed between open and stat calls".format(source))
# (The `else:` line introducing this branch is elided.)
634 self._dependencies[src_path] = tuple(fd_stat)
636 def write(self, data):
# Data must come from a queued file so a resumed run can replay it.
637 if self._queued_file is None:
638 raise errors.AssertionError(
639 "resumable writer can't accept unsourced data")
640 return super(ResumableCollectionWriter, self).write(data)
# Read/write in-memory representation of an Arvados collection: a tree of
# Collection (directory) and ArvadosFile (file) items, lazily populated
# from a manifest and committed back via save()/save_as().
# NOTE(review): numbered excerpt with elided lines; several `def` headers
# (e.g. _my_api, modified, __iter__, keys/values/items) and `try:`/`else:`
# lines are missing from view below.
643 class Collection(CollectionBase):
644 '''An abstract Arvados collection, consisting of a set of files and
648 def __init__(self, manifest_locator_or_text=None, parent=None, api_client=None,
649 keep_client=None, num_retries=0, block_manager=None):
650 '''manifest_locator_or_text: One of Arvados collection UUID, block locator of
651 a manifest, raw manifest text, or None (to create an empty collection).
653 parent: the parent Collection, may be None.
655 api_client: The API client object to use for requests. If None, use default.
657 keep_client: the Keep client to use for requests. If None, use default.
659 num_retries: the number of retries for API and Keep requests.
661 block_manager: the block manager to use. If None, create one.
665 self._api_client = api_client
666 self._keep_client = keep_client
667 self._block_manager = block_manager
669 self.num_retries = num_retries
670 self._manifest_locator = None
671 self._manifest_text = None
672 self._api_response = None
# Classify the argument exactly as CollectionReader.__init__ does.
674 if manifest_locator_or_text:
675 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
676 self._manifest_locator = manifest_locator_or_text
677 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
678 self._manifest_locator = manifest_locator_or_text
679 elif re.match(util.manifest_pattern, manifest_locator_or_text):
680 self._manifest_text = manifest_locator_or_text
# (The `else:` line introducing this raise is elided in this excerpt.)
682 raise errors.ArgumentError(
683 "Argument to CollectionReader must be a manifest or a collection UUID")
# _my_api(): lazily build (or inherit from parent) an API client.
# (The `def _my_api(self):` header is elided in this excerpt.)
686 if self._api_client is None:
687 if self.parent is not None:
688 return self.parent._my_api()
689 self._api_client = arvados.api('v1')
690 self._keep_client = None  # Make a new one with the new api.
691 return self._api_client
# _my_keep(): same lazy/inherited pattern for the Keep client.
694 if self._keep_client is None:
695 if self.parent is not None:
696 return self.parent._my_keep()
697 self._keep_client = KeepClient(api_client=self._my_api(),
698 num_retries=self.num_retries)
699 return self._keep_client
701 def _my_block_manager(self):
# Lazily build (or inherit from parent) the buffer-block manager.
702 if self._block_manager is None:
703 if self.parent is not None:
704 return self.parent._my_block_manager()
705 self._block_manager = BlockManager(self._my_keep())
706 return self._block_manager
708 def _populate_from_api_server(self):
709 # As in KeepClient itself, we must wait until the last
710 # possible moment to instantiate an API client, in order to
711 # avoid tripping up clients that don't have access to an API
712 # server. If we do build one, make sure our Keep client uses
713 # it. If instantiation fails, we'll fall back to the except
714 # clause, just like any other Collection lookup
715 # failure. Return an exception, or None if successful.
# (The enclosing `try:` line is elided in this excerpt.)
717 self._api_response = self._my_api().collections().get(
718 uuid=self._manifest_locator).execute(
719 num_retries=self.num_retries)
720 self._manifest_text = self._api_response['manifest_text']
722 except Exception as e:
725 def _populate_from_keep(self):
726 # Retrieve a manifest directly from Keep. This has a chance of
727 # working if [a] the locator includes a permission signature
728 # or [b] the Keep services are operating in world-readable
729 # mode. Return an exception, or None if successful.
# (The enclosing `try:` line is elided in this excerpt.)
731 self._manifest_text = self._my_keep().get(
732 self._manifest_locator, num_retries=self.num_retries)
733 except Exception as e:
# _populate(): resolve the locator/text into _items via import_manifest.
# Mirrors CollectionReader's keep-first-for-signed, then-API, then-keep
# fallback order. (The `def _populate(self):` header is elided.)
738 if self._manifest_locator is None and self._manifest_text is None:
741 error_via_keep = None
742 should_try_keep = ((self._manifest_text is None) and
743 util.keep_locator_pattern.match(
744 self._manifest_locator))
745 if ((self._manifest_text is None) and
746 util.signed_locator_pattern.match(self._manifest_locator)):
747 error_via_keep = self._populate_from_keep()
748 if self._manifest_text is None:
749 error_via_api = self._populate_from_api_server()
750 if error_via_api is not None and not should_try_keep:
752 if ((self._manifest_text is None) and
753 not error_via_keep and
755 # Looks like a keep locator, and we didn't already try keep above
756 error_via_keep = self._populate_from_keep()
757 if self._manifest_text is None:
# Neither source produced a manifest: report both failures.
759 raise arvados.errors.NotFoundError(
760 ("Failed to retrieve collection '{}' " +
761 "from either API server ({}) or Keep ({})."
763 self._manifest_locator,
# Build the item tree in-place from the fetched manifest text.
767 import_manifest(self._manifest_text, self)
769 def _populate_first(orig_func):
770 # Decorator for methods that read actual Collection data.
771 @functools.wraps(orig_func)
772 def wrapper(self, *args, **kwargs):
# Lazily populate on first data access (the _populate() call line is
# elided in this excerpt).
773 if self._items is None:
775 return orig_func(self, *args, **kwargs)
781 def __exit__(self, exc_type, exc_value, traceback):
782 '''Support scoped auto-commit in a with: block'''
# NOTE(review): save() runs even when the block exited with an
# exception — confirm that unconditional auto-commit is intended.
783 self.save(no_locator=True)
784 if self._block_manager is not None:
785 self._block_manager.stop_threads()
788 def find(self, path, create=False, create_collection=False):
789 '''Recursively search the specified file path. May return either a Collection
792 create: If true, create path components (i.e. Collections) that are
793 missing. If "create" is False, return None if a path component is not
796 create_collection: If the path is not found, "create" is True, and
797 "create_collection" is False, then create and return a new ArvadosFile
798 for the last path component. If "create_collection" is True, then
799 create and return a new Collection for the last path component.
# (The path-splitting lines producing `p` are elided in this excerpt.)
806 item = self._items.get(p[0])
808 # item must be a file
809 if item is None and create:
# Last path component missing: create a file or subcollection per the
# create_collection flag.
811 if create_collection:
812 item = Collection(parent=self, num_retries=self.num_retries)
# (The `else:` line introducing the file branch is elided.)
814 item = ArvadosFile(self)
815 self._items[p[0]] = item
# Intermediate path component missing: always create a subcollection,
# then recurse into it with the remaining path.
818 if item is None and create:
819 # create new collection
820 item = Collection(parent=self, num_retries=self.num_retries)
821 self._items[p[0]] = item
# (The `del p[0]` / isinstance-check lines are elided in this excerpt.)
823 return item.find("/".join(p), create=create)
828 def api_response(self):
829 """api_response() -> dict or None
831 Returns information about this Collection fetched from the API server.
832 If the Collection exists in Keep but not the API server, currently
833 returns None. Future versions may provide a synthetic response.
835 return self._api_response
837 def open(self, path, mode):
838 '''Open a file-like object for access.
840 path: path to a file in the collection
842 mode: one of "r", "r+", "w", "w+", "a", "a+"
843 "r" opens for reading
845 "r+" opens for reading and writing. Reads/writes share a file pointer.
847 "w", "w+" truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
849 "a", "a+" opens for reading and writing. All writes are appended to the end of the file. Writing does not affect the file pointer for reading.
# The binary flag is irrelevant here; strip it before validating.
851 mode = mode.replace("b", "")
852 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
853 raise ArgumentError("Bad mode '%s'" % mode)
# Any writable mode may create the file on open.
854 create = (mode != "r")
856 f = self.find(path, create=create)
# (The `if f is None:` guard line is elided in this excerpt.)
858 raise IOError((errno.ENOENT, "File not found"))
859 if not isinstance(f, ArvadosFile):
860 raise IOError((errno.EISDIR, "Path must refer to a file."))
# (Truncation for "w" modes and the mode dispatch lines are elided.)
866 return ArvadosFileReader(f, path, mode)
868 return ArvadosFileWriter(f, path, mode)
# modified(): true if any contained item reports itself modified.
# (The `def modified(self):` header and per-item check are elided.)
872 '''Test if the collection (or any subcollection or file) has been modified
873 since it was created.'''
874 for k,v in self._items.items():
880 def set_unmodified(self):
881 '''Recursively clear modified flag'''
882 for k,v in self._items.items():
# __iter__ (header elided):
887 '''Iterate over names of files and collections contained in this collection.'''
888 return self._items.iterkeys()
# iterkeys (header elided):
892 '''Iterate over names of files and collections directly contained in this collection.'''
893 return self._items.iterkeys()
896 def __getitem__(self, k):
897 '''Get a file or collection that is directly contained by this collection. Use
898 find() for path serach.'''
899 return self._items[k]
902 def __contains__(self, k):
903 '''If there is a file or collection a directly contained by this collection
905 return k in self._items
# __len__ (header elided):
909 '''Get the number of items directly contained in this collection'''
910 return len(self._items)
913 def __delitem__(self, p):
914 '''Delete an item by name which is directly contained by this collection.'''
# keys (header elided):
919 '''Get a list of names of files and collections directly contained in this collection.'''
920 return self._items.keys()
# values (header elided):
924 '''Get a list of files and collection objects directly contained in this collection.'''
925 return self._items.values()
# items (header elided):
929 '''Get a list of (name, object) tuples directly contained in this collection.'''
930 return self._items.items()
933 def exists(self, path):
934 '''Test if there is a file or collection at "path"'''
935 return self.find(path) != None
938 def remove(self, path):
# NOTE(review): docstring appears copied from exists(); this method
# deletes the item at "path" (raising IOError if absent).
939 '''Test if there is a file or collection at "path"'''
# (The path-splitting lines producing `p` are elided in this excerpt.)
945 item = self._items.get(p[0])
947 raise IOError((errno.ENOENT, "File not found"))
# Last component: delete directly; otherwise recurse into the child.
949 del self._items[p[0]]
952 item.remove("/".join(p))
954 raise IOError((errno.ENOENT, "File not found"))
957 def manifest_text(self, strip=False, normalize=False):
958 '''Get the manifest text for this collection, sub collections and files.
960 strip: If True, remove signing tokens from block locators if present.
961 If False, block locators are left unchanged.
963 normalize: If True, always export the manifest text in normalized form
964 even if the Collection is not modified. If False and the collection is
965 not modified, return the original manifest text even if it is not in
# Re-export only when necessary; otherwise reuse the cached text
# (stripped on demand). (`elif strip:` / `else:` lines are elided.)
968 if self.modified() or self._manifest_text is None or normalize:
969 return export_manifest(self, stream_name=".", portable_locators=strip)
972 return self.stripped_manifest()
974 return self._manifest_text
976 def portable_data_hash(self):
977 '''Get the portable data hash for this collection's manifest.'''
978 stripped = self.manifest_text(strip=True)
979 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
982 def save(self, no_locator=False):
983 '''Commit pending buffer blocks to Keep, write the manifest to Keep, and
984 update the collection record to Keep.
986 no_locator: If False and there is no collection uuid associated with
987 this Collection, raise an error. If True, do not raise an error.
# (The `if self.modified():` guard line is elided in this excerpt.)
990 self._my_block_manager().commit_all()
# NOTE(review): the stripped manifest is written to Keep while the
# API record gets the unstripped text — confirm this split is intended.
991 self._my_keep().put(self.manifest_text(strip=True))
992 if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
993 self._api_response = self._my_api().collections().update(
994 uuid=self._manifest_locator,
995 body={'manifest_text': self.manifest_text(strip=False)}
997 num_retries=self.num_retries)
# (The `elif not no_locator:` branch line is elided in this excerpt.)
999 raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
1000 self.set_unmodified()
1003 def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1004 '''Save a new collection record.
1006 name: The collection name.
1008 owner_uuid: the user, or project uuid that will own this collection.
1009 If None, defaults to the current user.
1011 ensure_unique_name: If True, ask the API server to rename the
1012 collection if it conflicts with a collection with the same name and
1013 owner. If False, a name conflict will result in an error.
1015 self._my_block_manager().commit_all()
1016 self._my_keep().put(self.manifest_text(strip=True))
1017 body = {"manifest_text": self.manifest_text(strip=False),
# (The `"name": name` line of the body dict is elided in this excerpt.)
1020 body["owner_uuid"] = owner_uuid
1021 self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
# Remember the new record's uuid so later save() calls update it.
1022 self._manifest_locator = self._api_response["uuid"]
1023 self.set_unmodified()
# Parse manifest text into a Collection tree using a small state machine
# (stream name -> block locators -> file segments per line).
# NOTE(review): numbered excerpt; the state constants, state-transition
# assignments, per-line resets, and the final `return c` are elided.
1026 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
1027 '''Import a manifest into a Collection.
1029 manifest_text: The manifest text to import from.
1031 into_collection: The Collection that will be initialized (must be empty).
1032 If None, create a new Collection object.
1034 api_client: The API client object that will be used when creating a new Collection object.
1036 keep: The keep client object that will be used when creating a new Collection object.
1038 num_retries: the default number of api client and keep retries on error.
1040 if into_collection is not None:
# Refuse to merge into a non-empty collection.
1041 if len(into_collection) > 0:
1042 raise ArgumentError("Can only import manifest into an empty collection")
# (The `c = into_collection` assignment and `else:` line are elided.)
1045 c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
# Tokenize on whitespace; the trailing group distinguishes end-of-line.
1054 for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1058 if state == STREAM_NAME:
1059 # starting a new stream
# Manifest escapes spaces in names as \040.
1060 stream_name = tok.replace('\\040', ' ')
# BLOCKS state: accumulate block locators with their cumulative stream
# offsets. (The state label and transition lines are elided.)
1068 s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1070 blocksize = long(s.group(1))
1071 blocks.append(Range(tok, streamoffset, blocksize))
1072 streamoffset += blocksize
1076 if state == SEGMENTS:
# Each file token is pos:size:name; attach the segment to the file at
# stream_name/name, creating it if needed.
1077 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1079 pos = long(s.group(1))
1080 size = long(s.group(2))
1081 name = s.group(3).replace('\\040', ' ')
1082 f = c.find("%s/%s" % (stream_name, name), create=True)
1083 f.add_segment(blocks, pos, size)
# (The `else:` line introducing this error branch is elided.)
1086 raise errors.SyntaxError("Invalid manifest format")
# Serialize a Collection (recursively) or a single ArvadosFile back into
# manifest text, resolving uncommitted buffer blocks to locators.
# NOTE(review): numbered excerpt; the `buf`/`stream`/`st` initializations
# and the final `return buf` are elided (the function is truncated here).
1095 def export_manifest(item, stream_name=".", portable_locators=False):
1096 '''Create a manifest for "item" (must be a Collection or ArvadosFile). If
1097 "item" is a is a Collection, this will also export subcollections.
1099 stream_name: the name of the stream when exporting "item".
1101 portable_locators: If True, strip any permission hints on block locators.
1102 If False, use block locators as-is.
1105 if isinstance(item, Collection):
# Emit this collection's files first (sorted for determinism), then
# recurse into subcollections as nested stream names.
1107 sorted_keys = sorted(item.keys())
1108 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1111 for s in v.segments:
# Data still held in an uncommitted buffer block: look up its
# current locator from the owning block manager.
1113 if loc.startswith("bufferblock"):
1114 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1115 if portable_locators:
1116 loc = KeepLocator(loc).stripped()
1117 st.append(LocatorAndRange(loc, locator_block_size(loc),
1118 s.segment_offset, s.range_size))
1121 buf += ' '.join(normalize_stream(stream_name, stream))
1123 for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
1124 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1125 elif isinstance(item, ArvadosFile):
1127 for s in item.segments:
1129 if loc.startswith("bufferblock"):
# NOTE(review): this branch calls item._bufferblocks[loc]
# .calculate_locator() while the Collection branch above uses
# parent._my_block_manager()._bufferblocks[loc].locator() —
# confirm this asymmetry is intentional.
1130 loc = item._bufferblocks[loc].calculate_locator()
1131 if portable_locators:
1132 loc = KeepLocator(loc).stripped()
1133 st.append(LocatorAndRange(loc, locator_block_size(loc),
1134 s.segment_offset, s.range_size))
1135 stream[stream_name] = st
1136 buf += ' '.join(normalize_stream(stream_name, stream))