import functools
import hashlib
import logging
import os
import re
import errno
import time
import threading

from collections import deque
from stat import *

from .arvfile import split, FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
from keep import KeepLocator, KeepClient
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
from .safeapi import ThreadSafeApiCache
import config
import errors
import util
import events
import arvados
from arvados.retry import retry_method

_logger = logging.getLogger('arvados.collection')

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read the files the Collection contains.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                not error_via_keep and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def populate_first_wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return populate_first_wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @_populate_first
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text

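
# Usage sketch (illustrative only, not part of the SDK API): read the start of
# one file out of an existing collection.  The UUID below is a hypothetical
# placeholder, and a reachable API server (or world-readable Keep services) is
# assumed.
def _example_read_collection():
    reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    infile = reader.open('./data.txt')
    return infile.read(1024)  # read up to the first 1024 bytes
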
class _WriterFile(FileLikeObjectBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:config.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

525 """Store the manifest in Keep and return its locator.
527 This is useful for storing manifest fragments (task outputs)
528 temporarily in Keep during a Crunch job.
530 In other cases you should make a collection instead, by
531 sending manifest_text() to the API server's "create
532 collection" endpoint.
534 return self._my_keep().put(self.manifest_text(), copies=self.replication)
    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

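    # For example, an empty manifest ('') has portable data hash
    # "d41d8cd98f00b204e9800998ecf8427e+0": the md5 hex digest of the stripped
    # manifest text, a '+', then the length of that text in bytes.
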
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

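
# Usage sketch (illustrative only, not part of the SDK API): build a small
# collection and return its manifest text.  Keep services must be reachable,
# since finishing a stream uploads any buffered data.
def _example_write_collection():
    cwriter = CollectionWriter()
    with cwriter.open('./doc/page1.txt') as outfile:
        outfile.write('first page\n')
    with cwriter.open('./doc/page2.txt') as outfile:
        outfile.write('second page\n')
    return cwriter.manifest_text()
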
class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                    (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                    (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)

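
# Checkpoint sketch (illustrative only, not part of the SDK API): serialize
# writer state with dump_state(), then rebuild the writer later with
# from_state() and resume the queued work.  The input directory path is a
# hypothetical placeholder; note that _queue_tree() is a private helper.
def _example_checkpoint_and_resume(state=None):
    if state is None:
        writer = ResumableCollectionWriter()
        writer._queue_tree('/tmp/indir', '.', -1)  # queue work without doing it
        return writer.dump_state()
    writer = ResumableCollectionWriter.from_state(state)
    writer.do_queued_work()  # the caller resumes the queued work
    return writer.manifest_text()
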
ADD = "add"
DEL = "del"
MOD = "mod"

FILE = "file"
COLLECTION = "collection"

class SynchronizedCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in
    the Collection.

    """

    def __init__(self, parent=None):
        self.parent = parent
        self._modified = True
        self._items = {}

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _populate(self):
        raise NotImplementedError()

    def sync_mode(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    @synchronized
    def find_or_create(self, path, create_type):
        """Recursively search the specified file path.

        May return either a `Collection` or `ArvadosFile`.  If not found, will
        create a new item at the specified path based on `create_type`.  Will
        create intermediate subcollections needed to contain the final item in
        the path.

        :create_type:
          One of `arvados.collection.FILE` or
          `arvados.collection.COLLECTION`.  If the path is not found, and value
          of create_type is FILE then create and return a new ArvadosFile for
          the last path component.  If COLLECTION, then create and return a new
          Collection for the last path component.

        """
        if self.sync_mode() == SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))

        pathcomponents = path.split("/")
        if pathcomponents[0] == '.':
            del pathcomponents[0]

        if pathcomponents and pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # item must be a file
                if item is None:
                    # create new file
                    if create_type == COLLECTION:
                        item = Subcollection(self)
                    else:
                        item = ArvadosFile(self)
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                return item
            else:
                if item is None:
                    # create new collection
                    item = Subcollection(self)
                    self._items[pathcomponents[0]] = item
                    self._modified = True
                    self.notify(ADD, self, pathcomponents[0], item)
                del pathcomponents[0]
                if isinstance(item, SynchronizedCollectionBase):
                    return item.find_or_create("/".join(pathcomponents), create_type)
                else:
                    raise errors.ArgumentError("Interior path components must be subcollection")
        else:
            return self

    @synchronized
    def find(self, path):
        """Recursively search the specified file path.

        May return either a Collection or ArvadosFile.  Return None if not
        found.

        """
        pathcomponents = path.split("/")
        if pathcomponents[0] == '.':
            del pathcomponents[0]

        if pathcomponents and pathcomponents[0]:
            item = self._items.get(pathcomponents[0])
            if len(pathcomponents) == 1:
                # item must be a file
                return item
            else:
                del pathcomponents[0]
                if isinstance(item, SynchronizedCollectionBase):
                    return item.find("/".join(pathcomponents))
                else:
                    raise errors.ArgumentError("Interior path components must be subcollection")
        else:
            return self

781 """Recursive subcollection create.
783 Like `os.mkdirs()`. Will create intermediate subcollections needed to
784 contain the leaf subcollection path.
787 return self.find_or_create(path, COLLECTION)
    def open(self, path, mode):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"

          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer for
            reading.
        """
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and self.sync_mode() == SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))

        if create:
            arvfile = self.find_or_create(path, FILE)
        else:
            arvfile = self.find(path)

        if arvfile is None:
            raise IOError((errno.ENOENT, "File not found"))
        if not isinstance(arvfile, ArvadosFile):
            raise IOError((errno.EISDIR, "Path must refer to a file."))

        if mode[0] == "w":
            arvfile.truncate(0)

        if mode == "r":
            return ArvadosFileReader(arvfile, path, mode, num_retries=self.num_retries)
        else:
            return ArvadosFileWriter(arvfile, path, mode, num_retries=self.num_retries)

835 """Test if the collection (or any subcollection or file) has been modified
836 since it was created."""
839 for k,v in self._items.items():
845 def set_unmodified(self):
846 """Recursively clear modified flag."""
847 self._modified = False
848 for k,v in self._items.items():
853 """Iterate over names of files and collections contained in this collection."""
854 return iter(self._items.keys())
858 """Iterate over names of files and collections directly contained in this collection."""
859 return self._items.keys()
    @synchronized
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.  If
        you want to search a path, use `find()` instead.
        """
        return self._items[k]

    @synchronized
    def __contains__(self, k):
        """Test if there is a file or collection named `k` directly contained by this collection."""
        return k in self._items

    @synchronized
    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    @must_be_writable
    @synchronized
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        del self._items[p]
        self._modified = True
        self.notify(DEL, self, p, None)

889 """Get a list of names of files and collections directly contained in this collection."""
890 return self._items.keys()
894 """Get a list of files and collection objects directly contained in this collection."""
895 return self._items.values()
899 """Get a list of (name, object) tuples directly contained in this collection."""
900 return self._items.items()
    @synchronized
    def exists(self, path):
        """Test if there is a file or collection at `path`."""
        return self.find(path) is not None

    @must_be_writable
    @synchronized
    def remove(self, path, recursive=False):
        """Remove the file or subcollection (directory) at `path`.

        :recursive:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        pathcomponents = path.split("/")
        if pathcomponents[0] == '.':
            # Remove '.' from the front of the path
            del pathcomponents[0]

        if len(pathcomponents) > 0:
            item = self._items.get(pathcomponents[0])
            if item is None:
                raise IOError((errno.ENOENT, "File not found"))
            if len(pathcomponents) == 1:
                if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
                    raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
                deleteditem = self._items[pathcomponents[0]]
                del self._items[pathcomponents[0]]
                self._modified = True
                self.notify(DEL, self, pathcomponents[0], deleteditem)
            else:
                del pathcomponents[0]
                item.remove("/".join(pathcomponents))
        else:
            raise IOError((errno.ENOENT, "File not found"))

    def _cloneinto(self, target):
        for k, v in self._items.items():
            target._items[k] = v.clone(target)

    def clone(self):
        raise NotImplementedError()

    @must_be_writable
    @synchronized
    def copy(self, source, target_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        :source:
          An ArvadosFile, Subcollection, or string with a path to source file or subcollection

        :target_path:
          Destination file or path.  If the target path already exists and is a
          subcollection, the item will be placed inside the subcollection.  If
          the target path already exists and is a file, this will raise an error
          unless you specify `overwrite=True`.

        :source_collection:
          Collection to copy `source` from (default `self`)

        :overwrite:
          Whether to overwrite target file if it already exists.
        """
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        if isinstance(source, basestring):
            source_obj = source_collection.find(source)
            if source_obj is None:
                raise IOError((errno.ENOENT, "File not found"))
            sourcecomponents = source.split("/")
        else:
            source_obj = source
            sourcecomponents = None

        # Find the parent collection of the target path
        targetcomponents = target_path.split("/")

        # Determine the name to use.
        target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)

        if not target_name:
            raise errors.ArgumentError("Target path is empty and source is an object.  Cannot determine destination filename to use.")

        target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)

        with target_dir.lock:
            if target_name in target_dir:
                if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
                    target_dir = target_dir[target_name]
                    target_name = sourcecomponents[-1]
                elif not overwrite:
                    raise IOError((errno.EEXIST, "File already exists"))

            modified_from = None
            if target_name in target_dir:
                modified_from = target_dir[target_name]

            # Actually make the copy.
            dup = source_obj.clone(target_dir)
            target_dir._items[target_name] = dup
            target_dir._modified = True

        if modified_from:
            self.notify(MOD, target_dir, target_name, (modified_from, dup))
        else:
            self.notify(ADD, target_dir, target_name, dup)

    @synchronized
    def manifest_text(self, strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :strip:
          If True, remove signing tokens from block locators if present.
          If False, block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.

        """
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    @synchronized
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """
        Generate a list of add/modify/delete actions which, when given to
        `apply`, will change `self` to match `end_collection`.
        """
        changes = []
        if holding_collection is None:
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_EXPLICIT)
        for k in self:
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
        for k in end_collection:
            if k in self:
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
            else:
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
        return changes

    @must_be_writable
    @synchronized
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
        for change in changes:
            event_type = change[0]
            path = change[1]
            initial = change[2]
            local = self.find(path)
            conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
                                                                    time.gmtime()))
            if event_type == ADD:
                if local is None:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD:
                final = change[3]
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                    else:
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                else:
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.

    @synchronized
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    @synchronized
    def __eq__(self, other):
        if other is self:
            return True
        if not isinstance(other, SynchronizedCollectionBase):
            return False
        if len(self._items) != len(other):
            return False
        for k in self._items:
            if k not in other:
                return False
            if self._items[k] != other[k]:
                return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

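
# Sync sketch (illustrative only, not part of the SDK API): make one writable
# collection match another by generating a change list with diff() and
# replaying it with apply().  Both arguments are assumed to be writable
# Collection objects, and a valid Arvados configuration is assumed because
# diff() builds a holding collection with an API client.
def _example_sync_collections(local, remote):
    changes = local.diff(remote)
    # Conflicting local edits are preserved at "*~conflict-<timestamp>~" paths.
    local.apply(changes)
    return local.manifest_text()
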
class Collection(SynchronizedCollectionBase):
    """Represents the root of an Arvados Collection, which may be associated with
    an API server Collection record.

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the
    entire collection).

    """

    def __init__(self, manifest_locator_or_text=None,
                 parent=None,
                 apiconfig=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 block_manager=None,
                 sync=None):
        """Collection constructor.

        :manifest_locator_or_text:
          One of an Arvados collection UUID, the block locator of
          a manifest, raw manifest text, or None (to create an empty collection).

        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests.  If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests.  If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use.  If not specified, create one.

        :sync:
          Set synchronization policy with API server collection record.

          :SYNC_READONLY:
            Collection is read only.  No synchronization.  This mode will
            also forego locking, which gives better performance.

          :SYNC_EXPLICIT:
            Collection is writable.  Synchronize on explicit request via `update()` or `save()`.

          :SYNC_LIVE:
            Collection is writable.  Synchronize with server in response to
            background websocket events, on block write, or on file close.

        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        if apiconfig:
            self._config = apiconfig
        else:
            self._config = config.settings()

        self.num_retries = num_retries if num_retries is not None else 2
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        if sync is None:
            raise errors.ArgumentError("Must specify sync mode")

        self._sync = sync
        self.lock = threading.RLock()
        self.callbacks = []
        self.events = None

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to Collection constructor must be a manifest or a collection UUID")

            self._populate()
            self._subscribe_events()

    def root_collection(self):
        return self

    def sync_mode(self):
        return self._sync

    def _subscribe_events(self):
        if self._sync == SYNC_LIVE and self.events is None:
            if not self._has_collection_uuid():
                raise errors.ArgumentError("Cannot SYNC_LIVE unless associated with a collection uuid")
            self.events = events.subscribe(arvados.api(apiconfig=self._config),
                                           [["object_uuid", "=", self._manifest_locator]],
                                           self.on_message)

    def on_message(self, event):
        if event.get("object_uuid") == self._manifest_locator:
            self.update()

    @synchronized
    def update(self, other=None, num_retries=None):
        """Fetch the latest collection record on the API server and merge it with the
        current collection contents.

        """
        if other is None:
            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            other = import_manifest(response["manifest_text"])
        baseline = import_manifest(self._manifest_text)
        self.apply(baseline.diff(other))

    @synchronized
    def _my_api(self):
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

    @synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api=self._api_client)
        return self._keep_client

    @synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            self._block_manager = BlockManager(self._my_keep())
        return self._block_manager

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        if self._manifest_locator is None and self._manifest_text is None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                not error_via_keep and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))

        self._baseline_manifest = self._manifest_text
        import_manifest(self._manifest_text, self)

        if self._sync == SYNC_READONLY:
            # Now that we're populated, knowing that this will be readonly,
            # forego any further locking.
            self.lock = NoopLock()

    def _has_collection_uuid(self):
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        if self._sync != SYNC_READONLY and self._has_collection_uuid():
            self.save()
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @synchronized
    def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
        if new_config is None:
            new_config = self._config
        newcollection = Collection(parent=new_parent, apiconfig=new_config, sync=SYNC_EXPLICIT)
        if new_sync == SYNC_READONLY:
            newcollection.lock = NoopLock()
        self._cloneinto(newcollection)
        newcollection._sync = new_sync
        return newcollection

    @synchronized
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.

        """
        return self._api_response

    @must_be_writable
    @synchronized
    @retry_method
    def save(self, merge=True, num_retries=None):
        """Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True), write the manifest to Keep, and update the collection
        record.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.

        """
        if self.modified():
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_new() for new collections.")
            self._my_block_manager().commit_all()
            if merge:
                self.update()
            self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)

            text = self.manifest_text(strip=False)
            self._api_response = self._my_api().collections().update(
                uuid=self._manifest_locator,
                body={'manifest_text': text}
                ).execute(
                    num_retries=num_retries)
            self._manifest_text = self._api_response["manifest_text"]
            self.set_unmodified()

    @must_be_writable
    @synchronized
    @retry_method
    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
        a new collection record (if create_collection_record is True).

        After creating a new collection record, this Collection object will be
        associated with the new record for `save()` and SYNC_LIVE updates.

        :name:
          The collection name.

        :create_collection_record:
          If False, only save the manifest to Keep; do not create a collection
          record.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.

        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        text = self.manifest_text(strip=False)

        if create_collection_record:
            if name is None:
                name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))

            body = {"manifest_text": text,
                    "name": name}
            if owner_uuid:
                body["owner_uuid"] = owner_uuid

            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
            text = self._api_response["manifest_text"]

            if self.events:
                self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])

            self._manifest_locator = self._api_response["uuid"]

            if self.events:
                self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])

        self._manifest_text = text
        self.set_unmodified()

    @synchronized
    def subscribe(self, callback):
        self.callbacks.append(callback)

    @synchronized
    def unsubscribe(self, callback):
        self.callbacks.remove(callback)

    @synchronized
    def notify(self, event, collection, name, item):
        for c in self.callbacks:
            c(event, collection, name, item)

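
# Usage sketch (illustrative only, not part of the SDK API): create an empty
# collection, write one file, and save it as a new API server record.  A
# default Arvados configuration is assumed to be available in the environment.
def _example_new_collection_record():
    c = Collection(sync=SYNC_EXPLICIT)
    with c.open('hello.txt', 'w') as f:
        f.write('hello world\n')
    c.save_new('hello collection', ensure_unique_name=True)
    return c.portable_data_hash()
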
def ReadOnlyCollection(*args, **kwargs):
    """Create a read-only collection object from an api collection record locator,
    a portable data hash of a manifest, or raw manifest text.

    See `Collection` constructor for detailed options.

    """
    kwargs["sync"] = SYNC_READONLY
    return Collection(*args, **kwargs)


def WritableCollection(*args, **kwargs):
    """Create a writable collection object from an api collection record locator,
    a portable data hash of a manifest, or raw manifest text.

    See `Collection` constructor for detailed options.

    """
    kwargs["sync"] = SYNC_EXPLICIT
    return Collection(*args, **kwargs)


def LiveCollection(*args, **kwargs):
    """Create a writable, live updating collection object representing an existing
    collection record on the API server.

    See `Collection` constructor for detailed options.

    """
    kwargs["sync"] = SYNC_LIVE
    return Collection(*args, **kwargs)

def createWritableCollection(name, owner_uuid=None, apiconfig=None):
    """Create an empty, writable collection object and create an associated api
    collection record.

    :name:
      The collection name.

    :owner_uuid:
      The parent project.

    :apiconfig:
      Optional alternate api configuration to use (to specify alternate API
      host or token than the default.)

    """
    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
    return newcollection

def createLiveCollection(name, owner_uuid=None, apiconfig=None):
    """Create an empty, writable, live updating Collection object and create an
    associated collection record on the API server.

    :name:
      The collection name.

    :owner_uuid:
      The parent project.

    :apiconfig:
      Optional alternate api configuration to use (to specify alternate API
      host or token than the default.)

    """
    newcollection = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
    newcollection.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
    newcollection._sync = SYNC_LIVE
    newcollection._subscribe_events()
    return newcollection

class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    It falls under the umbrella of the root collection.

    """

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        self.lock = self.root_collection().lock

    def root_collection(self):
        return self.parent.root_collection()

    def sync_mode(self):
        return self.root_collection().sync_mode()

    def _my_api(self):
        return self.root_collection()._my_api()

    def _my_keep(self):
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def _populate(self):
        self.root_collection()._populate()

    def notify(self, event, collection, name, item):
        return self.root_collection().notify(event, collection, name, item)

    @synchronized
    def clone(self, new_parent):
        c = Subcollection(new_parent)
        self._cloneinto(c)
        return c

def import_manifest(manifest_text,
                    into_collection=None,
                    api_client=None,
                    keep=None,
                    num_retries=None,
                    sync=SYNC_READONLY):
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.

    :into_collection:
      The `Collection` that will be initialized (must be empty).
      If None, create a new `Collection` object.

    :api_client:
      The API client object that will be used when creating a new `Collection` object.

    :keep:
      The keep client object that will be used when creating a new `Collection` object.

    :num_retries:
      the default number of api client and keep retries on error.

    :sync:
      Collection sync mode (only if into_collection is None)
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")
    else:
        into_collection = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)

    save_sync = into_collection.sync_mode()
    into_collection._sync = None

    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        tok = n.group(1)
        sep = n.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            streamoffset = 0L
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
            else:
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = into_collection.find_or_create("%s/%s" % (stream_name, name), FILE)
                f.add_segment(blocks, pos, size)
            else:
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            stream_name = None
            state = STREAM_NAME

    into_collection.set_unmodified()
    into_collection._sync = save_sync
    return into_collection

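
# Parsing sketch (illustrative only, not part of the SDK API): build an
# in-memory collection from a one-block manifest and list its file names.
# The locator is the well-known empty block, so no network access is needed
# just to parse.
def _example_import_manifest():
    c = import_manifest(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n")
    return list(c)  # expected: ['empty.txt']
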
def export_manifest(item, stream_name=".", portable_locators=False):
    """Export a manifest from the contents of a SynchronizedCollectionBase.

    :item:
      Create a manifest for `item` (must be a `SynchronizedCollectionBase` or
      `ArvadosFile`).  If `item` is a `Collection`, this will also export
      subcollections.

    :stream_name:
      the name of the stream when exporting `item`.

    :portable_locators:
      If True, strip any permission hints on block locators.
      If False, use block locators as-is.
    """
    buf = ""
    if isinstance(item, SynchronizedCollectionBase):
        stream = {}
        sorted_keys = sorted(item.keys())
        for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            # Create a stream per file `filename`
            arvfile = item[filename]
            filestream = []
            for segment in arvfile.segments():
                loc = segment.locator
                if loc.startswith("bufferblock"):
                    loc = arvfile.parent._my_block_manager()._bufferblocks[loc].locator()
                if portable_locators:
                    loc = KeepLocator(loc).stripped()
                filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                                  segment.segment_offset, segment.range_size))
            stream[filename] = filestream
        if stream:
            buf += ' '.join(normalize_stream(stream_name, stream))
            buf += "\n"
        for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
            buf += export_manifest(item[dirname], stream_name=os.path.join(stream_name, dirname), portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        stream = {}
        filestream = []
        for segment in item.segments():
            loc = segment.locator
            if loc.startswith("bufferblock"):
                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, locator_block_size(loc),
                                              segment.segment_offset, segment.range_size))
        stream[stream_name] = filestream
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf
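
# Round-trip sketch (illustrative only, not part of the SDK API):
# export_manifest(import_manifest(m)) re-serializes a manifest, normalizing
# it in the process.
def _example_manifest_round_trip():
    m = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    return export_manifest(import_manifest(m))  # a normalized equivalent of m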