8 from collections import deque
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
20 from arvados.retry import retry_method
# Module-level logger shared by the collection classes below.
_logger = logging.getLogger('arvados.collection')
24 class CollectionBase(object):
28 def __exit__(self, exc_type, exc_value, traceback):
32 if self._keep_client is None:
33 self._keep_client = KeepClient(api_client=self._api_client,
34 num_retries=self.num_retries)
35 return self._keep_client
37 def stripped_manifest(self):
39 Return the manifest for the current collection with all
40 non-portable hints (i.e., permission signatures and other
41 hints other than size hints) removed from the locators.
43 raw = self.manifest_text()
45 for line in raw.split("\n"):
48 clean_fields = fields[:1] + [
49 (re.sub(r'\+[^\d][^\+]*', '', x)
50 if re.match(util.keep_locator_pattern, x)
53 clean += [' '.join(clean_fields), "\n"]
57 class CollectionReader(CollectionBase):
58 def __init__(self, manifest_locator_or_text, api_client=None,
59 keep_client=None, num_retries=0):
60 """Instantiate a CollectionReader.
62 This class parses Collection manifests to provide a simple interface
63 to read its underlying files.
66 * manifest_locator_or_text: One of a Collection UUID, portable data
67 hash, or full manifest text.
68 * api_client: The API client to use to look up Collections. If not
69 provided, CollectionReader will build one from available Arvados
71 * keep_client: The KeepClient to use to download Collection data.
72 If not provided, CollectionReader will build one from available
73 Arvados configuration.
74 * num_retries: The default number of times to retry failed
75 service requests. Default 0. You may change this value
76 after instantiation, but note those changes may not
77 propagate to related objects like the Keep client.
79 self._api_client = api_client
80 self._keep_client = keep_client
81 self.num_retries = num_retries
82 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
83 self._manifest_locator = manifest_locator_or_text
84 self._manifest_text = None
85 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
86 self._manifest_locator = manifest_locator_or_text
87 self._manifest_text = None
88 elif re.match(util.manifest_pattern, manifest_locator_or_text):
89 self._manifest_text = manifest_locator_or_text
90 self._manifest_locator = None
92 raise errors.ArgumentError(
93 "Argument to CollectionReader must be a manifest or a collection UUID")
94 self._api_response = None
97 def _populate_from_api_server(self):
98 # As in KeepClient itself, we must wait until the last
99 # possible moment to instantiate an API client, in order to
100 # avoid tripping up clients that don't have access to an API
101 # server. If we do build one, make sure our Keep client uses
102 # it. If instantiation fails, we'll fall back to the except
103 # clause, just like any other Collection lookup
104 # failure. Return an exception, or None if successful.
106 if self._api_client is None:
107 self._api_client = arvados.api('v1')
108 self._keep_client = None # Make a new one with the new api.
109 self._api_response = self._api_client.collections().get(
110 uuid=self._manifest_locator).execute(
111 num_retries=self.num_retries)
112 self._manifest_text = self._api_response['manifest_text']
114 except Exception as e:
117 def _populate_from_keep(self):
118 # Retrieve a manifest directly from Keep. This has a chance of
119 # working if [a] the locator includes a permission signature
120 # or [b] the Keep services are operating in world-readable
121 # mode. Return an exception, or None if successful.
123 self._manifest_text = self._my_keep().get(
124 self._manifest_locator, num_retries=self.num_retries)
125 except Exception as e:
130 error_via_keep = None
131 should_try_keep = ((self._manifest_text is None) and
132 util.keep_locator_pattern.match(
133 self._manifest_locator))
134 if ((self._manifest_text is None) and
135 util.signed_locator_pattern.match(self._manifest_locator)):
136 error_via_keep = self._populate_from_keep()
137 if self._manifest_text is None:
138 error_via_api = self._populate_from_api_server()
139 if error_via_api is not None and not should_try_keep:
141 if ((self._manifest_text is None) and
142 not error_via_keep and
144 # Looks like a keep locator, and we didn't already try keep above
145 error_via_keep = self._populate_from_keep()
146 if self._manifest_text is None:
148 raise arvados.errors.NotFoundError(
149 ("Failed to retrieve collection '{}' " +
150 "from either API server ({}) or Keep ({})."
152 self._manifest_locator,
155 self._streams = [sline.split()
156 for sline in self._manifest_text.split("\n")
159 def _populate_first(orig_func):
160 # Decorator for methods that read actual Collection data.
161 @functools.wraps(orig_func)
162 def wrapper(self, *args, **kwargs):
163 if self._streams is None:
165 return orig_func(self, *args, **kwargs)
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response
182 for s in self.all_streams():
183 for f in s.all_files():
184 streamname, filename = split(s.name() + "/" + f.name())
185 if streamname not in streams:
186 streams[streamname] = {}
187 if filename not in streams[streamname]:
188 streams[streamname][filename] = []
190 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
192 self._streams = [normalize_stream(s, streams[s])
193 for s in sorted(streams)]
195 # Regenerate the manifest text based on the normalized streams
196 self._manifest_text = ''.join(
197 [StreamReader(stream, keep=self._my_keep()).manifest_text()
198 for stream in self._streams])
201 def open(self, streampath, filename=None):
202 """open(streampath[, filename]) -> file-like object
204 Pass in the path of a file to read from the Collection, either as a
205 single string or as two separate stream name and file name arguments.
206 This method returns a file-like object to read that file.
209 streampath, filename = split(streampath)
210 keep_client = self._my_keep()
211 for stream_s in self._streams:
212 stream = StreamReader(stream_s, keep_client,
213 num_retries=self.num_retries)
214 if stream.name() == streampath:
217 raise ValueError("stream '{}' not found in Collection".
220 return stream.files()[filename]
222 raise ValueError("file '{}' not found in Collection stream '{}'".
223 format(filename, streampath))
226 def all_streams(self):
227 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
228 for s in self._streams]
231 for s in self.all_streams():
232 for f in s.all_files():
236 def manifest_text(self, strip=False, normalize=False):
238 cr = CollectionReader(self.manifest_text())
240 return cr.manifest_text(strip=strip, normalize=False)
242 return self.stripped_manifest()
244 return self._manifest_text
247 class _WriterFile(ArvadosFileBase):
248 def __init__(self, coll_writer, name):
249 super(_WriterFile, self).__init__(name, 'wb')
250 self.dest = coll_writer
253 super(_WriterFile, self).close()
254 self.dest.finish_current_file()
256 @ArvadosFileBase._before_close
257 def write(self, data):
258 self.dest.write(data)
260 @ArvadosFileBase._before_close
261 def writelines(self, seq):
265 @ArvadosFileBase._before_close
267 self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data written but not yet flushed to Keep, plus its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State for the stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        # State for the file currently being written within that stream.
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state consumed by do_queued_work() and helpers.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # The file-like object most recently handed out by open().
        self._last_open = None
306 def __exit__(self, exc_type, exc_value, traceback):
310 def do_queued_work(self):
311 # The work queue consists of three pieces:
312 # * _queued_file: The file object we're currently writing to the
314 # * _queued_dirents: Entries under the current directory
315 # (_queued_trees[0]) that we want to write or recurse through.
316 # This may contain files from subdirectories if
317 # max_manifest_depth == 0 for this directory.
318 # * _queued_trees: Directories that should be written as separate
319 # streams to the Collection.
320 # This function handles the smallest piece of work currently queued
321 # (current file, then current directory, then next directory) until
322 # no work remains. The _work_THING methods each do a unit of work on
323 # THING. _queue_THING methods add a THING to the work queue.
325 if self._queued_file:
327 elif self._queued_dirents:
329 elif self._queued_trees:
334 def _work_file(self):
336 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
340 self.finish_current_file()
342 self._queued_file.close()
343 self._close_file = None
344 self._queued_file = None
346 def _work_dirents(self):
347 path, stream_name, max_manifest_depth = self._queued_trees[0]
348 if stream_name != self.current_stream_name():
349 self.start_new_stream(stream_name)
350 while self._queued_dirents:
351 dirent = self._queued_dirents.popleft()
352 target = os.path.join(path, dirent)
353 if os.path.isdir(target):
354 self._queue_tree(target,
355 os.path.join(stream_name, dirent),
356 max_manifest_depth - 1)
358 self._queue_file(target, dirent)
360 if not self._queued_dirents:
361 self._queued_trees.popleft()
363 def _work_trees(self):
364 path, stream_name, max_manifest_depth = self._queued_trees[0]
365 d = util.listdir_recursive(
366 path, max_depth = (None if max_manifest_depth == 0 else 0))
368 self._queue_dirents(stream_name, d)
370 self._queued_trees.popleft()
372 def _queue_file(self, source, filename=None):
373 assert (self._queued_file is None), "tried to queue more than one file"
374 if not hasattr(source, 'read'):
375 source = open(source, 'rb')
376 self._close_file = True
378 self._close_file = False
380 filename = os.path.basename(source.name)
381 self.start_new_file(filename)
382 self._queued_file = source
384 def _queue_dirents(self, stream_name, dirents):
385 assert (not self._queued_dirents), "tried to queue more than one tree"
386 self._queued_dirents = deque(sorted(dirents))
388 def _queue_tree(self, path, stream_name, max_manifest_depth):
389 self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        # Queue one file (a path or a file-like object) and drain the work
        # queue, copying its contents into the current stream.
        self._queue_file(source, filename)
        self.do_queued_work()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        # Queue an entire directory tree and drain the work queue.  How
        # subdirectories map to streams is governed by max_manifest_depth;
        # see _work_trees/_work_dirents for the exact semantics.
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
400 def write(self, newdata):
401 if hasattr(newdata, '__iter__'):
405 self._data_buffer.append(newdata)
406 self._data_buffer_len += len(newdata)
407 self._current_stream_length += len(newdata)
408 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): a guard line (presumably `if filename is None:`) is
        # elided from this view immediately before the split below — confirm
        # against the full source.
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            # Only one writer file object may be open at a time.
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
440 def flush_data(self):
441 data_buffer = ''.join(self._data_buffer)
443 self._current_stream_locators.append(
444 self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
445 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
446 self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        # Record the extent of the file currently being written in the
        # stream, then begin a new file with the given name.
        self.finish_current_file()
        self.set_current_file_name(newfilename)
452 def set_current_file_name(self, newfilename):
453 if re.search(r'[\t\n]', newfilename):
454 raise errors.AssertionError(
455 "Manifest filenames cannot contain whitespace: %s" %
457 elif re.search(r'\x00', newfilename):
458 raise errors.AssertionError(
459 "Manifest filenames cannot contain NUL characters: %s" %
461 self._current_file_name = newfilename
    def current_file_name(self):
        # Name of the file currently being written (None between files).
        return self._current_file_name
466 def finish_current_file(self):
467 if self._current_file_name is None:
468 if self._current_file_pos == self._current_stream_length:
470 raise errors.AssertionError(
471 "Cannot finish an unnamed file " +
472 "(%d bytes at offset %d in '%s' stream)" %
473 (self._current_stream_length - self._current_file_pos,
474 self._current_file_pos,
475 self._current_stream_name))
476 self._current_stream_files.append([
477 self._current_file_pos,
478 self._current_stream_length - self._current_file_pos,
479 self._current_file_name])
480 self._current_file_pos = self._current_stream_length
481 self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        # Close out the current stream (flushing buffered data and recording
        # it in _finished_streams), then begin a new one under this name.
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
487 def set_current_stream_name(self, newstreamname):
488 if re.search(r'[\t\n]', newstreamname):
489 raise errors.AssertionError(
490 "Manifest stream names cannot contain whitespace")
491 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        # Name of the stream currently being written (None between streams).
        return self._current_stream_name
496 def finish_current_stream(self):
497 self.finish_current_file()
499 if not self._current_stream_files:
501 elif self._current_stream_name is None:
502 raise errors.AssertionError(
503 "Cannot finish an unnamed stream (%d bytes in %d files)" %
504 (self._current_stream_length, len(self._current_stream_files)))
506 if not self._current_stream_locators:
507 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
508 self._finished_streams.append([self._current_stream_name,
509 self._current_stream_locators,
510 self._current_stream_files])
511 self._current_stream_files = []
512 self._current_stream_length = 0
513 self._current_stream_locators = []
514 self._current_stream_name = None
515 self._current_file_pos = 0
516 self._current_file_name = None
519 # Store the manifest in Keep and return its locator.
520 return self._my_keep().put(self.manifest_text())
522 def portable_data_hash(self):
523 stripped = self.stripped_manifest()
524 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
526 def manifest_text(self):
527 self.finish_current_stream()
530 for stream in self._finished_streams:
531 if not re.search(r'^\.(/.*)?$', stream[0]):
533 manifest += stream[0].replace(' ', '\\040')
534 manifest += ' ' + ' '.join(stream[1])
535 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
540 def data_locators(self):
542 for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    # Writer attributes that together describe an in-progress write; a
    # later process can rebuild a writer from a dump of these via
    # from_state()/dump_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Maps queued source paths to stat tuples; checked later to detect
        # files that changed between queueing and resuming.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)
560 def from_state(cls, state, *init_args, **init_kwargs):
561 # Try to build a new writer from scratch with the given state.
562 # If the state is not suitable to resume (because files have changed,
563 # been deleted, aren't predictable, etc.), raise a
564 # StaleWriterStateError. Otherwise, return the initialized writer.
565 # The caller is responsible for calling writer.do_queued_work()
566 # appropriately after it's returned.
567 writer = cls(*init_args, **init_kwargs)
568 for attr_name in cls.STATE_PROPS:
569 attr_value = state[attr_name]
570 attr_class = getattr(writer, attr_name).__class__
571 # Coerce the value into the same type as the initial value, if
573 if attr_class not in (type(None), attr_value.__class__):
574 attr_value = attr_class(attr_value)
575 setattr(writer, attr_name, attr_value)
576 # Check dependencies before we try to resume anything.
577 if any(KeepLocator(ls).permission_expired()
578 for ls in writer._current_stream_locators):
579 raise errors.StaleWriterStateError(
580 "locators include expired permission hint")
581 writer.check_dependencies()
582 if state['_current_file'] is not None:
583 path, pos = state['_current_file']
585 writer._queued_file = open(path, 'rb')
586 writer._queued_file.seek(pos)
587 except IOError as error:
588 raise errors.StaleWriterStateError(
589 "failed to reopen active file {}: {}".format(path, error))
592 def check_dependencies(self):
593 for path, orig_stat in self._dependencies.items():
594 if not S_ISREG(orig_stat[ST_MODE]):
595 raise errors.StaleWriterStateError("{} not file".format(path))
597 now_stat = tuple(os.stat(path))
598 except OSError as error:
599 raise errors.StaleWriterStateError(
600 "failed to stat {}: {}".format(path, error))
601 if ((not S_ISREG(now_stat[ST_MODE])) or
602 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
603 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
604 raise errors.StaleWriterStateError("{} changed".format(path))
606 def dump_state(self, copy_func=lambda x: x):
607 state = {attr: copy_func(getattr(self, attr))
608 for attr in self.STATE_PROPS}
609 if self._queued_file is None:
610 state['_current_file'] = None
612 state['_current_file'] = (os.path.realpath(self._queued_file.name),
613 self._queued_file.tell())
616 def _queue_file(self, source, filename=None):
618 src_path = os.path.realpath(source)
620 raise errors.AssertionError("{} not a file path".format(source))
622 path_stat = os.stat(src_path)
623 except OSError as stat_error:
625 super(ResumableCollectionWriter, self)._queue_file(source, filename)
626 fd_stat = os.fstat(self._queued_file.fileno())
627 if not S_ISREG(fd_stat.st_mode):
628 # We won't be able to resume from this cache anyway, so don't
629 # worry about further checks.
630 self._dependencies[source] = tuple(fd_stat)
631 elif path_stat is None:
632 raise errors.AssertionError(
633 "could not stat {}: {}".format(source, stat_error))
634 elif path_stat.st_ino != fd_stat.st_ino:
635 raise errors.AssertionError(
636 "{} changed between open and stat calls".format(source))
638 self._dependencies[src_path] = tuple(fd_stat)
640 def write(self, data):
641 if self._queued_file is None:
642 raise errors.AssertionError(
643 "resumable writer can't accept unsourced data")
644 return super(ResumableCollectionWriter, self).write(data)
650 class SynchronizedCollectionBase(CollectionBase):
651 """Base class for Collections and Subcollections. Implements the majority of
652 functionality relating to accessing items in the Collection."""
654 def __init__(self, parent=None):
656 self._modified = True
660 raise NotImplementedError()
663 raise NotImplementedError()
665 def _my_block_manager(self):
666 raise NotImplementedError()
669 raise NotImplementedError()
672 raise NotImplementedError()
    def root_collection(self):
        # Abstract: subclasses return the collection object at the root of
        # this tree.
        raise NotImplementedError()
    def notify(self, event, collection, name, item):
        # Abstract: subclasses deliver a change notification (event is one
        # of the ADD/DEL/MOD markers used elsewhere in this class).
        raise NotImplementedError()
681 def find(self, path, create=False, create_collection=False):
682 """Recursively search the specified file path. May return either a Collection
686 If true, create path components (i.e. Collections) that are
687 missing. If "create" is False, return None if a path component is
691 If the path is not found, "create" is True, and
692 "create_collection" is False, then create and return a new
693 ArvadosFile for the last path component. If "create_collection" is
694 True, then create and return a new Collection for the last path
698 if create and self.sync_mode() == SYNC_READONLY:
699 raise IOError((errno.EROFS, "Collection is read only"))
706 item = self._items.get(p[0])
708 # item must be a file
709 if item is None and create:
711 if create_collection:
712 item = Subcollection(self)
714 item = ArvadosFile(self)
715 self._items[p[0]] = item
716 self._modified = True
717 self.notify(ADD, self, p[0], item)
720 if item is None and create:
721 # create new collection
722 item = Subcollection(self)
723 self._items[p[0]] = item
724 self._modified = True
725 self.notify(ADD, self, p[0], item)
727 if isinstance(item, SynchronizedCollectionBase):
728 return item.find("/".join(p), create=create)
730 raise errors.ArgumentError("Interior path components must be subcollection")
734 def open(self, path, mode):
735 """Open a file-like object for access.
738 path to a file in the collection
740 one of "r", "r+", "w", "w+", "a", "a+"
744 opens for reading and writing. Reads/writes share a file pointer.
746 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
748 opens for reading and writing. All writes are appended to
749 the end of the file. Writing does not affect the file pointer for
752 mode = mode.replace("b", "")
753 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
754 raise ArgumentError("Bad mode '%s'" % mode)
755 create = (mode != "r")
757 if create and self.sync_mode() == SYNC_READONLY:
758 raise IOError((errno.EROFS, "Collection is read only"))
760 f = self.find(path, create=create)
763 raise IOError((errno.ENOENT, "File not found"))
764 if not isinstance(f, ArvadosFile):
765 raise IOError((errno.EISDIR, "Path must refer to a file."))
771 return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
773 return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
777 """Test if the collection (or any subcollection or file) has been modified
778 since it was created."""
781 for k,v in self._items.items():
787 def set_unmodified(self):
788 """Recursively clear modified flag"""
789 self._modified = False
790 for k,v in self._items.items():
795 """Iterate over names of files and collections contained in this collection."""
796 return self._items.keys().__iter__()
800 """Iterate over names of files and collections directly contained in this collection."""
801 return self._items.keys()
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this
        collection.  If you want to search a path, use `find()` instead.
        """
        return self._items[k]
    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this
        collection with name `k`.
        """
        return k in self._items
818 """Get the number of items directly contained in this collection"""
819 return len(self._items)
823 def __delitem__(self, p):
824 """Delete an item by name which is directly contained by this collection."""
826 self._modified = True
827 self.notify(DEL, self, p, None)
831 """Get a list of names of files and collections directly contained in this collection."""
832 return self._items.keys()
836 """Get a list of files and collection objects directly contained in this collection."""
837 return self._items.values()
841 """Get a list of (name, object) tuples directly contained in this collection."""
842 return self._items.items()
844 def exists(self, path):
845 """Test if there is a file or collection at "path" """
846 return self.find(path) != None
850 def remove(self, path, rm_r=False):
851 """Remove the file or subcollection (directory) at `path`.
853 Specify whether to remove non-empty subcollections (True), or raise an error (False).
857 # Remove '.' from the front of the path
861 item = self._items.get(p[0])
863 raise IOError((errno.ENOENT, "File not found"))
865 if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
866 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
867 d = self._items[p[0]]
868 del self._items[p[0]]
869 self._modified = True
870 self.notify(DEL, self, p[0], d)
873 item.remove("/".join(p))
875 raise IOError((errno.ENOENT, "File not found"))
877 def _cloneinto(self, target):
878 for k,v in self._items.items():
879 target._items[k] = v.clone(target)
882 raise NotImplementedError()
886 def copy(self, source, target_path, source_collection=None, overwrite=False):
887 """Copy a file or subcollection to a new path in this collection.
890 An ArvadosFile, Subcollection, or string with a path to source file or subcollection
893 Destination file or path. If the target path already exists and is a
894 subcollection, the item will be placed inside the subcollection. If
895 the target path already exists and is a file, this will raise an error
896 unless you specify `overwrite=True`.
899 Collection to copy `source_path` from (default `self`)
902 Whether to overwrite target file if it already exists.
904 if source_collection is None:
905 source_collection = self
907 # Find the object to copy
908 if isinstance(source, basestring):
909 source_obj = source_collection.find(source)
910 if source_obj is None:
911 raise IOError((errno.ENOENT, "File not found"))
912 sp = source.split("/")
917 # Find parent collection the target path
918 tp = target_path.split("/")
920 # Determine the name to use.
921 target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)
924 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
926 target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)
928 with target_dir.lock:
929 if target_name in target_dir:
930 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
931 target_dir = target_dir[target_name]
934 raise IOError((errno.EEXIST, "File already exists"))
937 if target_name in target_dir:
938 mod = target_dir[target_name]
940 # Actually make the copy.
941 dup = source_obj.clone(target_dir)
942 target_dir._items[target_name] = dup
943 target_dir._modified = True
946 self.notify(MOD, target_dir, target_name, (mod, dup))
948 self.notify(ADD, target_dir, target_name, dup)
951 def manifest_text(self, strip=False, normalize=False):
952 """Get the manifest text for this collection, sub collections and files.
955 If True, remove signing tokens from block locators if present.
956 If False, block locators are left unchanged.
959 If True, always export the manifest text in normalized form
960 even if the Collection is not modified. If False and the collection
961 is not modified, return the original manifest text even if it is not
965 if self.modified() or self._manifest_text is None or normalize:
966 return export_manifest(self, stream_name=".", portable_locators=strip)
969 return self.stripped_manifest()
971 return self._manifest_text
974 def diff(self, end_collection, prefix=".", holding_collection=None):
976 Generate list of add/modify/delete actions which, when given to `apply`, will
977 change `self` to match `end_collection`
980 if holding_collection is None:
981 holding_collection = CollectionRoot(api_client=self._my_api(), keep_client=self._my_keep(), sync=SYNC_READONLY)
983 if k not in end_collection:
984 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
985 for k in end_collection:
987 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
988 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
989 elif end_collection[k] != self[k]:
990 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
992 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
997 def apply(self, changes):
999 Apply changes from `diff`. If a change conflicts with a local change, it
1000 will be saved to an alternate path indicating the conflict.
1005 local = self.find(path)
1006 conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
1010 # No local file at path, safe to copy over new file
1011 self.copy(initial, path)
1012 elif local is not None and local != initial:
1013 # There is already local file and it is different:
1014 # save change to conflict file.
1015 self.copy(initial, conflictpath)
1017 if local == initial:
1018 # Local matches the "initial" item so it has not
1019 # changed locally and is safe to update.
1020 if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
1021 # Replace contents of local file with new contents
1022 local.replace_contents(c[3])
1024 # Overwrite path with new item; this can happen if
1025 # path was a file and is now a collection or vice versa
1026 self.copy(c[3], path, overwrite=True)
1028 # Local is missing (presumably deleted) or local doesn't
1029 # match the "start" value, so save change to conflict file
1030 self.copy(c[3], conflictpath)
1032 if local == initial:
1033 # Local item matches "initial" value, so it is safe to remove.
1034 self.remove(path, rm_r=True)
1035 # else, the file is modified or already removed, in either
1036 # case we don't want to try to remove it.
1038 def portable_data_hash(self):
1039 """Get the portable data hash for this collection's manifest."""
1040 stripped = self.manifest_text(strip=True)
1041 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1044 def __eq__(self, other):
1047 if not isinstance(other, SynchronizedCollectionBase):
1049 if len(self._items) != len(other):
1051 for k in self._items:
1054 if self._items[k] != other[k]:
1058 def __ne__(self, other):
1059 return not self.__eq__(other)
class CollectionRoot(SynchronizedCollectionBase):
    """Represents the root of an Arvados Collection, which may be associated with
    an API server Collection record.

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).
    """

    # NOTE(review): a number of lines of this class appear to be missing from
    # this excerpt (decorators such as @synchronized, `else:` branches,
    # `try:` openers, and some return statements).  The notes below flag the
    # places where that is visible; confirm each against the full source.

    def __init__(self, manifest_locator_or_text=None,
        # NOTE(review): the rest of this signature (parent, apiconfig,
        # api_client, keep_client, block_manager, num_retries, sync) is
        # referenced by the body but not visible in this excerpt.
        """Instantiate a CollectionRoot.

        :manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).

        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use. If not specified, create one.

        :sync:
          Set synchronization policy with API server collection record.

          :SYNC_READONLY:
            Collection is read only. No synchronization. This mode will
            also forego locking, which gives better performance.

          :SYNC_EXPLICIT:
            Collection is writable. Synchronize on explicit request via `update()` or `save()`

          :SYNC_LIVE:
            Collection is writable. Synchronize with server in response to
            background websocket events, on block write, or on file close.
        """
        super(CollectionRoot, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        # Use the caller-supplied API config, falling back to defaults.
        # NOTE(review): these two assignments were presumably guarded by an
        # `if apiconfig:` / `else:` pair not visible in this excerpt; as
        # written, the second unconditionally overwrites the first.
        self._config = apiconfig
        self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        # NOTE(review): presumably guarded by `if sync is None:` — not
        # visible here; as written this raises unconditionally.
        raise errors.ArgumentError("Must specify sync mode")

        self.lock = threading.RLock()

        if manifest_locator_or_text:
            # Classify the argument: Keep block locator, collection UUID, or
            # raw manifest text.
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                # NOTE(review): the `else:` introducing this raise is not
                # visible in this excerpt.
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

        if self._sync == SYNC_LIVE:
            if not self._has_collection_uuid():
                raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
            # Subscribe to websocket events for this collection's record.
            # NOTE(review): the callback argument and closing paren of this
            # call are not visible in this excerpt.
            self.events = events.subscribe(arvados.api(apiconfig=self._config),
                                           [["object_uuid", "=", self._manifest_locator]],

    def root_collection(self):
        # This object is itself the root of the tree.
        # NOTE(review): body not visible in this excerpt (presumably
        # `return self`).

    def sync_mode(self):
        # NOTE(review): body not visible in this excerpt (presumably
        # `return self._sync`).

    def on_message(self, event):
        # Websocket callback: react when the event concerns our own record.
        if event.get("object_uuid") == self._manifest_locator:
            # NOTE(review): the guarded statement (presumably a refresh such
            # as `self.update()`) is not visible in this excerpt.

    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT, apiconfig=None):
        """Create a new empty Collection with associated collection record."""
        # NOTE(review): no `self`/`cls` parameter, so this reads as a
        # @staticmethod whose decorator is not visible in this excerpt.
        c = Collection(sync=SYNC_EXPLICIT, apiconfig=apiconfig)
        c.save_new(name, owner_uuid=owner_uuid, ensure_unique_name=True)
        if sync == SYNC_LIVE:
            # NOTE(review): BUG — `self` is not defined in this static
            # function; this should almost certainly read `c._config`.
            # A `return c` also appears to be missing after this line.
            c.events = events.subscribe(arvados.api(apiconfig=self._config), [["object_uuid", "=", c._manifest_locator]], c.on_message)

    def update(self, other=None, num_retries=None):
        # Merge remote changes into this collection: fetch the current
        # record (when `other` is not supplied), rebuild the baseline from
        # our stored manifest, and apply the diff.
        # NOTE(review): this guard was presumably nested under
        # `if other is None:` — not visible in this excerpt.
        if self._manifest_locator is None:
            raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
        n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
        other = import_collection(n["manifest_text"])
        baseline = import_collection(self._manifest_text)
        self.apply(other.diff(baseline))

        # Lazily build (and cache) the API client; reuse its Keep client so
        # both share one configuration.
        # NOTE(review): the `def _my_api(self):` header for this body is not
        # visible in this excerpt.
        if self._api_client is None:
            self._api_client = arvados.SafeApi(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

        # Lazily build (and cache) the Keep client.
        # NOTE(review): the `def _my_keep(self):` header, and some lines of
        # its body, are not visible in this excerpt.
        if self._keep_client is None:
            if self._api_client is None:
                self._keep_client = KeepClient(api=self._api_client)
        return self._keep_client

    def _my_block_manager(self):
        # Lazily build (and cache) the shared BlockManager.
        if self._block_manager is None:
            self._block_manager = BlockManager(self._my_keep())
        return self._block_manager

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # NOTE(review): the `try:` opening this block and the return
        # statements are not visible in this excerpt.
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the `try:` and return statements are not visible in
        # this excerpt.
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:

    def _populate(self):
        # Load the manifest: try a signed-locator Keep fetch first, then the
        # API server, then a plain Keep fetch as a last resort.
        if self._manifest_locator is None and self._manifest_text is None:
            # New, empty collection: nothing to load.
            # NOTE(review): the early return guarded here is not visible.
        error_via_api = None
        error_via_keep = None
        # A bare Keep locator (even unsigned) is still worth trying directly.
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                # NOTE(review): the raise guarded here is not visible in this
                # excerpt.
        if ((self._manifest_text is None) and
            not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked; report both failure modes.
            # NOTE(review): parts of this raise (format call, error
            # arguments, closing parens) are not visible in this excerpt.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                self._manifest_locator,
        # Remember the manifest we started from so later merges have a
        # baseline, then build the in-memory tree from it.
        self._baseline_manifest = self._manifest_text
        import_manifest(self._manifest_text, self)

        if self._sync == SYNC_READONLY:
            # Now that we're populated, knowing that this will be readonly,
            # forego any further locking.
            self.lock = NoopLock()

    def _has_collection_uuid(self):
        # True when our locator looks like a collection UUID (as opposed to
        # a block locator or raw manifest text).
        return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)

    def __enter__(self):
        # NOTE(review): body not visible in this excerpt (presumably
        # `return self`).

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block"""
        if self._sync != SYNC_READONLY and self._has_collection_uuid():
            # NOTE(review): the guarded statement (presumably `self.save()`)
            # is not visible in this excerpt.
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
        # Copy this collection into a new CollectionRoot, optionally with a
        # different sync mode and API configuration.
        if new_config is None:
            new_config = self._config
        c = CollectionRoot(parent=new_parent, apiconfig=new_config, sync=new_sync)
        if new_sync == SYNC_READONLY:
            # NOTE(review): the body of this branch, the item-copying lines,
            # and the `return c` are not visible in this excerpt.

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response

    def save(self, merge=True, num_retries=None):
        """Commit pending buffer blocks to Keep, merge with remote record (if
        update=True), write the manifest to Keep, and update the collection
        record. Will raise AssertionError if not associated with a collection
        record on the API server. If you want to save a manifest to Keep only,
        use `save_new()` with create_collection_record=False.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count for API/Keep requests; None uses the default.
        """
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
        self._my_block_manager().commit_all()
        # Write the stripped manifest to Keep first, then record the full
        # manifest on the API server.
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        mt = self.manifest_text(strip=False)
        # NOTE(review): the `).execute(` between the request body and
        # num_retries is not visible in this excerpt.
        self._api_response = self._my_api().collections().update(
            uuid=self._manifest_locator,
            body={'manifest_text': mt}
                num_retries=num_retries)
        self._manifest_text = mt
        self.set_unmodified()

    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
        a new collection record (if create_collection_record True). After
        creating a new collection record, this Collection object will be
        associated with the new record for `save()` and SYNC_LIVE updates.

        :name:
          The collection name.

        :create_collection_record:
          If False, only save the manifest to keep, do not create a
          collection record.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count for API/Keep requests; None uses the default.
        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        mt = self.manifest_text(strip=False)

        if create_collection_record:
            # Supply a default name when the caller gave none.
            # NOTE(review): presumably guarded by `if name is None:` — not
            # visible in this excerpt.
            name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))

            # NOTE(review): the closing of this dict literal (presumably the
            # "name" entry) and the `if owner_uuid:` guard are not visible.
            body = {"manifest_text": mt,
                body["owner_uuid"] = owner_uuid

            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)

            # Re-point any live event subscription at the new record's UUID.
            # NOTE(review): an `if self.events:` guard around each of these
            # calls is likely but not visible in this excerpt.
            self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
            self._manifest_locator = self._api_response["uuid"]
            self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])

        self._manifest_text = mt
        self.set_unmodified()

    def subscribe(self, callback):
        # Register a callback to receive collection change notifications.
        self.callbacks.append(callback)

    def unsubscribe(self, callback):
        # Remove a previously registered callback.
        self.callbacks.remove(callback)

    def notify(self, event, collection, name, item):
        # Fan a change event out to every registered callback.
        for c in self.callbacks:
            c(event, collection, name, item)
def ReadOnlyCollection(*args, **kwargs):
    """Construct a CollectionRoot with sync mode forced to SYNC_READONLY."""
    kwargs.update(sync=SYNC_READONLY)
    return CollectionRoot(*args, **kwargs)
def WritableCollection(*args, **kwargs):
    """Construct a CollectionRoot with sync mode forced to SYNC_EXPLICIT."""
    kwargs.update(sync=SYNC_EXPLICIT)
    return CollectionRoot(*args, **kwargs)
def LiveCollection(*args, **kwargs):
    """Construct a CollectionRoot with sync mode forced to SYNC_LIVE."""
    kwargs["sync"] = SYNC_LIVE
    collection = CollectionRoot(*args, **kwargs)
    return collection
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record. It falls under the umbrella of the root collection."""

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the entire tree is guarded by
        # one lock.
        self.lock = self.root_collection().lock

    def root_collection(self):
        # Delegate upward; only the root answers with itself.
        return self.parent.root_collection()

    def sync_mode(self):
        # Sync policy is owned by the root collection.
        return self.root_collection().sync_mode()

        # Client accessors delegate to the root collection.
        # NOTE(review): the `def _my_api(self):` and `def _my_keep(self):`
        # headers for the next two returns are not visible in this excerpt.
        return self.root_collection()._my_api()

        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def _populate(self):
        # Loading happens at the root; a subcollection has no record of its
        # own.
        self.root_collection()._populate()

    def notify(self, event, collection, name, item):
        # Bubble change notifications up to the root collection.
        return self.root_collection().notify(event, collection, name, item)

    def clone(self, new_parent):
        # NOTE(review): the remainder of this method (copying this
        # subcollection's items into `c` and returning it) is not visible in
        # this excerpt.
        c = Subcollection(new_parent)
def import_manifest(manifest_text,
                    into_collection=None,
                    sync=SYNC_READONLY):
    # NOTE(review): the body below references `api_client`, `keep` and
    # `num_retries`, none of which appear in the visible signature — some
    # parameter lines appear to be missing from this excerpt.
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.

    :into_collection:
      The `Collection` that will be initialized (must be empty).
      If None, create a new `Collection` object.

    :api_client:
      The API client object that will be used when creating a new `Collection` object.

    :keep:
      The keep client object that will be used when creating a new `Collection` object.

    :num_retries:
      the default number of api client and keep retries on error.

    :sync:
      Collection sync mode (only if into_collection is None)
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            raise ArgumentError("Can only import manifest into an empty collection")
        # NOTE(review): an `else:` branch assigning `c = into_collection` is
        # presumably missing here; as visible, `c` is always a fresh
        # CollectionRoot.
        c = CollectionRoot(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)

    save_sync = c.sync_mode()

    # Tokenize the manifest on whitespace and walk a small state machine:
    # a stream name, then block locators, then file segment descriptors.
    # NOTE(review): the state constants (STREAM_NAME, SEGMENTS, ...) and the
    # `tok`/`state`/`blocks`/`streamoffset` bookkeeping lines are not
    # visible in this excerpt.
    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):

        if state == STREAM_NAME:
            # starting a new stream
            # Manifest escapes spaces in names as '\040'.
            stream_name = tok.replace('\\040', ' ')

            # A block locator token: "<md5>+<size>[+hints]".  Record it with
            # its byte offset within the stream.
            # NOTE(review): `long()` marks this as Python 2 code.
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize

        if state == SEGMENTS:
            # A file segment token: "<pos>:<size>:<name>", positioned
            # relative to the stream's concatenated blocks.
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                # Create the file (and intermediate directories) on demand
                # and attach this segment to it.
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
                # NOTE(review): the `else:` introducing this raise — and the
                # function's tail (restoring sync mode, returning `c`) — are
                # not visible in this excerpt.
                raise errors.SyntaxError("Invalid manifest format")
1561 def export_manifest(item, stream_name=".", portable_locators=False):
1564 Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
1565 `item` is a is a `Collection`, this will also export subcollections.
1568 the name of the stream when exporting `item`.
1571 If True, strip any permission hints on block locators.
1572 If False, use block locators as-is.
1575 if isinstance(item, SynchronizedCollectionBase):
1577 sorted_keys = sorted(item.keys())
1578 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1581 for s in v.segments():
1583 if loc.startswith("bufferblock"):
1584 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1585 if portable_locators:
1586 loc = KeepLocator(loc).stripped()
1587 st.append(LocatorAndRange(loc, locator_block_size(loc),
1588 s.segment_offset, s.range_size))
1591 buf += ' '.join(normalize_stream(stream_name, stream))
1593 for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1594 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1595 elif isinstance(item, ArvadosFile):
1597 for s in item.segments:
1599 if loc.startswith("bufferblock"):
1600 loc = item._bufferblocks[loc].calculate_locator()
1601 if portable_locators:
1602 loc = KeepLocator(loc).stripped()
1603 st.append(LocatorAndRange(loc, locator_block_size(loc),
1604 s.segment_offset, s.range_size))
1605 stream[stream_name] = st
1606 buf += ' '.join(normalize_stream(stream_name, stream))