8 from collections import deque
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
21 _logger = logging.getLogger('arvados.collection')
23 class CollectionBase(object):
# Context-manager support.
# NOTE(review): the matching __enter__ and the body of __exit__ are not
# visible in this chunk -- presumably __exit__ calls self.close(); confirm.
27 def __exit__(self, exc_type, exc_value, traceback):
# Lazily build a KeepClient on first use, reusing this object's API client
# and retry count.  NOTE(review): the enclosing `def _my_keep(self):`
# header is not visible in this chunk.
31 if self._keep_client is None:
32 self._keep_client = KeepClient(api_client=self._api_client,
33 num_retries=self.num_retries)
34 return self._keep_client
36 def stripped_manifest(self):
# Docstring (opening/closing quotes not visible in this chunk):
38 Return the manifest for the current collection with all
39 non-portable hints (i.e., permission signatures and other
40 hints other than size hints) removed from the locators.
42 raw = self.manifest_text()
# Rebuild each manifest line: keep the stream name (fields[:1]) and, for
# every field that looks like a Keep locator, strip "+<hint>" suffixes
# whose first character is not a digit (size hints start with a digit).
44 for line in raw.split("\n"):
47 clean_fields = fields[:1] + [
48 (re.sub(r'\+[^\d][^\+]*', '', x)
49 if re.match(util.keep_locator_pattern, x)
52 clean += [' '.join(clean_fields), "\n"]
56 class CollectionReader(CollectionBase):
57 def __init__(self, manifest_locator_or_text, api_client=None,
58 keep_client=None, num_retries=0):
59 """Instantiate a CollectionReader.
61 This class parses Collection manifests to provide a simple interface
62 to read its underlying files.
65 * manifest_locator_or_text: One of a Collection UUID, portable data
66 hash, or full manifest text.
67 * api_client: The API client to use to look up Collections. If not
68 provided, CollectionReader will build one from available Arvados
70 * keep_client: The KeepClient to use to download Collection data.
71 If not provided, CollectionReader will build one from available
72 Arvados configuration.
73 * num_retries: The default number of times to retry failed
74 service requests. Default 0. You may change this value
75 after instantiation, but note those changes may not
76 propagate to related objects like the Keep client.
78 self._api_client = api_client
79 self._keep_client = keep_client
80 self.num_retries = num_retries
# Classify the argument: a Keep locator (portable data hash) or a
# collection UUID is stored for later lookup; raw manifest text is kept
# directly.
81 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
82 self._manifest_locator = manifest_locator_or_text
83 self._manifest_text = None
84 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
85 self._manifest_locator = manifest_locator_or_text
86 self._manifest_text = None
87 elif re.match(util.manifest_pattern, manifest_locator_or_text):
88 self._manifest_text = manifest_locator_or_text
89 self._manifest_locator = None
# NOTE(review): the `else:` line introducing this raise is not visible in
# this chunk.
91 raise errors.ArgumentError(
92 "Argument to CollectionReader must be a manifest or a collection UUID")
93 self._api_response = None
# NOTE(review): later code checks `self._streams is None`; the
# `self._streams = None` initialization is not visible here -- confirm.
96 def _populate_from_api_server(self):
97 # As in KeepClient itself, we must wait until the last
98 # possible moment to instantiate an API client, in order to
99 # avoid tripping up clients that don't have access to an API
100 # server. If we do build one, make sure our Keep client uses
101 # it. If instantiation fails, we'll fall back to the except
102 # clause, just like any other Collection lookup
103 # failure. Return an exception, or None if successful.
# NOTE(review): the `try:` opening this protected region is not visible
# in this chunk.
105 if self._api_client is None:
106 self._api_client = arvados.api('v1')
107 self._keep_client = None # Make a new one with the new api.
108 self._api_response = self._api_client.collections().get(
109 uuid=self._manifest_locator).execute(
110 num_retries=self.num_retries)
111 self._manifest_text = self._api_response['manifest_text']
# NOTE(review): the except body (returning the caught exception) is not
# visible in this chunk.
113 except Exception as e:
116 def _populate_from_keep(self):
117 # Retrieve a manifest directly from Keep. This has a chance of
118 # working if [a] the locator includes a permission signature
119 # or [b] the Keep services are operating in world-readable
120 # mode. Return an exception, or None if successful.
# NOTE(review): the `try:` and the except body are not visible here.
122 self._manifest_text = self._my_keep().get(
123 self._manifest_locator, num_retries=self.num_retries)
124 except Exception as e:
# Body of CollectionReader._populate (the def header is not visible in
# this chunk).  Strategy: try Keep directly when the locator carries a
# permission signature, fall back to the API server, then try Keep again
# for unsigned-but-keep-like locators; raise NotFoundError if all fail.
129 error_via_keep = None
130 should_try_keep = ((self._manifest_text is None) and
131 util.keep_locator_pattern.match(
132 self._manifest_locator))
133 if ((self._manifest_text is None) and
134 util.signed_locator_pattern.match(self._manifest_locator)):
135 error_via_keep = self._populate_from_keep()
136 if self._manifest_text is None:
137 error_via_api = self._populate_from_api_server()
138 if error_via_api is not None and not should_try_keep:
139 # NOTE(review): the raise/return on this branch is not visible here.
140 if ((self._manifest_text is None) and
141 not error_via_keep and
143 # Looks like a keep locator, and we didn't already try keep above
144 error_via_keep = self._populate_from_keep()
145 if self._manifest_text is None:
# Both lookup paths failed; report both errors.
147 raise arvados.errors.NotFoundError(
148 ("Failed to retrieve collection '{}' " +
149 "from either API server ({}) or Keep ({})."
151 self._manifest_locator,
# Cache the parsed manifest: one token list per stream line.
154 self._streams = [sline.split()
155 for sline in self._manifest_text.split("\n")
158 def _populate_first(orig_func):
159 # Decorator for methods that read actual Collection data.
160 @functools.wraps(orig_func)
161 def wrapper(self, *args, **kwargs):
# Fetch and parse the manifest on first access, then delegate.
# NOTE(review): the `self._populate()` call and `return wrapper` are not
# visible in this chunk.
162 if self._streams is None:
164 return orig_func(self, *args, **kwargs)
168 def api_response(self):
169 """api_response() -> dict or None
171 Returns information about this Collection fetched from the API server.
172 If the Collection exists in Keep but not the API server, currently
173 returns None. Future versions may provide a synthetic response.
175 return self._api_response
# Body of normalize() (def header not visible here): regroup every file's
# byte ranges under its stream/file name, then rebuild self._streams and
# the manifest text in normalized (sorted, merged) form.
181 for s in self.all_streams():
182 for f in s.all_files():
183 streamname, filename = split(s.name() + "/" + f.name())
184 if streamname not in streams:
185 streams[streamname] = {}
186 if filename not in streams[streamname]:
187 streams[streamname][filename] = []
189 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
191 self._streams = [normalize_stream(s, streams[s])
192 for s in sorted(streams)]
194 # Regenerate the manifest text based on the normalized streams
195 self._manifest_text = ''.join(
196 [StreamReader(stream, keep=self._my_keep()).manifest_text()
197 for stream in self._streams])
200 def open(self, streampath, filename=None):
201 """open(streampath[, filename]) -> file-like object
203 Pass in the path of a file to read from the Collection, either as a
204 single string or as two separate stream name and file name arguments.
205 This method returns a file-like object to read that file.
# NOTE(review): the `if filename is None:` guard before this split is not
# visible in this chunk.
208 streampath, filename = split(streampath)
209 keep_client = self._my_keep()
# Linear scan for the stream whose name matches, then look up the file.
210 for stream_s in self._streams:
211 stream = StreamReader(stream_s, keep_client,
212 num_retries=self.num_retries)
213 if stream.name() == streampath:
216 raise ValueError("stream '{}' not found in Collection".
# KeyError from this lookup is presumably converted to the ValueError
# below -- the try/except lines are not visible here.
219 return stream.files()[filename]
221 raise ValueError("file '{}' not found in Collection stream '{}'".
222 format(filename, streampath))
def all_streams(self):
    """Return a StreamReader for every stream line in this collection."""
    readers = []
    for stream_tokens in self._streams:
        readers.append(StreamReader(stream_tokens, self._my_keep(),
                                    num_retries=self.num_retries))
    return readers
# Body of all_files() (def header and `yield f` not visible here):
# iterate every file of every stream.
230 for s in self.all_streams():
231 for f in s.all_files():
235 def manifest_text(self, strip=False, normalize=False):
# When normalizing, round-trip through a fresh CollectionReader so the
# returned text is normalized without mutating this reader.
# NOTE(review): the `if normalize:` / `cr.normalize()` and `elif strip:` /
# `else:` lines are not visible in this chunk.
237 cr = CollectionReader(self.manifest_text())
239 return cr.manifest_text(strip=strip, normalize=False)
241 return self.stripped_manifest()
243 return self._manifest_text
# File-like adapter returned by CollectionWriter.open(); forwards writes
# to the owning CollectionWriter.
246 class _WriterFile(ArvadosFileBase):
247 def __init__(self, coll_writer, name):
248 super(_WriterFile, self).__init__(name, 'wb')
249 self.dest = coll_writer
# NOTE(review): the `def close(self):` header is not visible here.
252 super(_WriterFile, self).close()
253 self.dest.finish_current_file()
255 @ArvadosFileBase._before_close
256 def write(self, data):
257 self.dest.write(data)
259 @ArvadosFileBase._before_close
260 def writelines(self, seq):
# NOTE(review): writelines body and the `def flush(self):` header are not
# visible in this chunk.
264 @ArvadosFileBase._before_close
266 self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered in memory, waiting to be flushed to Keep.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work queue used by write_file()/write_directory_tree().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The _WriterFile most recently handed out by open(), if any.
        self._last_open = None
# NOTE(review): the __exit__ body (presumably self.finish()) is not
# visible in this chunk.
305 def __exit__(self, exc_type, exc_value, traceback):
309 def do_queued_work(self):
310 # The work queue consists of three pieces:
311 # * _queued_file: The file object we're currently writing to the
313 # * _queued_dirents: Entries under the current directory
314 # (_queued_trees[0]) that we want to write or recurse through.
315 # This may contain files from subdirectories if
316 # max_manifest_depth == 0 for this directory.
317 # * _queued_trees: Directories that should be written as separate
318 # streams to the Collection.
319 # This function handles the smallest piece of work currently queued
320 # (current file, then current directory, then next directory) until
321 # no work remains. The _work_THING methods each do a unit of work on
322 # THING. _queue_THING methods add a THING to the work queue.
# NOTE(review): the enclosing `while True:` and the _work_*() calls in
# each branch are not visible in this chunk.
324 if self._queued_file:
326 elif self._queued_dirents:
328 elif self._queued_trees:
333 def _work_file(self):
# Copy the queued file into the collection one Keep block at a time.
# NOTE(review): the surrounding loop, the empty-read break, the
# self.write(buf) call, and the `if self._close_file:` guard are not
# visible in this chunk.
335 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
339 self.finish_current_file()
341 self._queued_file.close()
342 self._close_file = None
343 self._queued_file = None
def _work_dirents(self):
    """Process queued directory entries for the tree at the queue head.

    Directories are re-queued as trees (one less level of manifest
    depth); regular entries are queued as files, which pauses this loop
    so the file is written before more dirents are examined.
    """
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    if stream_name != self.current_stream_name():
        self.start_new_stream(stream_name)
    while self._queued_dirents:
        dirent = self._queued_dirents.popleft()
        target = os.path.join(path, dirent)
        if os.path.isdir(target):
            self._queue_tree(target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1)
        else:
            # BUGFIX(review): the `else`/`break` were missing from the
            # visible text; without them every directory entry would also
            # be queued as a regular file.
            self._queue_file(target, dirent)
            break
    if not self._queued_dirents:
        self._queued_trees.popleft()
def _work_trees(self):
    """Expand the tree at the head of the queue into directory entries."""
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    # max_manifest_depth == 0 means flatten everything under this tree
    # into one stream (unlimited listing depth); otherwise list only one
    # level and let recursion handle subdirectories.
    d = util.listdir_recursive(
        path, max_depth = (None if max_manifest_depth == 0 else 0))
    if d:
        # BUGFIX(review): this if/else was missing from the visible text;
        # an empty directory must be dequeued or the queue never drains.
        self._queue_dirents(stream_name, d)
    else:
        self._queued_trees.popleft()
def _queue_file(self, source, filename=None):
    """Queue one file (path or file-like object) for writing."""
    assert (self._queued_file is None), "tried to queue more than one file"
    if not hasattr(source, 'read'):
        # `source` is a path: open it ourselves and remember to close it.
        source = open(source, 'rb')
        self._close_file = True
    else:
        # BUGFIX(review): the `else:` and `if filename is None:` guard
        # lines were missing from the visible text.
        self._close_file = False
    if filename is None:
        filename = os.path.basename(source.name)
    self.start_new_file(filename)
    self._queued_file = source
def _queue_dirents(self, stream_name, dirents):
    """Queue a directory's entries for writing (one tree at a time)."""
    assert (not self._queued_dirents), "tried to queue more than one tree"
    self._queued_dirents = deque(sorted(dirents))

def _queue_tree(self, path, stream_name, max_manifest_depth):
    """Queue a directory tree to be written as a separate stream."""
    self._queued_trees.append((path, stream_name, max_manifest_depth))

def write_file(self, source, filename=None):
    """Write one file (path or file-like object) into the collection."""
    self._queue_file(source, filename)
    self.do_queued_work()

def write_directory_tree(self,
                         path, stream_name='.', max_manifest_depth=-1):
    """Write an entire directory tree into the collection."""
    self._queue_tree(path, stream_name, max_manifest_depth)
    self.do_queued_work()
def write(self, newdata):
    """Append data to the current file.

    Accepts a byte string, or an iterable of byte strings, each of
    which is written in turn.  (Under Python 2, plain `str` has no
    __iter__, so strings take the buffering path below.)
    """
    if hasattr(newdata, '__iter__'):
        # NOTE(review): this branch's body was missing from the visible
        # text; restored to write each element and stop.
        for s in newdata:
            self.write(s)
        return
    self._data_buffer.append(newdata)
    self._data_buffer_len += len(newdata)
    self._current_stream_length += len(newdata)
    while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
        # BUGFIX(review): the loop body (flushing a full block) was
        # missing from the visible text; without it this loop never ends.
        self.flush_data()
def open(self, streampath, filename=None):
    """open(streampath[, filename]) -> file-like object

    Pass in the path of a file to write to the Collection, either as a
    single string or as two separate stream name and file name arguments.
    This method returns a file-like object you can write to add it to the
    Collection.

    You may only have one file object from the Collection open at a time,
    so be sure to close the object when you're done.  Using the object in
    a with statement makes that easy::

        with cwriter.open('./doc/page1.txt') as outfile:
            outfile.write(page1_data)
        with cwriter.open('./doc/page2.txt') as outfile:
            outfile.write(page2_data)
    """
    if filename is None:
        # BUGFIX(review): this guard was missing from the visible text;
        # splitting an already-split path would discard the stream name.
        streampath, filename = split(streampath)
    if self._last_open and not self._last_open.closed:
        raise errors.AssertionError(
            "can't open '{}' when '{}' is still open".format(
                filename, self._last_open.name))
    if streampath != self.current_stream_name():
        self.start_new_stream(streampath)
    self.set_current_file_name(filename)
    self._last_open = _WriterFile(self, filename)
    return self._last_open
def flush_data(self):
    """Flush up to one Keep block of buffered data out to Keep."""
    data_buffer = ''.join(self._data_buffer)
    if data_buffer:
        # BUGFIX(review): the empty-buffer guard was missing from the
        # visible text; without it an empty put would be issued.
        self._current_stream_locators.append(
            self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
        # Anything beyond one block stays buffered for the next flush.
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
    """Finish the file in progress and start a new one."""
    self.finish_current_file()
    self.set_current_file_name(newfilename)

def set_current_file_name(self, newfilename):
    """Set the current file name, rejecting names a manifest cannot encode."""
    if re.search(r'[\t\n]', newfilename):
        raise errors.AssertionError(
            "Manifest filenames cannot contain whitespace: %s" %
            newfilename)
    elif re.search(r'\x00', newfilename):
        raise errors.AssertionError(
            "Manifest filenames cannot contain NUL characters: %s" %
            newfilename)
    self._current_file_name = newfilename

def current_file_name(self):
    """Return the name of the file currently being written, or None."""
    return self._current_file_name
def finish_current_file(self):
    """Record the bytes written since the last file boundary as one file."""
    if self._current_file_name is None:
        if self._current_file_pos == self._current_stream_length:
            # Nothing written since the last boundary: nothing to do.
            # BUGFIX(review): this `return` was missing from the visible
            # text, which would raise on every clean no-op call.
            return
        raise errors.AssertionError(
            "Cannot finish an unnamed file " +
            "(%d bytes at offset %d in '%s' stream)" %
            (self._current_stream_length - self._current_file_pos,
             self._current_file_pos,
             self._current_stream_name))
    # Manifest file token: [offset, size, name].
    self._current_stream_files.append([
        self._current_file_pos,
        self._current_stream_length - self._current_file_pos,
        self._current_file_name])
    self._current_file_pos = self._current_stream_length
    self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
    """Finish the current stream and begin a new one with the given name."""
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)

def set_current_stream_name(self, newstreamname):
    """Set the current stream name, rejecting whitespace in the name."""
    if re.search(r'[\t\n]', newstreamname):
        raise errors.AssertionError(
            "Manifest stream names cannot contain whitespace")
    # An empty name means the top-level stream, spelled '.' in manifests.
    self._current_stream_name = '.' if newstreamname == '' else newstreamname

def current_stream_name(self):
    """Return the name of the stream currently being written."""
    return self._current_stream_name
def finish_current_stream(self):
    """Close out the current stream and reset per-stream state."""
    self.finish_current_file()
    # BUGFIX(review): the flush call, `pass`, and `else:` lines were
    # missing from the visible text; restored so buffered data reaches
    # Keep and empty streams are skipped without error.
    self.flush_data()
    if not self._current_stream_files:
        # Empty stream: nothing to record.
        pass
    elif self._current_stream_name is None:
        raise errors.AssertionError(
            "Cannot finish an unnamed stream (%d bytes in %d files)" %
            (self._current_stream_length, len(self._current_stream_files)))
    else:
        if not self._current_stream_locators:
            # A stream must reference at least one block.
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
    self._current_stream_files = []
    self._current_stream_length = 0
    self._current_stream_locators = []
    self._current_stream_name = None
    self._current_file_pos = 0
    self._current_file_name = None
def finish(self):
    # NOTE(review): the `def finish(self):` header was missing from the
    # visible text and has been restored.
    # Store the manifest in Keep and return its locator.
    return self._my_keep().put(self.manifest_text())

def portable_data_hash(self):
    """Return the portable data hash: md5 of the stripped manifest + size."""
    stripped = self.stripped_manifest()
    return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
def manifest_text(self):
    """Build and return the manifest text for everything written so far."""
    self.finish_current_stream()
    # BUGFIX(review): the accumulator initialization, './' prefixing,
    # per-stream newline, and final return were missing from the visible
    # text; restored below.
    manifest = ''
    for stream in self._finished_streams:
        if not re.search(r'^\.(/.*)?$', stream[0]):
            # Manifest stream names must be rooted at '.'.
            manifest += './'
        manifest += stream[0].replace(' ', '\\040')
        manifest += ' ' + ' '.join(stream[1])
        manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
        manifest += "\n"
    return manifest
def data_locators(self):
    """Return a flat list of every Keep locator in the finished streams."""
    # BUGFIX(review): the accumulator and return were missing from the
    # visible text; restored below.
    ret = []
    for name, locators, files in self._finished_streams:
        ret += locators
    return ret
class ResumableCollectionWriter(CollectionWriter):
    # The attributes that fully capture writer state for
    # dump_state()/from_state() round trips.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        """Create a CollectionWriter whose progress can be saved and resumed."""
        # Map of source path -> os.stat tuple, used to detect changes on resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)
@classmethod
def from_state(cls, state, *init_args, **init_kwargs):
    # Try to build a new writer from scratch with the given state.
    # If the state is not suitable to resume (because files have changed,
    # been deleted, aren't predictable, etc.), raise a
    # StaleWriterStateError. Otherwise, return the initialized writer.
    # The caller is responsible for calling writer.do_queued_work()
    # appropriately after it's returned.
    # BUGFIX(review): the @classmethod decorator, the try: around the
    # reopen, and the final `return writer` were missing from the visible
    # text; restored below.
    writer = cls(*init_args, **init_kwargs)
    for attr_name in cls.STATE_PROPS:
        attr_value = state[attr_name]
        attr_class = getattr(writer, attr_name).__class__
        # Coerce the value into the same type as the initial value, if
        # needed (e.g. lists restored from JSON back into deques).
        if attr_class not in (type(None), attr_value.__class__):
            attr_value = attr_class(attr_value)
        setattr(writer, attr_name, attr_value)
    # Check dependencies before we try to resume anything.
    if any(KeepLocator(ls).permission_expired()
           for ls in writer._current_stream_locators):
        raise errors.StaleWriterStateError(
            "locators include expired permission hint")
    writer.check_dependencies()
    if state['_current_file'] is not None:
        path, pos = state['_current_file']
        try:
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
        except IOError as error:
            raise errors.StaleWriterStateError(
                "failed to reopen active file {}: {}".format(path, error))
    return writer
def check_dependencies(self):
    """Raise StaleWriterStateError if any recorded source file changed."""
    for path, orig_stat in self._dependencies.items():
        if not S_ISREG(orig_stat[ST_MODE]):
            raise errors.StaleWriterStateError("{} not file".format(path))
        try:
            # BUGFIX(review): this try: line was missing from the visible
            # text; restored so the except clause below has a target.
            now_stat = tuple(os.stat(path))
        except OSError as error:
            raise errors.StaleWriterStateError(
                "failed to stat {}: {}".format(path, error))
        # Any change to type, mtime, or size invalidates the saved state.
        if ((not S_ISREG(now_stat[ST_MODE])) or
            (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
            (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
            raise errors.StaleWriterStateError("{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
    """Return a dict of writer state suitable for from_state().

    `copy_func` is applied to each state value (pass e.g. copy.deepcopy
    to get a snapshot independent of this writer's live state).
    """
    state = {attr: copy_func(getattr(self, attr))
             for attr in self.STATE_PROPS}
    if self._queued_file is None:
        state['_current_file'] = None
    else:
        # BUGFIX(review): the `else:` and `return state` lines were
        # missing from the visible text; restored below.
        state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                  self._queued_file.tell())
    return state
def _queue_file(self, source, filename=None):
    """Queue a file by path only, recording stat info for safe resume.

    BUGFIX(review): the try/except scaffolding and the final `else:`
    were missing from the visible text; restored below.  Only real file
    paths are accepted -- arbitrary file objects cannot be reopened on
    resume.
    """
    try:
        src_path = os.path.realpath(source)
    except Exception:
        raise errors.AssertionError("{} not a file path".format(source))
    try:
        path_stat = os.stat(src_path)
    except OSError as stat_error:
        path_stat = None
    super(ResumableCollectionWriter, self)._queue_file(source, filename)
    fd_stat = os.fstat(self._queued_file.fileno())
    if not S_ISREG(fd_stat.st_mode):
        # We won't be able to resume from this cache anyway, so don't
        # worry about further checks.
        self._dependencies[source] = tuple(fd_stat)
    elif path_stat is None:
        raise errors.AssertionError(
            "could not stat {}: {}".format(source, stat_error))
    elif path_stat.st_ino != fd_stat.st_ino:
        raise errors.AssertionError(
            "{} changed between open and stat calls".format(source))
    else:
        self._dependencies[src_path] = tuple(fd_stat)
def write(self, data):
    """Write `data` to the currently queued file.

    A resumable writer refuses data that does not come from a queued
    file source, because such writes could not be replayed on resume.
    """
    if self._queued_file is None:
        raise errors.AssertionError(
            "resumable writer can't accept unsourced data")
    return super(ResumableCollectionWriter, self).write(data)
# Shared base for Collection/Subcollection: a synchronized mapping of
# names to ArvadosFile or Subcollection items, with path-based lookup.
649 class SynchronizedCollectionBase(CollectionBase):
650 def __init__(self, parent=None):
# Abstract hooks subclasses must provide (the def headers for several of
# these are not visible in this chunk).
655 raise NotImplementedError()
658 raise NotImplementedError()
660 def _my_block_manager(self):
661 raise NotImplementedError()
663 def _root_lock(self):
664 raise NotImplementedError()
667 raise NotImplementedError()
670 raise NotImplementedError()
672 def notify(self, collection, event, name, item):
673 raise NotImplementedError()
676 def find(self, path, create=False, create_collection=False):
677 """Recursively search the specified file path. May return either a Collection
681 If true, create path components (i.e. Collections) that are
682 missing. If "create" is False, return None if a path component is
686 If the path is not found, "create" is True, and
687 "create_collection" is False, then create and return a new
688 ArvadosFile for the last path component. If "create_collection" is
689 True, then create and return a new Collection for the last path
693 if create and self.sync_mode() == SYNC_READONLY:
694 raise IOError((errno.EROFS, "Collection is read only"))
# NOTE(review): the path splitting and several branch/return lines of
# find() are not visible in this chunk.
701 item = self._items.get(p[0])
703 # item must be a file
704 if item is None and create:
706 if create_collection:
707 item = Subcollection(self)
709 item = ArvadosFile(self)
710 self._items[p[0]] = item
711 self.notify(self, ADD, p[0], item)
# Interior path component: recurse into (or create) a subcollection.
714 if item is None and create:
715 # create new collection
716 item = Subcollection(self)
717 self._items[p[0]] = item
718 self.notify(self, ADD, p[0], item)
720 if isinstance(item, SynchronizedCollectionBase):
721 return item.find("/".join(p), create=create)
723 raise errors.ArgumentError("Interior path components must be subcollection")
727 def open(self, path, mode):
728 """Open a file-like object for access.
731 path to a file in the collection
733 one of "r", "r+", "w", "w+", "a", "a+"
737 opens for reading and writing. Reads/writes share a file pointer.
739 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
741 opens for reading and writing. All writes are appended to
742 the end of the file. Writing does not affect the file pointer for
# Binary flag is a no-op for collection files.
745 mode = mode.replace("b", "")
746 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
# NOTE(review): this raises a bare ArgumentError; elsewhere in this file
# the errors module is used (errors.ArgumentError) -- confirm which name
# is in scope here.
747 raise ArgumentError("Bad mode '%s'" % mode)
748 create = (mode != "r")
750 if create and self.sync_mode() == SYNC_READONLY:
751 raise IOError((errno.EROFS, "Collection is read only"))
753 f = self.find(path, create=create)
756 raise IOError((errno.ENOENT, "File not found"))
757 if not isinstance(f, ArvadosFile):
758 raise IOError((errno.EISDIR, "Path must refer to a file."))
# NOTE(review): the truncate-on-"w" handling and the mode dispatch lines
# between these returns are not visible in this chunk.
764 return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
766 return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
# modified() (def header not visible here): True if any contained item
# reports itself modified.
770 """Test if the collection (or any subcollection or file) has been modified
771 since it was created."""
772 for k,v in self._items.items():
778 def set_unmodified(self):
779 """Recursively clear modified flag"""
780 for k,v in self._items.items():
# __iter__ / iterkeys (def headers not visible here): iterate item names.
785 """Iterate over names of files and collections contained in this collection."""
786 return self._items.keys()
790 """Iterate over names of files and collections directly contained in this collection."""
791 return self._items.keys()
794 def __getitem__(self, k):
795 """Get a file or collection that is directly contained by this collection. If
796 you want to search a path, use `find()` instead.
798 return self._items[k]
801 def __contains__(self, k):
802 """If there is a file or collection a directly contained by this collection
804 return k in self._items
# __len__ (def header not visible here).
808 """Get the number of items directly contained in this collection"""
809 return len(self._items)
813 def __delitem__(self, p):
814 """Delete an item by name which is directly contained by this collection."""
# NOTE(review): the `del self._items[p]` line is not visible here.
816 self.notify(self, DEL, p, None)
# keys()/values()/items() (def headers not visible here).
820 """Get a list of names of files and collections directly contained in this collection."""
821 return self._items.keys()
825 """Get a list of files and collection objects directly contained in this collection."""
826 return self._items.values()
830 """Get a list of (name, object) tuples directly contained in this collection."""
831 return self._items.items()
def exists(self, path):
    """Test if there is a file or collection at "path"."""
    # Use identity comparison with None (PEP 8): find() yields None when
    # nothing exists at `path`, and `!= None` would invoke the item's
    # own __ne__.
    return self.find(path) is not None
839 def remove(self, path, rm_r=False):
840 """Remove the file or subcollection (directory) at `path`.
842 Specify whether to remove non-empty subcollections (True), or raise an error (False).
# NOTE(review): the path-splitting lines of this method are not visible
# in this chunk.
846 # Remove '.' from the front of the path
850 item = self._items.get(p[0])
852 raise IOError((errno.ENOENT, "File not found"))
# Last path component: delete it here (refusing non-empty directories
# unless rm_r is set); otherwise recurse into the subcollection.
854 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
855 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
856 del self._items[p[0]]
857 self.notify(self, DEL, p[0], None)
860 item.remove("/".join(p))
862 raise IOError((errno.ENOENT, "File not found"))
864 def _cloneinto(self, target):
# Copy every item into `target`, re-parenting each clone.
865 for k,v in self._items.items():
866 target._items[k] = v.clone(target)
# NOTE(review): the `def clone(...)` header for this stub is not visible
# in this chunk.
869 raise NotImplementedError()
873 def copy(self, source, target_path, source_collection=None, overwrite=False):
874 """Copy a file or subcollection to a new path in this collection.
877 An ArvadosFile, Subcollection, or string with a path to source file or subcollection
880 Destination file or path. If the target path already exists and is a
881 subcollection, the item will be placed inside the subcollection. If
882 the target path already exists and is a file, this will raise an error
883 unless you specify `overwrite=True`.
886 Collection to copy `source_path` from (default `self`)
889 Whether to overwrite target file if it already exists.
891 if source_collection is None:
892 source_collection = self
894 # Find the object to copy
# `basestring` check: this is Python 2 code; a string is resolved via
# find(), anything else is used as the object directly.
895 if isinstance(source, basestring):
896 source_obj = source_collection.find(source)
897 if source_obj is None:
898 raise IOError((errno.ENOENT, "File not found"))
899 sp = source.split("/")
# NOTE(review): the non-string branch (setting source_obj and sp) is not
# visible in this chunk.
904 # Find parent collection the target path
905 tp = target_path.split("/")
907 # Determine the name to use.
908 target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
911 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
913 target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
# An existing subcollection at the target becomes the destination dir;
# an existing file is only replaced when overwrite=True.
915 if target_name in target_dir:
916 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
917 target_dir = target_dir[target_name]
920 raise IOError((errno.EEXIST, "File already exists"))
922 # Actually make the copy.
923 dup = source_obj.clone(target_dir)
924 with target_dir.lock:
925 target_dir._items[target_name] = dup
927 self.notify(target_dir, ADD, target_name, dup)
931 def manifest_text(self, strip=False, normalize=False):
932 """Get the manifest text for this collection, sub collections and files.
935 If True, remove signing tokens from block locators if present.
936 If False, block locators are left unchanged.
939 If True, always export the manifest text in normalized form
940 even if the Collection is not modified. If False and the collection
941 is not modified, return the original manifest text even if it is not
# A modified (or never-serialized) collection must be re-exported; an
# unmodified one can return its cached text.
945 if self.modified() or self._manifest_text is None or normalize:
946 return export_manifest(self, stream_name=".", portable_locators=strip)
# NOTE(review): the `else:` / `if strip:` lines around these returns are
# not visible in this chunk.
949 return self.stripped_manifest()
951 return self._manifest_text
954 def diff(self, start_collection, prefix="."):
956 Generate list of add/delete actions which change `start_collection` to result in `self`
# NOTE(review): the `changes = []` initialization, the membership test in
# the first loop, the second loop header, and the final return are not
# visible in this chunk.
959 for k in start_collection:
961 changes.append((DEL, os.path.join(prefix, k), start_collection[k]))
963 if k in start_collection:
964 if isinstance(self[k], Subcollection) and isinstance(start_collection[k], Subcollection):
965 changes.extend(self[k].diff(start_collection[k], os.path.join(prefix, k)))
966 elif self[k] != start_collection[k]:
967 changes.append((MOD, os.path.join(prefix, k), start_collection[k], self[k]))
969 changes.append((ADD, os.path.join(prefix, k), self[k]))
974 def apply(self, changes):
976 Apply changes from `diff`. If a change conflicts with a local change, it
977 will be saved to an alternate path indicating the conflict.
# NOTE(review): the loop header over `changes` and the per-change
# unpacking (event type, path, initial item) are not visible here.
982 local = self.find(path)
983 conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
987 # No local file at path, safe to copy over new file
988 self.copy(initial, path)
989 elif local is not None and local != initial:
990 # There is already local file and it is different:
991 # save change to conflict file.
992 self.copy(initial, conflictpath)
# MOD handling: only safe when the local item still matches the diff's
# "initial" snapshot.
995 # Local matches the "initial" item so assume it hasn't
996 # changed locally and is safe to update.
997 if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
998 # Replace contents of local file with new contents
999 local.replace_contents(c[3])
1001 # Overwrite path with new item; this can happen if if
1002 # path was a file and is now a collection or vice versa
1003 self.copy(c[3], path, overwrite=True)
1005 # Local is missing (presumably deleted) or local doesn't
1006 # match the "start" value, so save change to conflict file
1007 self.copy(c[3], conflictpath)
# DEL handling:
1010 # Local item matches "initial" value, so it is safe to remove.
1011 self.remove(path, rm_r=True)
1012 # else, the file is modified or already removed, in either
1013 # case we don't want to try to remove it.
def portable_data_hash(self):
    """Get the portable data hash for this collection's manifest."""
    stripped = self.manifest_text(strip=True)
    digest = hashlib.md5(stripped).hexdigest()
    return "%s+%d" % (digest, len(stripped))
1021 class Collection(SynchronizedCollectionBase):
1022 """Store an Arvados collection, consisting of a set of files and
1026 def __init__(self, manifest_locator_or_text=None,
# NOTE(review): the intervening keyword parameters (parent, config,
# api_client, keep_client, num_retries, block_manager, ...) are not
# visible in this chunk.
1033 sync=SYNC_READONLY):
1034 """:manifest_locator_or_text:
1035 One of Arvados collection UUID, block locator of
1036 a manifest, raw manifest text, or None (to create an empty collection).
1038 the parent Collection, may be None.
1040 the arvados configuration to get the hostname and api token.
1041 Prefer this over supplying your own api_client and keep_client (except in testing).
1042 Will use default config settings if not specified.
1044 The API client object to use for requests. If not specified, create one using `config`.
1046 the Keep client to use for requests. If not specified, create one using `config`.
1048 the number of retries for API and Keep requests.
1050 the block manager to use. If not specified, create one.
1052 Set synchronization policy with API server collection record.
1054 Collection is read only. No synchronization. This mode will
1055 also forego locking, which gives better performance.
1057 Synchronize on explicit request via `update()` or `save()`
1059 Synchronize with server in response to background websocket events,
1060 on block write, or on file close.
1063 super(Collection, self).__init__(parent)
1064 self._api_client = api_client
1065 self._keep_client = keep_client
1066 self._block_manager = block_manager
1067 self._config = config
1068 self.num_retries = num_retries
1069 self._manifest_locator = None
1070 self._manifest_text = None
1071 self._api_response = None
1073 self.lock = threading.RLock()
# NOTE(review): this line is a bare attribute expression with no effect
# as written -- it looks like a truncated assignment; confirm upstream.
1076 self._baseline_manifest
# Classify the constructor argument (same scheme as CollectionReader).
1078 if manifest_locator_or_text:
1079 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1080 self._manifest_locator = manifest_locator_or_text
1081 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1082 self._manifest_locator = manifest_locator_or_text
1083 elif re.match(util.manifest_pattern, manifest_locator_or_text):
1084 self._manifest_text = manifest_locator_or_text
1086 raise errors.ArgumentError(
1087 "Argument to CollectionReader must be a manifest or a collection UUID")
# SYNC_LIVE requires a collection UUID so websocket events can be
# filtered to this collection's record.
1091 if self._sync == SYNC_LIVE:
1092 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
1093 raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
1094 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1097 def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1098 c = Collection(sync=sync)
1099 c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1102 def _root_lock(self):
1105 def sync_mode(self):
1108 def on_message(self):
1113 n = self._my_api().collections().get(uuid=self._manifest_locator).execute()
1114 other = import_collection(n["manifest_text"])
1115 baseline = import_collection(self._baseline_manifest)
1116 self.apply(other.diff(baseline))
1120 if self._api_client is None:
1121 self._api_client = arvados.SafeApi(self._config)
1122 self._keep_client = self._api_client.keep
1123 return self._api_client
1127 if self._keep_client is None:
1128 if self._api_client is None:
1131 self._keep_client = KeepClient(api=self._api_client)
1132 return self._keep_client
1135 def _my_block_manager(self):
1136 if self._block_manager is None:
1137 self._block_manager = BlockManager(self._my_keep())
1138 return self._block_manager
1140 def _populate_from_api_server(self):
1141 # As in KeepClient itself, we must wait until the last
1142 # possible moment to instantiate an API client, in order to
1143 # avoid tripping up clients that don't have access to an API
1144 # server. If we do build one, make sure our Keep client uses
1145 # it. If instantiation fails, we'll fall back to the except
1146 # clause, just like any other Collection lookup
1147 # failure. Return an exception, or None if successful.
1149 self._api_response = self._my_api().collections().get(
1150 uuid=self._manifest_locator).execute(
1151 num_retries=self.num_retries)
1152 self._manifest_text = self._api_response['manifest_text']
1154 except Exception as e:
1157 def _populate_from_keep(self):
1158 # Retrieve a manifest directly from Keep. This has a chance of
1159 # working if [a] the locator includes a permission signature
1160 # or [b] the Keep services are operating in world-readable
1161 # mode. Return an exception, or None if successful.
1163 self._manifest_text = self._my_keep().get(
1164 self._manifest_locator, num_retries=self.num_retries)
1165 except Exception as e:
1168 def _populate(self):
1169 if self._manifest_locator is None and self._manifest_text is None:
1171 error_via_api = None
1172 error_via_keep = None
1173 should_try_keep = ((self._manifest_text is None) and
1174 util.keep_locator_pattern.match(
1175 self._manifest_locator))
1176 if ((self._manifest_text is None) and
1177 util.signed_locator_pattern.match(self._manifest_locator)):
1178 error_via_keep = self._populate_from_keep()
1179 if self._manifest_text is None:
1180 error_via_api = self._populate_from_api_server()
1181 if error_via_api is not None and not should_try_keep:
1183 if ((self._manifest_text is None) and
1184 not error_via_keep and
1186 # Looks like a keep locator, and we didn't already try keep above
1187 error_via_keep = self._populate_from_keep()
1188 if self._manifest_text is None:
1190 raise arvados.errors.NotFoundError(
1191 ("Failed to retrieve collection '{}' " +
1192 "from either API server ({}) or Keep ({})."
1194 self._manifest_locator,
1198 self._baseline_manifest = self._manifest_text
1199 import_manifest(self._manifest_text, self)
1201 if self._sync == SYNC_READONLY:
1202 # Now that we're populated, knowing that this will be readonly,
1203 # forego any further locking.
1204 self.lock = NoopLock()
1206 def __enter__(self):
1209 def __exit__(self, exc_type, exc_value, traceback):
1210 """Support scoped auto-commit in a with: block"""
1211 if self._sync != SYNC_READONLY:
1212 self.save(allow_no_locator=True)
1213 if self._block_manager is not None:
1214 self._block_manager.stop_threads()
1217 def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1218 if new_config is None:
1219 new_config = self._config
1220 c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1221 if new_sync == SYNC_READONLY:
1228 def api_response(self):
1230 api_response() -> dict or None
1232 Returns information about this Collection fetched from the API server.
1233 If the Collection exists in Keep but not the API server, currently
1234 returns None. Future versions may provide a synthetic response.
1236 return self._api_response
1240 def save(self, allow_no_locator=False):
1241 """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1242 update the collection record to Keep.
1245 If there is no collection uuid associated with this
1246 Collection and `allow_no_locator` is False, raise an error. If True,
1247 do not raise an error.
1250 self._my_block_manager().commit_all()
1251 self._my_keep().put(self.manifest_text(strip=True))
1252 if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1253 self._api_response = self._my_api().collections().update(
1254 uuid=self._manifest_locator,
1255 body={'manifest_text': self.manifest_text(strip=False)}
1257 num_retries=self.num_retries)
1258 elif not allow_no_locator:
1259 raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
1260 self.set_unmodified()
1264 def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1265 """Save a new collection record.
1268 The collection name.
1271 the user, or project uuid that will own this collection.
1272 If None, defaults to the current user.
1274 :ensure_unique_name:
1275 If True, ask the API server to rename the collection
1276 if it conflicts with a collection with the same name and owner. If
1277 False, a name conflict will result in an error.
1280 self._my_block_manager().commit_all()
1281 self._my_keep().put(self.manifest_text(strip=True))
1282 body = {"manifest_text": self.manifest_text(strip=False),
1285 body["owner_uuid"] = owner_uuid
1286 self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1289 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1291 self._manifest_locator = self._api_response["uuid"]
1294 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1296 self.set_unmodified()
1299 def subscribe(self, callback):
1300 self.callbacks.append(callback)
1303 def unsubscribe(self, callback):
1304 self.callbacks.remove(callback)
1307 def notify(self, collection, event, name, item):
1308 for c in self.callbacks:
1309 c(collection, event, name, item)
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record. It falls under the umbrella of the root collection."""

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree is guarded by
        # a single lock object.
        self.lock = parent._root_lock()

    def _root_lock(self):
        # Delegate upward; only the root Collection owns the lock.
        return self.parent._root_lock()

    def sync_mode(self):
        # Sync policy is a property of the root collection.
        return self.parent.sync_mode()

    # NOTE(review): the headers for _my_api(self) and _my_keep(self) appear
    # to be elided from this excerpt; the two return statements below are
    # their respective bodies, both delegating to the parent.
        return self.parent._my_api()

        return self.parent._my_keep()

    def _my_block_manager(self):
        # All block management happens at the root collection.
        return self.parent._my_block_manager()

    def _populate(self):
        # Loading manifest data is handled by the root collection.
        self.parent._populate()

    def notify(self, collection, event, name, item):
        # Bubble change events up to the root collection's subscribers.
        self.parent.notify(collection, event, name, item)

    def clone(self, new_parent):
        # Copy this subcollection under a new parent.
        c = Subcollection(new_parent)
        # NOTE(review): the tail of clone() (copying contents and
        # "return c") is elided from this excerpt.
def import_manifest(manifest_text,
                    into_collection=None,
                    sync=SYNC_READONLY):
    # NOTE(review): the parameter list appears truncated in this excerpt --
    # the body and docstring reference api_client, keep and num_retries,
    # which are not in the visible signature.
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.

    :into_collection:
      The `Collection` that will be initialized (must be empty).
      If None, create a new `Collection` object.

    :api_client:
      The API client object that will be used when creating a new `Collection` object.

    :keep:
      the keep client object that will be used when creating a new `Collection` object.

    :num_retries:
      the default number of api client and keep retries on error.

    :sync:
      Collection sync mode (only if into_collection is None)
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            # NOTE(review): "ArgumentError" is unqualified here; the rest of
            # this file raises errors.ArgumentError, so this line would hit a
            # NameError if reached -- confirm and fix in the full file.
            raise ArgumentError("Can only import manifest into an empty collection")
    # NOTE(review): the "else:" branch (presumably "c = into_collection")
    # is elided; the next line only covers the into_collection=None case.
    c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)

    # Remember the caller's sync mode so it can be restored after parsing.
    save_sync = c.sync_mode()
    # NOTE(review): the tokenizer state constants (STREAM_NAME, BLOCKS,
    # SEGMENTS), the initial state assignment, and the per-token setup
    # ("tok = n.group(1)" etc.) are elided from this excerpt.
    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            # NOTE(review): the reset of blocks/streamoffset and the
            # transition into the BLOCKS state are elided; the
            # locator-parsing lines below belong to that BLOCKS state, and
            # their "if s:" guard is also elided.
        s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
        # NOTE(review): long() is Python 2 only.
        blocksize = long(s.group(1))
        # Track each block with its cumulative offset within the stream.
        blocks.append(Range(tok, streamoffset, blocksize))
        streamoffset += blocksize

        if state == SEGMENTS:
            # A file segment token: "position:size:filename".
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            # NOTE(review): the "if s:" guard is elided here.
            pos = long(s.group(1))
            size = long(s.group(2))
            name = s.group(3).replace('\\040', ' ')
            # Create the file (and intermediate subcollections) on demand
            # and attach this segment to it.
            f = c.find("%s/%s" % (stream_name, name), create=True)
            f.add_segment(blocks, pos, size)
            # NOTE(review): the else-branch around this raise, and the tail
            # of the function (restoring save_sync, set_unmodified,
            # "return c"), are elided from this excerpt.
            raise errors.SyntaxError("Invalid manifest format")
1433 def export_manifest(item, stream_name=".", portable_locators=False):
1436 Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
1437 `item` is a is a `Collection`, this will also export subcollections.
1440 the name of the stream when exporting `item`.
1443 If True, strip any permission hints on block locators.
1444 If False, use block locators as-is.
1447 if isinstance(item, SynchronizedCollectionBase):
1449 sorted_keys = sorted(item.keys())
1450 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1453 for s in v.segments():
1455 if loc.startswith("bufferblock"):
1456 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1457 if portable_locators:
1458 loc = KeepLocator(loc).stripped()
1459 st.append(LocatorAndRange(loc, locator_block_size(loc),
1460 s.segment_offset, s.range_size))
1463 buf += ' '.join(normalize_stream(stream_name, stream))
1465 for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1466 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1467 elif isinstance(item, ArvadosFile):
1469 for s in item.segments:
1471 if loc.startswith("bufferblock"):
1472 loc = item._bufferblocks[loc].calculate_locator()
1473 if portable_locators:
1474 loc = KeepLocator(loc).stripped()
1475 st.append(LocatorAndRange(loc, locator_block_size(loc),
1476 s.segment_offset, s.range_size))
1477 stream[stream_name] = st
1478 buf += ' '.join(normalize_stream(stream_name, stream))