8 from collections import deque
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
15 from .safeapi import SafeApi
20 from arvados.retry import retry_method
22 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    # Shared base for collection readers/writers: context-manager support
    # plus lazy construction of a cached KeepClient.
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): body elided from this view.

        # NOTE(review): the 'def _my_keep(self):' line is elided here; the
        # following statements lazily build and cache a KeepClient that
        # shares this object's API client and retry count.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
def stripped_manifest(self):
    """
    Return the manifest for the current collection with all
    non-portable hints (i.e., permission signatures and other
    hints other than size hints) removed from the locators.
    """
    raw = self.manifest_text()
    # NOTE(review): accumulator setup, the per-line split into 'fields',
    # the comprehension tail, and the final return are elided here.
    for line in raw.split("\n"):
        # Keep the stream name token, then strip any +HINT that does not
        # start with a digit (i.e. keep only size hints) from every token
        # matching the Keep locator pattern.
        clean_fields = fields[:1] + [
            (re.sub(r'\+[^\d][^\+]*', '', x)
             if re.match(util.keep_locator_pattern, x)
        clean += [' '.join(clean_fields), "\n"]
class CollectionReader(CollectionBase):
    """Read-only interface to an Arvados collection.

    NOTE(review): this view of the class is elided; several statements
    (try/else lines, decorators, docstring delimiters) are missing.
    """
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument: Keep locator, collection UUID, or literal
        # manifest text.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # NOTE(review): the 'else:' guarding this raise is elided here.
        raise errors.ArgumentError(
            "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None # Make a new one with the new api.
        # Fetch the collection record and keep both the raw API response
        # and its manifest text.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        # NOTE(review): the enclosing 'try:' and the except body (which
        # returns the caught error) are elided from this view.
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        # NOTE(review): the enclosing 'try:' and the except body are
        # elided from this view.
        except Exception as e:

        # NOTE(review): the 'def _populate(self):' line is elided here;
        # the following statements are its body: fetch manifest text from
        # Keep and/or the API server, raising NotFoundError if both fail.
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: Keep can serve it directly, so try it first.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Both paths failed; report both error causes.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 self._manifest_locator,
        # Split the manifest into per-stream token lists.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        # Ensures the manifest is fetched/parsed before the wrapped
        # method runs.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                # NOTE(review): the populate call is elided here.
            return orig_func(self, *args, **kwargs)

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response

        # NOTE(review): the 'def normalize(self):' line is elided here;
        # the following statements rebuild self._streams in normalized
        # form and regenerate the manifest text from them.
        for s in self.all_streams():
            for f in s.all_files():
                # Group each file's data ranges by (stream, file) name.
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        # NOTE(review): the 'if filename is None:'-style guard around this
        # split, and several loop/raise lines below, are elided.
        streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
        raise ValueError("stream '{}' not found in Collection".
        return stream.files()[filename]
        raise ValueError("file '{}' not found in Collection stream '{}'".
                         format(filename, streampath))

    def all_streams(self):
        # Wrap every parsed stream in a StreamReader.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

        # NOTE(review): the 'def all_files(self):' line (and its yield)
        # are elided here; this nested loop is its body.
        for s in self.all_streams():
            for f in s.all_files():

    def manifest_text(self, strip=False, normalize=False):
        # Return the manifest text, optionally normalized via a fresh
        # reader and/or with block signatures stripped.
        # NOTE(review): the branch lines selecting among these returns
        # are elided from this view.
        cr = CollectionReader(self.manifest_text())
        return cr.manifest_text(strip=strip, normalize=False)
        return self.stripped_manifest()
        return self._manifest_text
class _WriterFile(ArvadosFileBase):
    # File-like object handed out by CollectionWriter.open(); forwards
    # writes to the owning CollectionWriter ('dest').
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # NOTE(review): the 'def close(self):' line is elided here.
        super(_WriterFile, self).close()
        # Mark the file finished in the owning writer.
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        # NOTE(review): loop body elided from this view.

    @ArvadosFileBase._before_close
    # NOTE(review): the 'def flush(self):' line is elided here.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered but not yet uploaded, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        # State of the file currently being written within that stream.
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
def __exit__(self, exc_type, exc_value, traceback):
    # NOTE(review): body elided from this view -- confirm against the
    # full source.

def do_queued_work(self):
    # The work queue consists of three pieces:
    # * _queued_file: The file object we're currently writing to the
    # * _queued_dirents: Entries under the current directory
    #   (_queued_trees[0]) that we want to write or recurse through.
    #   This may contain files from subdirectories if
    #   max_manifest_depth == 0 for this directory.
    # * _queued_trees: Directories that should be written as separate
    #   streams to the Collection.
    # This function handles the smallest piece of work currently queued
    # (current file, then current directory, then next directory) until
    # no work remains. The _work_THING methods each do a unit of work on
    # THING. _queue_THING methods add a THING to the work queue.
    # NOTE(review): the enclosing loop and the _work_* call bodies of
    # these branches are elided from this view.
    if self._queued_file:
    elif self._queued_dirents:
    elif self._queued_trees:
def _work_file(self):
    # Copy the queued file into the collection one Keep-block-sized read
    # at a time, then finish the file and clear the queue slot.
    # NOTE(review): the read loop and the 'if self._close_file:' guard
    # are elided from this view.
    buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
    self.finish_current_file()
    self._queued_file.close()
    self._close_file = None
    self._queued_file = None
def _work_dirents(self):
    # Process entries of the directory at the head of _queued_trees:
    # recurse into subdirectories, queue other entries as files.
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    if stream_name != self.current_stream_name():
        self.start_new_stream(stream_name)
    while self._queued_dirents:
        dirent = self._queued_dirents.popleft()
        target = os.path.join(path, dirent)
        if os.path.isdir(target):
            # One less level of manifest depth for the subdirectory.
            self._queue_tree(target,
                             os.path.join(stream_name, dirent),
                             max_manifest_depth - 1)
        # NOTE(review): the 'else:' before this call is elided.
        self._queue_file(target, dirent)
    if not self._queued_dirents:
        self._queued_trees.popleft()
def _work_trees(self):
    # Expand the directory tree at the head of the queue into dirents.
    path, stream_name, max_manifest_depth = self._queued_trees[0]
    # max_manifest_depth == 0 lists the entire subtree recursively so it
    # is written as a single stream; otherwise list one level only.
    d = util.listdir_recursive(
        path, max_depth = (None if max_manifest_depth == 0 else 0))
    # NOTE(review): an 'if d: ... else:' branch around these two calls
    # is elided from this view.
    self._queue_dirents(stream_name, d)
    self._queued_trees.popleft()
def _queue_file(self, source, filename=None):
    # Accept either a path or a file-like object; remember whether we
    # opened the file ourselves (and therefore must close it).
    assert (self._queued_file is None), "tried to queue more than one file"
    if not hasattr(source, 'read'):
        source = open(source, 'rb')
        self._close_file = True
    # NOTE(review): the 'else:' and 'if filename is None:' lines are
    # elided from this view.
    self._close_file = False
    filename = os.path.basename(source.name)
    self.start_new_file(filename)
    self._queued_file = source
def _queue_dirents(self, stream_name, dirents):
    """Queue a batch of directory entries, in sorted order, for writing."""
    assert (not self._queued_dirents), "tried to queue more than one tree"
    ordered_entries = sorted(dirents)
    self._queued_dirents = deque(ordered_entries)
def _queue_tree(self, path, stream_name, max_manifest_depth):
    """Add a directory tree to the back of the work queue."""
    work_item = (path, stream_name, max_manifest_depth)
    self._queued_trees.append(work_item)
def write_file(self, source, filename=None):
    """Write one file (a path or file-like object) into the collection."""
    self._queue_file(source, filename)
    self.do_queued_work()
def write_directory_tree(self,
                         path, stream_name='.', max_manifest_depth=-1):
    """Write a local directory tree into the collection under `stream_name`.

    max_manifest_depth limits how many directory levels become separate
    streams; a negative value never reaches the ==0 cutoff in
    _work_trees, so every level gets its own stream.
    """
    self._queue_tree(path, stream_name, max_manifest_depth)
    self.do_queued_work()
def write(self, newdata):
    """Buffer data for the current file, flushing full Keep blocks."""
    # NOTE(review): the body of the iterable branch (recursing per item)
    # and its return are elided from this view.
    if hasattr(newdata, '__iter__'):
    self._data_buffer.append(newdata)
    self._data_buffer_len += len(newdata)
    self._current_stream_length += len(newdata)
    # Upload whenever at least one full Keep block has accumulated.
    while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
def open(self, streampath, filename=None):
    """open(streampath[, filename]) -> file-like object

    Pass in the path of a file to write to the Collection, either as a
    single string or as two separate stream name and file name arguments.
    This method returns a file-like object you can write to add it to the

    You may only have one file object from the Collection open at a time,
    so be sure to close the object when you're done. Using the object in
    a with statement makes that easy::

        with cwriter.open('./doc/page1.txt') as outfile:
            outfile.write(page1_data)
        with cwriter.open('./doc/page2.txt') as outfile:
            outfile.write(page2_data)
    """
    # NOTE(review): a guard (likely 'if filename is None:') before this
    # split is elided from this view.
    streampath, filename = split(streampath)
    # Enforce the one-open-file-at-a-time contract.
    if self._last_open and not self._last_open.closed:
        raise errors.AssertionError(
            "can't open '{}' when '{}' is still open".format(
                filename, self._last_open.name))
    if streampath != self.current_stream_name():
        self.start_new_stream(streampath)
    self.set_current_file_name(filename)
    self._last_open = _WriterFile(self, filename)
    return self._last_open
def flush_data(self):
    """Upload one buffered Keep block; keep any remainder buffered."""
    data_buffer = ''.join(self._data_buffer)
    # NOTE(review): an 'if data_buffer:' guard is elided from this view.
    self._current_stream_locators.append(
        self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
    # Carry anything beyond one block over to the next flush.
    self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
    self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
    """Finish the file in progress and begin a new (optionally named) one."""
    self.finish_current_file()
    self.set_current_file_name(newfilename)
def set_current_file_name(self, newfilename):
    """Set the name under which subsequently written data is recorded.

    Raises errors.AssertionError for names the manifest format cannot
    represent (tab, newline, NUL).
    """
    # NOTE(review): the '%' format-argument continuation lines of both
    # raise statements are elided from this view.
    if re.search(r'[\t\n]', newfilename):
        raise errors.AssertionError(
            "Manifest filenames cannot contain whitespace: %s" %
    elif re.search(r'\x00', newfilename):
        raise errors.AssertionError(
            "Manifest filenames cannot contain NUL characters: %s" %
    self._current_file_name = newfilename
def current_file_name(self):
    """Return the name of the file currently being written, or None."""
    return self._current_file_name
def finish_current_file(self):
    """Record the data written since the last file boundary as one file."""
    if self._current_file_name is None:
        if self._current_file_pos == self._current_stream_length:
            # No unnamed data outstanding; nothing to do.
            # NOTE(review): the early 'return' is elided from this view.
        raise errors.AssertionError(
            "Cannot finish an unnamed file " +
            "(%d bytes at offset %d in '%s' stream)" %
            (self._current_stream_length - self._current_file_pos,
             self._current_file_pos,
             self._current_stream_name))
    # Manifest file token: [start position, length, name].
    self._current_stream_files.append([
        self._current_file_pos,
        self._current_stream_length - self._current_file_pos,
        self._current_file_name])
    self._current_file_pos = self._current_stream_length
    self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
    """Finish the stream in progress and begin a new one."""
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
    """Validate and record the name of the stream being written.

    An empty name is replaced by '.', the manifest's root-stream name.
    Raises errors.AssertionError if the name contains tab or newline.
    """
    if re.search(r'[\t\n]', newstreamname):
        raise errors.AssertionError(
            "Manifest stream names cannot contain whitespace")
    if newstreamname == '':
        self._current_stream_name = '.'
    else:
        self._current_stream_name = newstreamname
def current_stream_name(self):
    """Return the name of the stream currently being written."""
    return self._current_stream_name
def finish_current_stream(self):
    """Finish the current file, record the stream, and reset stream state."""
    self.finish_current_file()
    # NOTE(review): a flush call, a 'pass' branch body, and an 'else:'
    # line are elided from this view.
    if not self._current_stream_files:
    elif self._current_stream_name is None:
        raise errors.AssertionError(
            "Cannot finish an unnamed stream (%d bytes in %d files)" %
            (self._current_stream_length, len(self._current_stream_files)))
    # Every stream needs at least one locator; use the canonical
    # empty-block locator when no data was written.
    if not self._current_stream_locators:
        self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
    self._finished_streams.append([self._current_stream_name,
                                   self._current_stream_locators,
                                   self._current_stream_files])
    self._current_stream_files = []
    self._current_stream_length = 0
    self._current_stream_locators = []
    self._current_stream_name = None
    self._current_file_pos = 0
    self._current_file_name = None
# NOTE(review): the 'def finish(self):' line is elided here; the
# following is its body.
# Store the manifest in Keep and return its locator.
return self._my_keep().put(self.manifest_text())
def portable_data_hash(self):
    """Return the portable data hash: md5 of the stripped manifest plus
    its length, joined by '+'."""
    stripped = self.stripped_manifest()
    digest = hashlib.md5(stripped)
    return '%s+%d' % (digest.hexdigest(), len(stripped))
def manifest_text(self):
    """Return the manifest text for everything written so far.

    Finishes the current stream first so all buffered data is included.
    """
    self.finish_current_stream()
    # NOTE(review): initialization of 'manifest', a branch body, and the
    # final return are elided from this view.
    for stream in self._finished_streams:
        if not re.search(r'^\.(/.*)?$', stream[0]):
        # Manifest line: stream name, locators, then pos:len:name file
        # tokens; spaces in names are escaped as \040.
        manifest += stream[0].replace(' ', '\\040')
        manifest += ' ' + ' '.join(stream[1])
        manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
def data_locators(self):
    """Return the Keep locators of all finished streams."""
    # NOTE(review): accumulator setup, loop body, and return are elided
    # from this view.
    for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be checkpointed and resumed.

    STATE_PROPS lists the attributes captured by dump_state() and
    restored by from_state().
    """
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']
def __init__(self, api_client=None, num_retries=0):
    """Create a writer that tracks source-file dependencies for resume."""
    # Must exist before the base class runs, since _queue_file records
    # dependencies into it.
    self._dependencies = {}
    super(ResumableCollectionWriter, self).__init__(
        api_client, num_retries=num_retries)
def from_state(cls, state, *init_args, **init_kwargs):
    # Try to build a new writer from scratch with the given state.
    # If the state is not suitable to resume (because files have changed,
    # been deleted, aren't predictable, etc.), raise a
    # StaleWriterStateError. Otherwise, return the initialized writer.
    # The caller is responsible for calling writer.do_queued_work()
    # appropriately after it's returned.
    # NOTE(review): the @classmethod decorator, a 'try:', and the final
    # 'return writer' are elided from this view.
    writer = cls(*init_args, **init_kwargs)
    for attr_name in cls.STATE_PROPS:
        attr_value = state[attr_name]
        attr_class = getattr(writer, attr_name).__class__
        # Coerce the value into the same type as the initial value, if
        if attr_class not in (type(None), attr_value.__class__):
            attr_value = attr_class(attr_value)
        setattr(writer, attr_name, attr_value)
    # Check dependencies before we try to resume anything.
    if any(KeepLocator(ls).permission_expired()
           for ls in writer._current_stream_locators):
        raise errors.StaleWriterStateError(
            "locators include expired permission hint")
    writer.check_dependencies()
    if state['_current_file'] is not None:
        # Reopen the in-progress source file at its saved offset.
        path, pos = state['_current_file']
        writer._queued_file = open(path, 'rb')
        writer._queued_file.seek(pos)
        except IOError as error:
            raise errors.StaleWriterStateError(
                "failed to reopen active file {}: {}".format(path, error))
def check_dependencies(self):
    """Verify every recorded source file is unchanged since it was queued.

    Raises errors.StaleWriterStateError if a dependency is missing, is
    not a regular file, or differs in mtime or size.
    """
    for path, orig_stat in self._dependencies.items():
        if not S_ISREG(orig_stat[ST_MODE]):
            raise errors.StaleWriterStateError("{} not file".format(path))
        # NOTE(review): the 'try:' wrapping this stat is elided.
        now_stat = tuple(os.stat(path))
        except OSError as error:
            raise errors.StaleWriterStateError(
                "failed to stat {}: {}".format(path, error))
        if ((not S_ISREG(now_stat[ST_MODE])) or
            (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
            (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
            raise errors.StaleWriterStateError("{} changed".format(path))
def dump_state(self, copy_func=lambda x: x):
    """Return a snapshot of writer state suitable for from_state().

    copy_func is applied to each attribute value (e.g. copy.deepcopy
    for a fully independent snapshot).
    """
    state = {attr: copy_func(getattr(self, attr))
             for attr in self.STATE_PROPS}
    if self._queued_file is None:
        state['_current_file'] = None
    # NOTE(review): the 'else:' and 'return state' lines are elided
    # from this view.
    state['_current_file'] = (os.path.realpath(self._queued_file.name),
                              self._queued_file.tell())
def _queue_file(self, source, filename=None):
    # Queue the file as usual, but also record stat information so
    # check_dependencies() can later detect changes.
    # NOTE(review): several lines ('try:', a guard on 'source', except
    # bodies, an 'else:') are elided from this view.
    src_path = os.path.realpath(source)
    raise errors.AssertionError("{} not a file path".format(source))
    path_stat = os.stat(src_path)
    except OSError as stat_error:
    super(ResumableCollectionWriter, self)._queue_file(source, filename)
    fd_stat = os.fstat(self._queued_file.fileno())
    if not S_ISREG(fd_stat.st_mode):
        # We won't be able to resume from this cache anyway, so don't
        # worry about further checks.
        self._dependencies[source] = tuple(fd_stat)
    elif path_stat is None:
        raise errors.AssertionError(
            "could not stat {}: {}".format(source, stat_error))
    elif path_stat.st_ino != fd_stat.st_ino:
        raise errors.AssertionError(
            "{} changed between open and stat calls".format(source))
    self._dependencies[src_path] = tuple(fd_stat)
def write(self, data):
    """Write data, refusing any that is not tied to a queued source file.

    Unsourced data cannot be reproduced after a resume, so it would make
    the saved state stale.
    """
    if self._queued_file is not None:
        return super(ResumableCollectionWriter, self).write(data)
    raise errors.AssertionError(
        "resumable writer can't accept unsourced data")
class SynchronizedCollectionBase(CollectionBase):
    def __init__(self, parent=None):
        # NOTE(review): additional initialization (e.g. self._items,
        # presumably self.parent) is elided from this view -- confirm
        # against the full source.
        self._modified = True
# NOTE(review): this region is a series of abstract-interface stubs for
# subclasses to implement; several 'def' lines are elided from this view.
    raise NotImplementedError()

    raise NotImplementedError()

def _my_block_manager(self):
    raise NotImplementedError()

    raise NotImplementedError()

    raise NotImplementedError()

def root_collection(self):
    raise NotImplementedError()

def notify(self, event, collection, name, item):
    raise NotImplementedError()
def find(self, path, create=False, create_collection=False):
    """Recursively search the specified file path. May return either a Collection

    :create:
      If true, create path components (i.e. Collections) that are
      missing. If "create" is False, return None if a path component is

    :create_collection:
      If the path is not found, "create" is True, and
      "create_collection" is False, then create and return a new
      ArvadosFile for the last path component. If "create_collection" is
      True, then create and return a new Collection for the last path
    """
    if create and self.sync_mode() == SYNC_READONLY:
        # Refuse to create anything in a read-only collection.
        raise IOError((errno.EROFS, "Collection is read only"))
    # NOTE(review): splitting 'path' into component list 'p', and several
    # branch/return lines, are elided from this view.
    item = self._items.get(p[0])
    # item must be a file
    if item is None and create:
        if create_collection:
            item = Subcollection(self)
        # NOTE(review): the 'else:' before this line is elided.
        item = ArvadosFile(self)
        self._items[p[0]] = item
        self._modified = True
        self.notify(ADD, self, p[0], item)
    if item is None and create:
        # create new collection
        item = Subcollection(self)
        self._items[p[0]] = item
        self._modified = True
        self.notify(ADD, self, p[0], item)
    if isinstance(item, SynchronizedCollectionBase):
        # Recurse into the subcollection with the remaining components.
        return item.find("/".join(p), create=create)
    raise errors.ArgumentError("Interior path components must be subcollection")
def open(self, path, mode):
    """Open a file-like object for access.

    :path:
      path to a file in the collection
    :mode:
      one of "r", "r+", "w", "w+", "a", "a+"

      opens for reading and writing. Reads/writes share a file pointer.

      truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.

      opens for reading and writing. All writes are appended to
      the end of the file. Writing does not affect the file pointer for
    """
    # Binary flag is meaningless here; drop it.
    mode = mode.replace("b", "")
    if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
        # NOTE(review): 'ArgumentError' is unqualified here -- likely
        # should be errors.ArgumentError; as written this line would
        # raise NameError instead. Confirm against the full source.
        raise ArgumentError("Bad mode '%s'" % mode)
    create = (mode != "r")

    if create and self.sync_mode() == SYNC_READONLY:
        raise IOError((errno.EROFS, "Collection is read only"))

    f = self.find(path, create=create)
    # NOTE(review): the branch lines choosing among the following raises
    # and returns (and any "w"-mode truncation) are elided from this view.
    raise IOError((errno.ENOENT, "File not found"))
    if not isinstance(f, ArvadosFile):
        raise IOError((errno.EISDIR, "Path must refer to a file."))
    return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
    return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)
774 """Test if the collection (or any subcollection or file) has been modified
775 since it was created."""
778 for k,v in self._items.items():
784 def set_unmodified(self):
785 """Recursively clear modified flag"""
786 self._modified = False
787 for k,v in self._items.items():
792 """Iterate over names of files and collections contained in this collection."""
793 return self._items.keys().__iter__()
797 """Iterate over names of files and collections directly contained in this collection."""
798 return self._items.keys()
def __getitem__(self, k):
    """Get a file or collection that is directly contained by this collection. If
    you want to search a path, use `find()` instead.
    """
    return self._items[k]

def __contains__(self, k):
    """If there is a file or collection a directly contained by this collection
    """
    return k in self._items

# NOTE(review): the 'def __len__(self):' line is elided here.
    """Get the number of items directly contained in this collection"""
    return len(self._items)
def __delitem__(self, p):
    """Delete an item by name which is directly contained by this collection."""
    # NOTE(review): the actual deletion statement is elided from this view.
    self._modified = True
    self.notify(DEL, self, p, None)

# NOTE(review): the 'def keys/values/items' lines are elided below.
    """Get a list of names of files and collections directly contained in this collection."""
    return self._items.keys()

    """Get a list of files and collection objects directly contained in this collection."""
    return self._items.values()

    """Get a list of (name, object) tuples directly contained in this collection."""
    return self._items.items()
def exists(self, path):
    """Return True if there is a file or collection at "path".

    Delegates to find(); any non-None result means the path resolves.
    """
    # Use an identity check against None: '!= None' would invoke rich
    # comparison (__ne__/__eq__) on the found item, which is both slower
    # and semantically wrong for a presence test.
    return self.find(path) is not None
def remove(self, path, rm_r=False):
    """Remove the file or subcollection (directory) at `path`.

    :rm_r:
      Specify whether to remove non-empty subcollections (True), or raise an error (False).
    """
    # Remove '.' from the front of the path
    # NOTE(review): path normalization, the split into component list
    # 'p', and several branch lines are elided from this view.
    item = self._items.get(p[0])
    raise IOError((errno.ENOENT, "File not found"))
    # Refuse to delete a non-empty subcollection unless rm_r was given.
    if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
        raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
    d = self._items[p[0]]
    del self._items[p[0]]
    self._modified = True
    self.notify(DEL, self, p[0], d)
    # Recurse into the subcollection with the remaining components.
    item.remove("/".join(p))
    raise IOError((errno.ENOENT, "File not found"))
def _cloneinto(self, target):
    """Clone every child of this collection into `target`."""
    for child_name, child in self._items.items():
        target._items[child_name] = child.clone(target)
879 raise NotImplementedError()
def copy(self, source, target_path, source_collection=None, overwrite=False):
    """Copy a file or subcollection to a new path in this collection.

    :source:
      An ArvadosFile, Subcollection, or string with a path to source file or subcollection

    :target_path:
      Destination file or path. If the target path already exists and is a
      subcollection, the item will be placed inside the subcollection. If
      the target path already exists and is a file, this will raise an error
      unless you specify `overwrite=True`.

    :source_collection:
      Collection to copy `source_path` from (default `self`)

    :overwrite:
      Whether to overwrite target file if it already exists.
    """
    if source_collection is None:
        source_collection = self

    # Find the object to copy
    # NOTE(review): the non-string branch (source already an object) is
    # elided from this view.
    if isinstance(source, basestring):
        source_obj = source_collection.find(source)
        if source_obj is None:
            raise IOError((errno.ENOENT, "File not found"))
        sp = source.split("/")

    # Find parent collection the target path
    tp = target_path.split("/")

    # Determine the name to use.
    target_name = tp[-1] if tp[-1] else (sp[-1] if sp else None)

    # NOTE(review): the guard around this raise is elided from this view.
    raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")

    target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)

    with target_dir.lock:
        if target_name in target_dir:
            # Existing subcollection: place the item inside it instead.
            if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sp:
                target_dir = target_dir[target_name]
            # NOTE(review): the 'elif not overwrite:'-style guard before
            # this raise is elided from this view.
            raise IOError((errno.EEXIST, "File already exists"))

        if target_name in target_dir:
            mod = target_dir[target_name]

        # Actually make the copy.
        dup = source_obj.clone(target_dir)
        target_dir._items[target_name] = dup
        target_dir._modified = True

    # Notify observers: MOD when something was replaced, ADD otherwise.
    # NOTE(review): the branch lines around these notify calls are elided.
    self.notify(MOD, target_dir, target_name, (mod, dup))
    self.notify(ADD, target_dir, target_name, dup)
def manifest_text(self, strip=False, normalize=False):
    """Get the manifest text for this collection, sub collections and files.

    :strip:
      If True, remove signing tokens from block locators if present.
      If False, block locators are left unchanged.

    :normalize:
      If True, always export the manifest text in normalized form
      even if the Collection is not modified. If False and the collection
      is not modified, return the original manifest text even if it is not
    """
    # Re-export when dirty, missing, or normalization was requested;
    # otherwise fall through to the cached text.
    if self.modified() or self._manifest_text is None or normalize:
        return export_manifest(self, stream_name=".", portable_locators=strip)
    # NOTE(review): the else/if lines selecting between these returns
    # are elided from this view.
    return self.stripped_manifest()
    return self._manifest_text
971 def diff(self, end_collection, prefix=".", holding_collection=None):
973 Generate list of add/modify/delete actions which, when given to `apply`, will
974 change `self` to match `end_collection`
977 if holding_collection is None:
978 holding_collection = Collection()
980 if k not in end_collection:
981 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
982 for k in end_collection:
984 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
985 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
986 elif end_collection[k] != self[k]:
987 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
989 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
def apply(self, changes):
    """
    Apply changes from `diff`. If a change conflicts with a local change, it
    will be saved to an alternate path indicating the conflict.
    """
    # NOTE(review): the loop header over 'changes', the event-type
    # dispatch lines, and several branch headers are elided from this view.
    local = self.find(path)
    # Timestamped path used when a change collides with a local edit.
    conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
    # No local file at path, safe to copy over new file
    self.copy(initial, path)
    elif local is not None and local != initial:
        # There is already local file and it is different:
        # save change to conflict file.
        self.copy(initial, conflictpath)
    if local == initial:
        # Local matches the "initial" item so assume it hasn't
        # changed locally and is safe to update.
        if isinstance(local, ArvadosFile) and isinstance(c[3], ArvadosFile):
            # Replace contents of local file with new contents
            local.replace_contents(c[3])
        # Overwrite path with new item; this can happen if if
        # path was a file and is now a collection or vice versa
        self.copy(c[3], path, overwrite=True)
    # Local is missing (presumably deleted) or local doesn't
    # match the "start" value, so save change to conflict file
    self.copy(c[3], conflictpath)
    if local == initial:
        # Local item matches "initial" value, so it is safe to remove.
        self.remove(path, rm_r=True)
    # else, the file is modified or already removed, in either
    # case we don't want to try to remove it.
def portable_data_hash(self):
    """Get the portable data hash for this collection's manifest."""
    text = self.manifest_text(strip=True)
    return '%s+%d' % (hashlib.md5(text).hexdigest(), len(text))
def __eq__(self, other):
    # Collections are equal when the other side is also a collection with
    # the same number of items and equal items under each name.
    # NOTE(review): the 'return True/False' lines (and any identity
    # fast-path) are elided from this view.
    if not isinstance(other, SynchronizedCollectionBase):
    if len(self._items) != len(other):
    for k in self._items:
        if self._items[k] != other[k]:
def __ne__(self, other):
    """Inequality is defined as the inverse of __eq__."""
    equal = self.__eq__(other)
    return not equal
class Collection(SynchronizedCollectionBase):
    """Store an Arvados collection, consisting of a set of files and
    sub-collections. This object
    """
    # NOTE(review): several parameters of __init__ (apparently parent,
    # config, api_client, keep_client, num_retries, block_manager, per
    # the docstring below) are elided from this view of the signature.
    def __init__(self, manifest_locator_or_text=None,
                 sync=SYNC_READONLY):
        """:manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :config:
          the arvados configuration to get the hostname and api token.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
        :api_client:
          The API client object to use for requests. If not specified, create one using `config`.
        :keep_client:
          the Keep client to use for requests. If not specified, create one using `config`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use. If not specified, create one.
        :sync:
          Set synchronization policy with API server collection record.
          :SYNC_READONLY:
            Collection is read only. No synchronization. This mode will
            also forego locking, which gives better performance.
          :SYNC_EXPLICIT:
            Collection is writable. Synchronize on explicit request via `update()` or `save()`
          :SYNC_LIVE:
            Collection is writable. Synchronize with server in response to
            background websocket events, on block write, or on file close.
        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager
        self._config = config
        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        self.lock = threading.RLock()

        # Classify the argument the same way CollectionReader does.
        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            # NOTE(review): the 'else:' before this raise is elided.
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")

        # NOTE(review): assignment of self._sync and related lines are
        # elided from this view.
        if self._sync == SYNC_LIVE:
            if not self._has_collection_uuid():
                raise errors.ArgumentError("Cannot SYNC_LIVE associated with a collection uuid")
            # Subscribe to websocket events for this collection's record.
            self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1133 def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1134 c = Collection(sync=sync)
1135 c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1138 def root_collection(self):
1141 def sync_mode(self):
1144 def on_message(self):
1149 def update(self, other=None, num_retries=None):
1151 if self._manifest_locator is None:
1152 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1153 n = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1154 other = import_collection(n["manifest_text"])
1155 baseline = import_collection(self._manifest_text)
1156 self.apply(other.diff(baseline))
1160 if self._api_client is None:
1161 self._api_client = arvados.SafeApi(self._config)
1162 self._keep_client = self._api_client.keep
1163 return self._api_client
1167 if self._keep_client is None:
1168 if self._api_client is None:
1171 self._keep_client = KeepClient(api=self._api_client)
1172 return self._keep_client
1175 def _my_block_manager(self):
1176 if self._block_manager is None:
1177 self._block_manager = BlockManager(self._my_keep())
1178 return self._block_manager
1180 def _populate_from_api_server(self):
1181 # As in KeepClient itself, we must wait until the last
1182 # possible moment to instantiate an API client, in order to
1183 # avoid tripping up clients that don't have access to an API
1184 # server. If we do build one, make sure our Keep client uses
1185 # it. If instantiation fails, we'll fall back to the except
1186 # clause, just like any other Collection lookup
1187 # failure. Return an exception, or None if successful.
1189 self._api_response = self._my_api().collections().get(
1190 uuid=self._manifest_locator).execute(
1191 num_retries=self.num_retries)
1192 self._manifest_text = self._api_response['manifest_text']
1194 except Exception as e:
1197 def _populate_from_keep(self):
1198 # Retrieve a manifest directly from Keep. This has a chance of
1199 # working if [a] the locator includes a permission signature
1200 # or [b] the Keep services are operating in world-readable
1201 # mode. Return an exception, or None if successful.
1203 self._manifest_text = self._my_keep().get(
1204 self._manifest_locator, num_retries=self.num_retries)
1205 except Exception as e:
1208 def _populate(self):
1209 if self._manifest_locator is None and self._manifest_text is None:
1211 error_via_api = None
1212 error_via_keep = None
1213 should_try_keep = ((self._manifest_text is None) and
1214 util.keep_locator_pattern.match(
1215 self._manifest_locator))
1216 if ((self._manifest_text is None) and
1217 util.signed_locator_pattern.match(self._manifest_locator)):
1218 error_via_keep = self._populate_from_keep()
1219 if self._manifest_text is None:
1220 error_via_api = self._populate_from_api_server()
1221 if error_via_api is not None and not should_try_keep:
1223 if ((self._manifest_text is None) and
1224 not error_via_keep and
1226 # Looks like a keep locator, and we didn't already try keep above
1227 error_via_keep = self._populate_from_keep()
1228 if self._manifest_text is None:
1230 raise arvados.errors.NotFoundError(
1231 ("Failed to retrieve collection '{}' " +
1232 "from either API server ({}) or Keep ({})."
1234 self._manifest_locator,
1238 self._baseline_manifest = self._manifest_text
1239 import_manifest(self._manifest_text, self)
1241 if self._sync == SYNC_READONLY:
1242 # Now that we're populated, knowing that this will be readonly,
1243 # forego any further locking.
1244 self.lock = NoopLock()
1246 def _has_collection_uuid(self):
1247 return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
1249 def __enter__(self):
1252 def __exit__(self, exc_type, exc_value, traceback):
1253 """Support scoped auto-commit in a with: block"""
1254 if self._sync != SYNC_READONLY and self._has_collection_uuid():
1256 if self._block_manager is not None:
1257 self._block_manager.stop_threads()
1260 def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
1261 if new_config is None:
1262 new_config = self._config
1263 c = Collection(parent=new_parent, config=new_config, sync=new_sync)
1264 if new_sync == SYNC_READONLY:
1271 def api_response(self):
1273 api_response() -> dict or None
1275 Returns information about this Collection fetched from the API server.
1276 If the Collection exists in Keep but not the API server, currently
1277 returns None. Future versions may provide a synthetic response.
1279 return self._api_response
1284 def save(self, merge=True, num_retries=None):
1285 """Commit pending buffer blocks to Keep, merge with remote record (if
1286 update=True), write the manifest to Keep, and update the collection
1287 record. Will raise AssertionError if not associated with a collection
1288 record on the API server. If you want to save a manifest to Keep only,
1292 Update and merge remote changes before saving. Otherwise, any
1293 remote changes will be ignored and overwritten.
1297 if not self._has_collection_uuid():
1298 raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
1299 self._my_block_manager().commit_all()
1302 self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1304 mt = self.manifest_text(strip=False)
1305 self._api_response = self._my_api().collections().update(
1306 uuid=self._manifest_locator,
1307 body={'manifest_text': mt}
1309 num_retries=num_retries)
1310 self._manifest_text = mt
1311 self.set_unmodified()
1316 def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
1317 """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
1318 a new collection record (if create_collection_record True). After
1319 creating a new collection record, this Collection object will be
1320 associated with the new record for `save()` and SYNC_LIVE updates.
1323 The collection name.
1326 Only save the manifest to keep, do not create a collection record.
1329 the user, or project uuid that will own this collection.
1330 If None, defaults to the current user.
1332 :ensure_unique_name:
1333 If True, ask the API server to rename the collection
1334 if it conflicts with a collection with the same name and owner. If
1335 False, a name conflict will result in an error.
1338 self._my_block_manager().commit_all()
1339 self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
1340 mt = self.manifest_text(strip=False)
1342 if create_collection_record:
1344 name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
1346 body = {"manifest_text": mt,
1349 body["owner_uuid"] = owner_uuid
1351 self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
1354 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1356 self._manifest_locator = self._api_response["uuid"]
1359 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1361 self._manifest_text = mt
1362 self.set_unmodified()
1365 def subscribe(self, callback):
1366 self.callbacks.append(callback)
1369 def unsubscribe(self, callback):
1370 self.callbacks.remove(callback)
1373 def notify(self, event, collection, name, item):
1374 for c in self.callbacks:
1375 c(event, collection, name, item)
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record. It falls under the umbrella of the root collection."""

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree serializes on a
        # single lock object.
        self.lock = self.root_collection().lock

    # All state and service lookups delegate to the root collection.
    def root_collection(self):
        return self.parent.root_collection()

    def sync_mode(self):
        return self.root_collection().sync_mode()

        return self.root_collection()._my_api()

        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def _populate(self):
        self.root_collection()._populate()

    def notify(self, event, collection, name, item):
        return self.root_collection().notify(event, collection, name, item)

    def clone(self, new_parent):
        # Copy this subtree under a new parent collection.
        c = Subcollection(new_parent)
def import_manifest(manifest_text,
                    into_collection=None,
                    sync=SYNC_READONLY):
    """Import a manifest into a `Collection`.
    The manifest text to import from.
    The `Collection` that will be initialized (must be empty).
    If None, create a new `Collection` object.
    The API client object that will be used when creating a new `Collection` object.
    The keep client object that will be used when creating a new `Collection` object.
    the default number of api client and keep retries on error.
    Collection sync mode (only if into_collection is None)
    if into_collection is not None:
        if len(into_collection) > 0:
            raise ArgumentError("Can only import manifest into an empty collection")
        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
    save_sync = c.sync_mode()
    # Tokenize the manifest on whitespace and drive a small state machine
    # per stream: stream name -> block locators -> file segments.
    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            # Block locator token: 32-hex md5 + size (+ optional hints).
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                # Track each block's offset within the stream so segments can
                # be mapped back onto blocks.
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
        if state == SEGMENTS:
            # Segment token: position:size:filename ('\040' encodes a space).
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                # Create the file on demand and attach this byte range.
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
                raise errors.SyntaxError("Invalid manifest format")
1498 def export_manifest(item, stream_name=".", portable_locators=False):
1501 Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
1502 `item` is a is a `Collection`, this will also export subcollections.
1505 the name of the stream when exporting `item`.
1508 If True, strip any permission hints on block locators.
1509 If False, use block locators as-is.
1512 if isinstance(item, SynchronizedCollectionBase):
1514 sorted_keys = sorted(item.keys())
1515 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1518 for s in v.segments():
1520 if loc.startswith("bufferblock"):
1521 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1522 if portable_locators:
1523 loc = KeepLocator(loc).stripped()
1524 st.append(LocatorAndRange(loc, locator_block_size(loc),
1525 s.segment_offset, s.range_size))
1528 buf += ' '.join(normalize_stream(stream_name, stream))
1530 for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1531 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1532 elif isinstance(item, ArvadosFile):
1534 for s in item.segments:
1536 if loc.startswith("bufferblock"):
1537 loc = item._bufferblocks[loc].calculate_locator()
1538 if portable_locators:
1539 loc = KeepLocator(loc).stripped()
1540 st.append(LocatorAndRange(loc, locator_block_size(loc),
1541 s.segment_offset, s.range_size))
1542 stream[stream_name] = st
1543 buf += ' '.join(normalize_stream(stream_name, stream))