1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
19 from collections import deque
22 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
23 from .keep import KeepLocator, KeepClient
24 from .stream import StreamReader
25 from ._normalize_stream import normalize_stream
26 from ._ranges import Range, LocatorAndRange
27 from .safeapi import ThreadSafeApiCache
28 import arvados.config as config
29 import arvados.errors as errors
31 import arvados.events as events
32 from arvados.retry import retry_method
34 _logger = logging.getLogger('arvados.collection')
36 class CollectionBase(object):
37 """Abstract base class for Collection classes."""
42 def __exit__(self, exc_type, exc_value, traceback):
46 if self._keep_client is None:
47 self._keep_client = KeepClient(api_client=self._api_client,
48 num_retries=self.num_retries)
49 return self._keep_client
51 def stripped_manifest(self):
52 """Get the manifest with locator hints stripped.
54 Return the manifest for the current collection with all
55 non-portable hints (i.e., permission signatures and other
56 hints other than size hints) removed from the locators.
58 raw = self.manifest_text()
60 for line in raw.split("\n"):
63 clean_fields = fields[:1] + [
64 (re.sub(r'\+[^\d][^\+]*', '', x)
65 if re.match(arvados.util.keep_locator_pattern, x)
68 clean += [' '.join(clean_fields), "\n"]
72 class _WriterFile(_FileLikeObjectBase):
73 def __init__(self, coll_writer, name):
74 super(_WriterFile, self).__init__(name, 'wb')
75 self.dest = coll_writer
78 super(_WriterFile, self).close()
79 self.dest.finish_current_file()
81 @_FileLikeObjectBase._before_close
82 def write(self, data):
85 @_FileLikeObjectBase._before_close
86 def writelines(self, seq):
90 @_FileLikeObjectBase._before_close
92 self.dest.flush_data()
95 class CollectionWriter(CollectionBase):
96 """Deprecated, use Collection instead."""
98 def __init__(self, api_client=None, num_retries=0, replication=None):
99 """Instantiate a CollectionWriter.
101 CollectionWriter lets you build a new Arvados Collection from scratch.
102 Write files to it. The CollectionWriter will upload data to Keep as
103 appropriate, and provide you with the Collection manifest text when
107 * api_client: The API client to use to look up Collections. If not
108 provided, CollectionReader will build one from available Arvados
110 * num_retries: The default number of times to retry failed
111 service requests. Default 0. You may change this value
112 after instantiation, but note those changes may not
113 propagate to related objects like the Keep client.
114 * replication: The number of copies of each block to store.
115 If this argument is None or not supplied, replication is
116 the server-provided default if available, otherwise 2.
118 self._api_client = api_client
119 self.num_retries = num_retries
120 self.replication = (2 if replication is None else replication)
121 self._keep_client = None
122 self._data_buffer = []
123 self._data_buffer_len = 0
124 self._current_stream_files = []
125 self._current_stream_length = 0
126 self._current_stream_locators = []
127 self._current_stream_name = '.'
128 self._current_file_name = None
129 self._current_file_pos = 0
130 self._finished_streams = []
131 self._close_file = None
132 self._queued_file = None
133 self._queued_dirents = deque()
134 self._queued_trees = deque()
135 self._last_open = None
137 def __exit__(self, exc_type, exc_value, traceback):
141 def do_queued_work(self):
142 # The work queue consists of three pieces:
143 # * _queued_file: The file object we're currently writing to the
145 # * _queued_dirents: Entries under the current directory
146 # (_queued_trees[0]) that we want to write or recurse through.
147 # This may contain files from subdirectories if
148 # max_manifest_depth == 0 for this directory.
149 # * _queued_trees: Directories that should be written as separate
150 # streams to the Collection.
151 # This function handles the smallest piece of work currently queued
152 # (current file, then current directory, then next directory) until
153 # no work remains. The _work_THING methods each do a unit of work on
154 # THING. _queue_THING methods add a THING to the work queue.
156 if self._queued_file:
158 elif self._queued_dirents:
160 elif self._queued_trees:
165 def _work_file(self):
167 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
171 self.finish_current_file()
173 self._queued_file.close()
174 self._close_file = None
175 self._queued_file = None
177 def _work_dirents(self):
178 path, stream_name, max_manifest_depth = self._queued_trees[0]
179 if stream_name != self.current_stream_name():
180 self.start_new_stream(stream_name)
181 while self._queued_dirents:
182 dirent = self._queued_dirents.popleft()
183 target = os.path.join(path, dirent)
184 if os.path.isdir(target):
185 self._queue_tree(target,
186 os.path.join(stream_name, dirent),
187 max_manifest_depth - 1)
189 self._queue_file(target, dirent)
191 if not self._queued_dirents:
192 self._queued_trees.popleft()
194 def _work_trees(self):
195 path, stream_name, max_manifest_depth = self._queued_trees[0]
196 d = arvados.util.listdir_recursive(
197 path, max_depth = (None if max_manifest_depth == 0 else 0))
199 self._queue_dirents(stream_name, d)
201 self._queued_trees.popleft()
203 def _queue_file(self, source, filename=None):
204 assert (self._queued_file is None), "tried to queue more than one file"
205 if not hasattr(source, 'read'):
206 source = open(source, 'rb')
207 self._close_file = True
209 self._close_file = False
211 filename = os.path.basename(source.name)
212 self.start_new_file(filename)
213 self._queued_file = source
215 def _queue_dirents(self, stream_name, dirents):
216 assert (not self._queued_dirents), "tried to queue more than one tree"
217 self._queued_dirents = deque(sorted(dirents))
219 def _queue_tree(self, path, stream_name, max_manifest_depth):
220 self._queued_trees.append((path, stream_name, max_manifest_depth))
222 def write_file(self, source, filename=None):
223 self._queue_file(source, filename)
224 self.do_queued_work()
226 def write_directory_tree(self,
227 path, stream_name='.', max_manifest_depth=-1):
228 self._queue_tree(path, stream_name, max_manifest_depth)
229 self.do_queued_work()
231 def write(self, newdata):
232 if isinstance(newdata, bytes):
234 elif isinstance(newdata, str):
235 newdata = newdata.encode()
236 elif hasattr(newdata, '__iter__'):
240 self._data_buffer.append(newdata)
241 self._data_buffer_len += len(newdata)
242 self._current_stream_length += len(newdata)
243 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
246 def open(self, streampath, filename=None):
247 """open(streampath[, filename]) -> file-like object
249 Pass in the path of a file to write to the Collection, either as a
250 single string or as two separate stream name and file name arguments.
251 This method returns a file-like object you can write to add it to the
254 You may only have one file object from the Collection open at a time,
255 so be sure to close the object when you're done. Using the object in
256 a with statement makes that easy::
258 with cwriter.open('./doc/page1.txt') as outfile:
259 outfile.write(page1_data)
260 with cwriter.open('./doc/page2.txt') as outfile:
261 outfile.write(page2_data)
264 streampath, filename = split(streampath)
265 if self._last_open and not self._last_open.closed:
266 raise errors.AssertionError(
267 "can't open '{}' when '{}' is still open".format(
268 filename, self._last_open.name))
269 if streampath != self.current_stream_name():
270 self.start_new_stream(streampath)
271 self.set_current_file_name(filename)
272 self._last_open = _WriterFile(self, filename)
273 return self._last_open
275 def flush_data(self):
276 data_buffer = b''.join(self._data_buffer)
278 self._current_stream_locators.append(
280 data_buffer[0:config.KEEP_BLOCK_SIZE],
281 copies=self.replication))
282 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
283 self._data_buffer_len = len(self._data_buffer[0])
285 def start_new_file(self, newfilename=None):
286 self.finish_current_file()
287 self.set_current_file_name(newfilename)
289 def set_current_file_name(self, newfilename):
290 if re.search(r'[\t\n]', newfilename):
291 raise errors.AssertionError(
292 "Manifest filenames cannot contain whitespace: %s" %
294 elif re.search(r'\x00', newfilename):
295 raise errors.AssertionError(
296 "Manifest filenames cannot contain NUL characters: %s" %
298 self._current_file_name = newfilename
300 def current_file_name(self):
301 return self._current_file_name
303 def finish_current_file(self):
304 if self._current_file_name is None:
305 if self._current_file_pos == self._current_stream_length:
307 raise errors.AssertionError(
308 "Cannot finish an unnamed file " +
309 "(%d bytes at offset %d in '%s' stream)" %
310 (self._current_stream_length - self._current_file_pos,
311 self._current_file_pos,
312 self._current_stream_name))
313 self._current_stream_files.append([
314 self._current_file_pos,
315 self._current_stream_length - self._current_file_pos,
316 self._current_file_name])
317 self._current_file_pos = self._current_stream_length
318 self._current_file_name = None
320 def start_new_stream(self, newstreamname='.'):
321 self.finish_current_stream()
322 self.set_current_stream_name(newstreamname)
324 def set_current_stream_name(self, newstreamname):
325 if re.search(r'[\t\n]', newstreamname):
326 raise errors.AssertionError(
327 "Manifest stream names cannot contain whitespace: '%s'" %
329 self._current_stream_name = '.' if newstreamname=='' else newstreamname
331 def current_stream_name(self):
332 return self._current_stream_name
334 def finish_current_stream(self):
335 self.finish_current_file()
337 if not self._current_stream_files:
339 elif self._current_stream_name is None:
340 raise errors.AssertionError(
341 "Cannot finish an unnamed stream (%d bytes in %d files)" %
342 (self._current_stream_length, len(self._current_stream_files)))
344 if not self._current_stream_locators:
345 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
346 self._finished_streams.append([self._current_stream_name,
347 self._current_stream_locators,
348 self._current_stream_files])
349 self._current_stream_files = []
350 self._current_stream_length = 0
351 self._current_stream_locators = []
352 self._current_stream_name = None
353 self._current_file_pos = 0
354 self._current_file_name = None
357 """Store the manifest in Keep and return its locator.
359 This is useful for storing manifest fragments (task outputs)
360 temporarily in Keep during a Crunch job.
362 In other cases you should make a collection instead, by
363 sending manifest_text() to the API server's "create
364 collection" endpoint.
366 return self._my_keep().put(self.manifest_text().encode(),
367 copies=self.replication)
369 def portable_data_hash(self):
370 stripped = self.stripped_manifest().encode()
371 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
373 def manifest_text(self):
374 self.finish_current_stream()
377 for stream in self._finished_streams:
378 if not re.search(r'^\.(/.*)?$', stream[0]):
380 manifest += stream[0].replace(' ', '\\040')
381 manifest += ' ' + ' '.join(stream[1])
382 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
387 def data_locators(self):
389 for name, locators, files in self._finished_streams:
393 def save_new(self, name=None):
394 return self._api_client.collections().create(
395 ensure_unique_name=True,
398 'manifest_text': self.manifest_text(),
399 }).execute(num_retries=self.num_retries)
402 class ResumableCollectionWriter(CollectionWriter):
403 """Deprecated, use Collection instead."""
405 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
406 '_current_stream_locators', '_current_stream_name',
407 '_current_file_name', '_current_file_pos', '_close_file',
408 '_data_buffer', '_dependencies', '_finished_streams',
409 '_queued_dirents', '_queued_trees']
411 def __init__(self, api_client=None, **kwargs):
412 self._dependencies = {}
413 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
416 def from_state(cls, state, *init_args, **init_kwargs):
417 # Try to build a new writer from scratch with the given state.
418 # If the state is not suitable to resume (because files have changed,
419 # been deleted, aren't predictable, etc.), raise a
420 # StaleWriterStateError. Otherwise, return the initialized writer.
421 # The caller is responsible for calling writer.do_queued_work()
422 # appropriately after it's returned.
423 writer = cls(*init_args, **init_kwargs)
424 for attr_name in cls.STATE_PROPS:
425 attr_value = state[attr_name]
426 attr_class = getattr(writer, attr_name).__class__
427 # Coerce the value into the same type as the initial value, if
429 if attr_class not in (type(None), attr_value.__class__):
430 attr_value = attr_class(attr_value)
431 setattr(writer, attr_name, attr_value)
432 # Check dependencies before we try to resume anything.
433 if any(KeepLocator(ls).permission_expired()
434 for ls in writer._current_stream_locators):
435 raise errors.StaleWriterStateError(
436 "locators include expired permission hint")
437 writer.check_dependencies()
438 if state['_current_file'] is not None:
439 path, pos = state['_current_file']
441 writer._queued_file = open(path, 'rb')
442 writer._queued_file.seek(pos)
443 except IOError as error:
444 raise errors.StaleWriterStateError(
445 "failed to reopen active file {}: {}".format(path, error))
448 def check_dependencies(self):
449 for path, orig_stat in listitems(self._dependencies):
450 if not S_ISREG(orig_stat[ST_MODE]):
451 raise errors.StaleWriterStateError("{} not file".format(path))
453 now_stat = tuple(os.stat(path))
454 except OSError as error:
455 raise errors.StaleWriterStateError(
456 "failed to stat {}: {}".format(path, error))
457 if ((not S_ISREG(now_stat[ST_MODE])) or
458 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
459 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
460 raise errors.StaleWriterStateError("{} changed".format(path))
462 def dump_state(self, copy_func=lambda x: x):
463 state = {attr: copy_func(getattr(self, attr))
464 for attr in self.STATE_PROPS}
465 if self._queued_file is None:
466 state['_current_file'] = None
468 state['_current_file'] = (os.path.realpath(self._queued_file.name),
469 self._queued_file.tell())
472 def _queue_file(self, source, filename=None):
474 src_path = os.path.realpath(source)
476 raise errors.AssertionError("{} not a file path".format(source))
478 path_stat = os.stat(src_path)
479 except OSError as stat_error:
481 super(ResumableCollectionWriter, self)._queue_file(source, filename)
482 fd_stat = os.fstat(self._queued_file.fileno())
483 if not S_ISREG(fd_stat.st_mode):
484 # We won't be able to resume from this cache anyway, so don't
485 # worry about further checks.
486 self._dependencies[source] = tuple(fd_stat)
487 elif path_stat is None:
488 raise errors.AssertionError(
489 "could not stat {}: {}".format(source, stat_error))
490 elif path_stat.st_ino != fd_stat.st_ino:
491 raise errors.AssertionError(
492 "{} changed between open and stat calls".format(source))
494 self._dependencies[src_path] = tuple(fd_stat)
496 def write(self, data):
497 if self._queued_file is None:
498 raise errors.AssertionError(
499 "resumable writer can't accept unsourced data")
500 return super(ResumableCollectionWriter, self).write(data)
508 COLLECTION = "collection"
510 class RichCollectionBase(CollectionBase):
511 """Base class for Collections and Subcollections.
513 Implements the majority of functionality relating to accessing items in the
518 def __init__(self, parent=None):
520 self._committed = False
521 self._callback = None
525 raise NotImplementedError()
528 raise NotImplementedError()
530 def _my_block_manager(self):
531 raise NotImplementedError()
534 raise NotImplementedError()
536 def root_collection(self):
537 raise NotImplementedError()
539 def notify(self, event, collection, name, item):
540 raise NotImplementedError()
542 def stream_name(self):
543 raise NotImplementedError()
547 def find_or_create(self, path, create_type):
548 """Recursively search the specified file path.
550 May return either a `Collection` or `ArvadosFile`. If not found, will
551 create a new item at the specified path based on `create_type`. Will
552 create intermediate subcollections needed to contain the final item in
556 One of `arvados.collection.FILE` or
557 `arvados.collection.COLLECTION`. If the path is not found, and value
558 of create_type is FILE then create and return a new ArvadosFile for
559 the last path component. If COLLECTION, then create and return a new
560 Collection for the last path component.
564 pathcomponents = path.split("/", 1)
565 if pathcomponents[0]:
566 item = self._items.get(pathcomponents[0])
567 if len(pathcomponents) == 1:
570 if create_type == COLLECTION:
571 item = Subcollection(self, pathcomponents[0])
573 item = ArvadosFile(self, pathcomponents[0])
574 self._items[pathcomponents[0]] = item
575 self.set_committed(False)
576 self.notify(ADD, self, pathcomponents[0], item)
580 # create new collection
581 item = Subcollection(self, pathcomponents[0])
582 self._items[pathcomponents[0]] = item
583 self.set_committed(False)
584 self.notify(ADD, self, pathcomponents[0], item)
585 if isinstance(item, RichCollectionBase):
586 return item.find_or_create(pathcomponents[1], create_type)
588 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
593 def find(self, path):
594 """Recursively search the specified file path.
596 May return either a Collection or ArvadosFile. Return None if not
598 If path is invalid (ex: starts with '/'), an IOError exception will be
603 raise errors.ArgumentError("Parameter 'path' is empty.")
605 pathcomponents = path.split("/", 1)
606 if pathcomponents[0] == '':
607 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
609 item = self._items.get(pathcomponents[0])
612 elif len(pathcomponents) == 1:
615 if isinstance(item, RichCollectionBase):
616 if pathcomponents[1]:
617 return item.find(pathcomponents[1])
621 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
624 def mkdirs(self, path):
625 """Recursive subcollection create.
627 Like `os.makedirs()`. Will create intermediate subcollections needed
628 to contain the leaf subcollection path.
632 if self.find(path) != None:
633 raise IOError(errno.EEXIST, "Directory or file exists", path)
635 return self.find_or_create(path, COLLECTION)
637 def open(self, path, mode="r"):
638 """Open a file-like object for access.
641 path to a file in the collection
643 a string consisting of "r", "w", or "a", optionally followed
644 by "b" or "t", optionally followed by "+".
646 binary mode: write() accepts bytes, read() returns bytes.
648 text mode (default): write() accepts strings, read() returns strings.
652 opens for reading and writing. Reads/writes share a file pointer.
654 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
656 opens for reading and writing. All writes are appended to
657 the end of the file. Writing does not affect the file pointer for
661 if not re.search(r'^[rwa][bt]?\+?$', mode):
662 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
664 if mode[0] == 'r' and '+' not in mode:
665 fclass = ArvadosFileReader
666 arvfile = self.find(path)
667 elif not self.writable():
668 raise IOError(errno.EROFS, "Collection is read only")
670 fclass = ArvadosFileWriter
671 arvfile = self.find_or_create(path, FILE)
674 raise IOError(errno.ENOENT, "File not found", path)
675 if not isinstance(arvfile, ArvadosFile):
676 raise IOError(errno.EISDIR, "Is a directory", path)
681 return fclass(arvfile, mode=mode, num_retries=self.num_retries)
684 """Determine if the collection has been modified since last commited."""
685 return not self.committed()
689 """Determine if the collection has been committed to the API server."""
690 return self._committed
693 def set_committed(self, value=True):
694 """Recursively set committed flag.
696 If value is True, set committed to be True for this and all children.
698 If value is False, set committed to be False for this and all parents.
700 if value == self._committed:
703 for k,v in listitems(self._items):
704 v.set_committed(True)
705 self._committed = True
707 self._committed = False
708 if self.parent is not None:
709 self.parent.set_committed(False)
713 """Iterate over names of files and collections contained in this collection."""
714 return iter(viewkeys(self._items))
717 def __getitem__(self, k):
718 """Get a file or collection that is directly contained by this collection.
720 If you want to search a path, use `find()` instead.
723 return self._items[k]
726 def __contains__(self, k):
727 """Test if there is a file or collection a directly contained by this collection."""
728 return k in self._items
732 """Get the number of items directly contained in this collection."""
733 return len(self._items)
737 def __delitem__(self, p):
738 """Delete an item by name which is directly contained by this collection."""
740 self.set_committed(False)
741 self.notify(DEL, self, p, None)
745 """Get a list of names of files and collections directly contained in this collection."""
746 return self._items.keys()
750 """Get a list of files and collection objects directly contained in this collection."""
751 return listvalues(self._items)
755 """Get a list of (name, object) tuples directly contained in this collection."""
756 return listitems(self._items)
758 def exists(self, path):
759 """Test if there is a file or collection at `path`."""
760 return self.find(path) is not None
764 def remove(self, path, recursive=False):
765 """Remove the file or subcollection (directory) at `path`.
768 Specify whether to remove non-empty subcollections (True), or raise an error (False).
772 raise errors.ArgumentError("Parameter 'path' is empty.")
774 pathcomponents = path.split("/", 1)
775 item = self._items.get(pathcomponents[0])
777 raise IOError(errno.ENOENT, "File not found", path)
778 if len(pathcomponents) == 1:
779 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
780 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
781 deleteditem = self._items[pathcomponents[0]]
782 del self._items[pathcomponents[0]]
783 self.set_committed(False)
784 self.notify(DEL, self, pathcomponents[0], deleteditem)
786 item.remove(pathcomponents[1])
788 def _clonefrom(self, source):
789 for k,v in listitems(source):
790 self._items[k] = v.clone(self, k)
793 raise NotImplementedError()
797 def add(self, source_obj, target_name, overwrite=False, reparent=False):
798 """Copy or move a file or subcollection to this collection.
801 An ArvadosFile, or Subcollection object
804 Destination item name. If the target name already exists and is a
805 file, this will raise an error unless you specify `overwrite=True`.
808 Whether to overwrite target file if it already exists.
811 If True, source_obj will be moved from its parent collection to this collection.
812 If False, source_obj will be copied and the parent collection will be
817 if target_name in self and not overwrite:
818 raise IOError(errno.EEXIST, "File already exists", target_name)
821 if target_name in self:
822 modified_from = self[target_name]
824 # Actually make the move or copy.
826 source_obj._reparent(self, target_name)
829 item = source_obj.clone(self, target_name)
831 self._items[target_name] = item
832 self.set_committed(False)
835 self.notify(MOD, self, target_name, (modified_from, item))
837 self.notify(ADD, self, target_name, item)
839 def _get_src_target(self, source, target_path, source_collection, create_dest):
840 if source_collection is None:
841 source_collection = self
844 if isinstance(source, basestring):
845 source_obj = source_collection.find(source)
846 if source_obj is None:
847 raise IOError(errno.ENOENT, "File not found", source)
848 sourcecomponents = source.split("/")
851 sourcecomponents = None
853 # Find parent collection the target path
854 targetcomponents = target_path.split("/")
856 # Determine the name to use.
857 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
860 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
863 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
865 if len(targetcomponents) > 1:
866 target_dir = self.find("/".join(targetcomponents[0:-1]))
870 if target_dir is None:
871 raise IOError(errno.ENOENT, "Target directory not found", target_name)
873 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
874 target_dir = target_dir[target_name]
875 target_name = sourcecomponents[-1]
877 return (source_obj, target_dir, target_name)
881 def copy(self, source, target_path, source_collection=None, overwrite=False):
882 """Copy a file or subcollection to a new path in this collection.
885 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
888 Destination file or path. If the target path already exists and is a
889 subcollection, the item will be placed inside the subcollection. If
890 the target path already exists and is a file, this will raise an error
891 unless you specify `overwrite=True`.
894 Collection to copy `source_path` from (default `self`)
897 Whether to overwrite target file if it already exists.
900 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
901 target_dir.add(source_obj, target_name, overwrite, False)
905 def rename(self, source, target_path, source_collection=None, overwrite=False):
906 """Move a file or subcollection from `source_collection` to a new path in this collection.
909 A string with a path to source file or subcollection.
912 Destination file or path. If the target path already exists and is a
913 subcollection, the item will be placed inside the subcollection. If
914 the target path already exists and is a file, this will raise an error
915 unless you specify `overwrite=True`.
918 Collection to copy `source_path` from (default `self`)
921 Whether to overwrite target file if it already exists.
924 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
925 if not source_obj.writable():
926 raise IOError(errno.EROFS, "Source collection is read only", source)
927 target_dir.add(source_obj, target_name, overwrite, True)
929 def portable_manifest_text(self, stream_name="."):
930 """Get the manifest text for this collection, sub collections and files.
932 This method does not flush outstanding blocks to Keep. It will return
933 a normalized manifest with access tokens stripped.
936 Name to use for this stream (directory)
939 return self._get_manifest_text(stream_name, True, True)
942 def manifest_text(self, stream_name=".", strip=False, normalize=False,
943 only_committed=False):
944 """Get the manifest text for this collection, sub collections and files.
946 This method will flush outstanding blocks to Keep. By default, it will
947 not normalize an unmodified manifest or strip access tokens.
950 Name to use for this stream (directory)
953 If True, remove signing tokens from block locators if present.
954 If False (default), block locators are left unchanged.
957 If True, always export the manifest text in normalized form
958 even if the Collection is not modified. If False (default) and the collection
959 is not modified, return the original manifest text even if it is not
963 If True, don't commit pending blocks.
967 if not only_committed:
968 self._my_block_manager().commit_all()
969 return self._get_manifest_text(stream_name, strip, normalize,
970 only_committed=only_committed)
973 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
974 """Get the manifest text for this collection, sub collections and files.
977 Name to use for this stream (directory)
980 If True, remove signing tokens from block locators if present.
981 If False (default), block locators are left unchanged.
984 If True, always export the manifest text in normalized form
985 even if the Collection is not modified. If False (default) and the collection
986 is not modified, return the original manifest text even if it is not
990 If True, only include blocks that were already committed to Keep.
994 if not self.committed() or self._manifest_text is None or normalize:
997 sorted_keys = sorted(self.keys())
998 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
999 # Create a stream per file `k`
1000 arvfile = self[filename]
1002 for segment in arvfile.segments():
1003 loc = segment.locator
1004 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1007 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1009 loc = KeepLocator(loc).stripped()
1010 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1011 segment.segment_offset, segment.range_size))
1012 stream[filename] = filestream
1014 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1015 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1016 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
1020 return self.stripped_manifest()
1022 return self._manifest_text
1025 def diff(self, end_collection, prefix=".", holding_collection=None):
1026 """Generate list of add/modify/delete actions.
1028 When given to `apply`, will change `self` to match `end_collection`
1032 if holding_collection is None:
1033 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1035 if k not in end_collection:
1036 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1037 for k in end_collection:
1039 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1040 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1041 elif end_collection[k] != self[k]:
1042 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1044 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1046 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
# Replay a change list produced by diff() onto this collection. Conflicting
# local edits are preserved by copying the incoming item to a timestamped
# "~conflict~" sibling path instead of overwriting.
# NOTE(review): excerpt is non-contiguous — the unpacking of `path`,
# `initial` and `final` from each change tuple is elided here.
1051 def apply(self, changes):
1052 """Apply changes from `diff`.
1054 If a change conflicts with a local change, it will be saved to an
1055 alternate path indicating the conflict.
# Mark the collection dirty up front; applying changes modifies content.
1059 self.set_committed(False)
1060 for change in changes:
1061 event_type = change[0]
1064 local = self.find(path)
# Conflict path embeds a wall-clock timestamp so repeated conflicts don't collide.
1065 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1067 if event_type == ADD:
1069 # No local file at path, safe to copy over new file
1070 self.copy(initial, path)
1071 elif local is not None and local != initial:
1072 # There is already local file and it is different:
1073 # save change to conflict file.
1074 self.copy(initial, conflictpath)
1075 elif event_type == MOD or event_type == TOK:
1077 if local == initial:
1078 # Local matches the "initial" item so it has not
1079 # changed locally and is safe to update.
1080 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1081 # Replace contents of local file with new contents
1082 local.replace_contents(final)
1084 # Overwrite path with new item; this can happen if
1085 # path was a file and is now a collection or vice versa
1086 self.copy(final, path, overwrite=True)
1088 # Local is missing (presumably deleted) or local doesn't
1089 # match the "start" value, so save change to conflict file
1090 self.copy(final, conflictpath)
1091 elif event_type == DEL:
1092 if local == initial:
1093 # Local item matches "initial" value, so it is safe to remove.
1094 self.remove(path, recursive=True)
1095 # else, the file is modified or already removed, in either
1096 # case we don't want to try to remove it.
# Return the PDH ("<md5>+<size>") for this collection. When the collection is
# saved and committed, trust the hash the API server returned; otherwise
# recompute it locally from the stripped (signature-free) manifest so the
# value is portable across clusters.
1098 def portable_data_hash(self):
1099 """Get the portable data hash for this collection's manifest."""
1100 if self._manifest_locator and self.committed():
1101 # If the collection is already saved on the API server, and it's committed
1102 # then return API server's PDH response.
1103 return self._portable_data_hash
# PDH format is md5-of-manifest plus "+" plus the manifest's byte length.
1105 stripped = self.portable_manifest_text().encode()
1106 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
# Register a single change-notification callback; notify() below invokes it.
# Only one callback may be registered at a time.
1109 def subscribe(self, callback):
1110 if self._callback is None:
1111 self._callback = callback
# A second subscribe without unsubscribe is a caller error.
1113 raise errors.ArgumentError("A callback is already set on this collection.")
# Remove the previously registered change-notification callback, if any.
1116 def unsubscribe(self):
1117 if self._callback is not None:
1118 self._callback = None
# Deliver a change event to the local callback (if set) and propagate it up
# to the root collection so root-level subscribers also see it.
1121 def notify(self, event, collection, name, item):
1123 self._callback(event, collection, name, item)
1124 self.root_collection().notify(event, collection, name, item)
# Structural equality: same type family, same item count, and every item
# compares equal by key.
# NOTE(review): excerpt is non-contiguous — the explicit True/False return
# statements between these comparisons are elided here.
1127 def __eq__(self, other):
1130 if not isinstance(other, RichCollectionBase):
1132 if len(self._items) != len(other):
1134 for k in self._items:
1137 if self._items[k] != other[k]:
# Inequality is defined as the negation of __eq__ (Python 2 compatibility;
# this file still imports from `past`/`future`).
1141 def __ne__(self, other):
1142 return not self.__eq__(other)
# Flush every child item's buffered blocks to Keep.
# NOTE(review): excerpt is non-contiguous — the `def flush(self):` line and
# the per-item `e.flush()` call in the loop body are elided here.
1146 """Flush bufferblocks to Keep."""
1147 for e in listvalues(self):
# Root collection object: owns the API/Keep clients, the block manager, the
# manifest state, and the single RLock shared by all subcollections/files.
1151 class Collection(RichCollectionBase):
1152 """Represents the root of an Arvados Collection.
1154 This class is threadsafe. The root collection object, all subcollections
1155 and files are protected by a single lock (i.e. each access locks the entire
1161 :To read an existing file:
1162 `c.open("myfile", "r")`
1164 :To write a new file:
1165 `c.open("myfile", "w")`
1167 :To determine if a file exists:
1168 `c.find("myfile") is not None`
1171 `c.copy("source", "dest")`
1174 `c.remove("myfile")`
1176 :To save to an existing collection record:
1179 :To save a new collection record:
1182 :To merge remote changes into this object:
1185 Must be associated with an API server Collection record (during
1186 initialization, or using `save_new`) to use `save` or `update`
1190 def __init__(self, manifest_locator_or_text=None,
1197 replication_desired=None,
1199 """Collection constructor.
1201 :manifest_locator_or_text:
1202 An Arvados collection UUID, portable data hash, raw manifest
1203 text, or (if creating an empty collection) None.
1206 the parent Collection, may be None.
1209 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1210 Prefer this over supplying your own api_client and keep_client (except in testing).
1211 Will use default config settings if not specified.
1214 The API client object to use for requests. If not specified, create one using `apiconfig`.
1217 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1220 the number of retries for API and Keep requests.
1223 the block manager to use. If not specified, create one.
1225 :replication_desired:
1226 How many copies should Arvados maintain. If None, API server default
1227 configuration applies. If not None, this value will also be used
1228 for determining the number of block copies being written.
1231 super(Collection, self).__init__(parent)
# Clients are stored but may be None; _my_api()/_my_keep() create them lazily.
1232 self._api_client = api_client
1233 self._keep_client = keep_client
1234 self._block_manager = block_manager
1235 self.replication_desired = replication_desired
1236 self.put_threads = put_threads
1239 self._config = apiconfig
1241 self._config = config.settings()
1243 self.num_retries = num_retries if num_retries is not None else 0
1244 self._manifest_locator = None
1245 self._manifest_text = None
1246 self._portable_data_hash = None
1247 self._api_response = None
# _past_versions tracks (modified_at, portable_data_hash) pairs already merged,
# so update() can skip records it has seen before.
1248 self._past_versions = set()
# Single re-entrant lock protecting the whole collection tree.
1250 self.lock = threading.RLock()
# Classify the constructor argument: Keep locator (PDH), collection UUID,
# or raw manifest text; anything else is rejected.
1253 if manifest_locator_or_text:
1254 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1255 self._manifest_locator = manifest_locator_or_text
1256 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1257 self._manifest_locator = manifest_locator_or_text
1258 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1259 self._manifest_text = manifest_locator_or_text
1261 raise errors.ArgumentError(
1262 "Argument to CollectionReader is not a manifest or a collection UUID")
# NOTE(review): excerpt elides the try/ _populate() call this except guards.
1266 except (IOError, errors.SyntaxError) as e:
1267 raise errors.ArgumentError("Error processing manifest text: %s", e)
# Simple accessors. A root Collection is its own root; stream_name for the
# root is presumably "." (body lines elided in this excerpt — verify).
1269 def root_collection(self):
1272 def stream_name(self):
# True if this exact server version (modified_at + PDH) was already merged.
1279 def known_past_version(self, modified_at_and_portable_data_hash):
1280 return modified_at_and_portable_data_hash in self._past_versions
# Fetch the latest server-side record (unless `other` is given) and
# three-way merge it into this collection: diff the last-known baseline
# against the remote copy, then apply that diff locally.
1284 def update(self, other=None, num_retries=None):
1285 """Merge the latest collection on the API server with the current collection."""
1288 if self._manifest_locator is None:
1289 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1290 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
# Skip records we've already merged — unless the server record matches our
# current PDH, in which case proceed to refresh metadata.
1291 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1292 response.get("portable_data_hash") != self.portable_data_hash()):
1293 # The record on the server is different from our current one, but we've seen it before,
1294 # so ignore it because it's already been merged.
1295 # However, if it's the same as our current record, proceed with the update, because we want to update
1299 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1300 other = CollectionReader(response["manifest_text"])
# Baseline is the manifest as of the last sync; diff(baseline -> other) gives
# only the remote changes, which apply() then merges with conflict handling.
1301 baseline = CollectionReader(self._manifest_text)
1302 self.apply(baseline.diff(other))
1303 self._manifest_text = self.manifest_text()
# Lazily construct the thread-safe API client from config; also wire the
# Keep client to the same API client if one wasn't supplied.
# NOTE(review): the `def _my_api(self):` line is elided in this excerpt.
1307 if self._api_client is None:
1308 self._api_client = ThreadSafeApiCache(self._config)
1309 if self._keep_client is None:
1310 self._keep_client = self._api_client.keep
1311 return self._api_client
# Lazily construct the Keep client, creating the API client first if needed.
# NOTE(review): the `def _my_keep(self):` line is elided in this excerpt.
1315 if self._keep_client is None:
1316 if self._api_client is None:
1319 self._keep_client = KeepClient(api_client=self._api_client)
1320 return self._keep_client
# Lazily construct the block manager; the replication count comes from the
# constructor argument or, failing that, the API server's discovery document.
1323 def _my_block_manager(self):
1324 if self._block_manager is None:
1325 copies = (self.replication_desired or
1326 self._my_api()._rootDesc.get('defaultCollectionReplication',
1328 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1329 return self._block_manager
# Record an API response and mark its version as seen, so update() won't
# re-merge this same record later.
1331 def _remember_api_response(self, response):
1332 self._api_response = response
1333 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
# Load manifest text and PDH for this collection's locator from the API
# server, deferring client construction until the last possible moment.
1335 def _populate_from_api_server(self):
1336 # As in KeepClient itself, we must wait until the last
1337 # possible moment to instantiate an API client, in order to
1338 # avoid tripping up clients that don't have access to an API
1339 # server. If we do build one, make sure our Keep client uses
1340 # it. If instantiation fails, we'll fall back to the except
1341 # clause, just like any other Collection lookup
1342 # failure. Return an exception, or None if successful.
1343 self._remember_api_response(self._my_api().collections().get(
1344 uuid=self._manifest_locator).execute(
1345 num_retries=self.num_retries))
1346 self._manifest_text = self._api_response['manifest_text']
1347 self._portable_data_hash = self._api_response['portable_data_hash']
1348 # If not overriden via kwargs, we should try to load the
1349 # replication_desired from the API server
1350 if self.replication_desired is None:
1351 self.replication_desired = self._api_response.get('replication_desired', None)
# Populate the in-memory tree: fetch the manifest from the API server if we
# only have a locator, then import the manifest text. The baseline manifest
# is kept for three-way merges in update().
1353 def _populate(self):
1354 if self._manifest_text is None:
1355 if self._manifest_locator is None:
1358 self._populate_from_api_server()
1359 self._baseline_manifest = self._manifest_text
1360 self._import_manifest(self._manifest_text)
# True only when the locator is a collection UUID (not a PDH) — i.e. there
# is an API-server record that save() can update in place.
1362 def _has_collection_uuid(self):
1363 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
# Context-manager support: on clean exit of a writable, record-backed
# collection, commit changes automatically.
# NOTE(review): excerpt elides `__enter__`'s `return self`, the `save()`
# call inside the guarded branch, and the stop_threads() cleanup.
1365 def __enter__(self):
1368 def __exit__(self, exc_type, exc_value, traceback):
1369 """Support scoped auto-commit in a with: block."""
1370 if exc_type is None:
1371 if self.writable() and self._has_collection_uuid():
# Shut down the block manager's background put threads, if one was created.
1375 def stop_threads(self):
1376 if self._block_manager is not None:
1377 self._block_manager.stop_threads()
# Accessor for the UUID or PDH this collection was loaded from (None for
# new/text-only collections until save_new() assigns one).
1380 def manifest_locator(self):
1381 """Get the manifest locator, if any.
1383 The manifest locator will be set when the collection is loaded from an
1384 API server record or the portable data hash of a manifest.
1386 The manifest locator will be None if the collection is newly created or
1387 was created directly from manifest text. The method `save_new()` will
1388 assign a manifest locator.
1391 return self._manifest_locator
# Deep-copy this collection into a fresh object; `readonly=True` yields a
# CollectionReader instead of a writable Collection.
1394 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1395 if new_config is None:
1396 new_config = self._config
1398 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1400 newcollection = Collection(parent=new_parent, apiconfig=new_config)
# _clonefrom copies the item tree (defined elsewhere in this module).
1402 newcollection._clonefrom(self)
1403 return newcollection
# Return the last API-server record fetched for this collection (or None).
1406 def api_response(self):
1407 """Returns information about this Collection fetched from the API server.
1409 If the Collection exists in Keep but not the API server, currently
1410 returns None. Future versions may provide a synthetic response.
1413 return self._api_response
# Root-level override: normalize a leading "./" before delegating to the
# base-class implementation.
1415 def find_or_create(self, path, create_type):
1416 """See `RichCollectionBase.find_or_create`"""
1420 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
# Root-level override: normalize a leading "./" before delegating lookup.
1422 def find(self, path):
1423 """See `RichCollectionBase.find`"""
1427 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
# Root-level override: refuse to remove the root itself, normalize a
# leading "./", then delegate.
1429 def remove(self, path, recursive=False):
1430 """See `RichCollectionBase.remove`"""
1432 raise errors.ArgumentError("Cannot remove '.'")
1434 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
# Commit buffered blocks, optionally merge remote changes, and PUT the
# manifest back to the existing API-server record. Requires a collection
# UUID (see _has_collection_uuid); otherwise use save_new().
1439 def save(self, merge=True, num_retries=None):
1440 """Save collection to an existing collection record.
1442 Commit pending buffer blocks to Keep, merge with remote record (if
1443 merge=True, the default), and update the collection record. Returns
1444 the current manifest text.
1446 Will raise AssertionError if not associated with a collection record on
1447 the API server. If you want to save a manifest to Keep only, see
1451 Update and merge remote changes before saving. Otherwise, any
1452 remote changes will be ignored and overwritten.
1455 Retry count on API calls (if None, use the collection default)
# Nothing to do when already committed; only dirty collections are pushed.
1458 if not self.committed():
1459 if not self._has_collection_uuid():
1460 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1462 self._my_block_manager().commit_all()
# Full (unstripped) manifest is sent; the server strips hints itself.
1467 text = self.manifest_text(strip=False)
1468 self._remember_api_response(self._my_api().collections().update(
1469 uuid=self._manifest_locator,
1470 body={'manifest_text': text}
1472 num_retries=num_retries))
1473 self._manifest_text = self._api_response["manifest_text"]
1474 self._portable_data_hash = self._api_response["portable_data_hash"]
1475 self.set_committed(True)
1477 return self._manifest_text
# Commit buffered blocks and (by default) create a brand-new collection
# record; afterwards this object is bound to the new record so save() works.
1483 def save_new(self, name=None,
1484 create_collection_record=True,
1486 ensure_unique_name=False,
1488 """Save collection to a new collection record.
1490 Commit pending buffer blocks to Keep and, when create_collection_record
1491 is True (default), create a new collection record. After creating a
1492 new collection record, this Collection object will be associated with
1493 the new record used by `save()`. Returns the current manifest text.
1496 The collection name.
1498 :create_collection_record:
1499 If True, create a collection record on the API server.
1500 If False, only commit blocks to Keep and return the manifest text.
1503 the user, or project uuid that will own this collection.
1504 If None, defaults to the current user.
1506 :ensure_unique_name:
1507 If True, ask the API server to rename the collection
1508 if it conflicts with a collection with the same name and owner. If
1509 False, a name conflict will result in an error.
1512 Retry count on API calls (if None, use the collection default)
1515 self._my_block_manager().commit_all()
1516 text = self.manifest_text(strip=False)
1518 if create_collection_record:
# An auto-generated default name must be made unique server-side.
1520 name = "New collection"
1521 ensure_unique_name = True
1523 body = {"manifest_text": text,
1525 "replication_desired": self.replication_desired}
1527 body["owner_uuid"] = owner_uuid
1529 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1530 text = self._api_response["manifest_text"]
# Bind this object to the newly created record.
1532 self._manifest_locator = self._api_response["uuid"]
1533 self._portable_data_hash = self._api_response["portable_data_hash"]
1535 self._manifest_text = text
1536 self.set_committed(True)
# Pre-compiled regexes used by _import_manifest's tokenizer:
# _token_re splits whitespace-separated tokens (capturing the separator so a
# newline can end a stream); _block_re matches a Keep block locator
# "md5+size[+hints]"; _segment_re matches a "pos:size:filename" file segment.
1540 _token_re = re.compile(r'(\S+)(\s+|$)')
1541 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1542 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
# Parse manifest text into the in-memory tree with a small token-driven
# state machine (STREAM_NAME -> BLOCKS -> SEGMENTS, constants defined
# elsewhere in this module). Only valid on an empty collection.
# NOTE(review): excerpt is non-contiguous — state-variable initialization and
# several state transitions are elided here.
1545 def _import_manifest(self, manifest_text):
1546 """Import a manifest into a `Collection`.
1549 The manifest text to import from.
1553 raise ArgumentError("Can only import manifest into an empty collection")
1562 for token_and_separator in self._token_re.finditer(manifest_text):
1563 tok = token_and_separator.group(1)
1564 sep = token_and_separator.group(2)
1566 if state == STREAM_NAME:
1567 # starting a new stream
# "\040" is the manifest escape for a space character.
1568 stream_name = tok.replace('\\040', ' ')
1573 self.find_or_create(stream_name, COLLECTION)
1577 block_locator = self._block_re.match(tok)
# Accumulate block Ranges with a running offset within the stream.
1579 blocksize = int(block_locator.group(1))
1580 blocks.append(Range(tok, streamoffset, blocksize, 0))
1581 streamoffset += blocksize
1585 if state == SEGMENTS:
1586 file_segment = self._segment_re.match(tok)
1588 pos = int(file_segment.group(1))
1589 size = int(file_segment.group(2))
1590 name = file_segment.group(3).replace('\\040', ' ')
1591 filepath = os.path.join(stream_name, name)
1592 afile = self.find_or_create(filepath, FILE)
1593 if isinstance(afile, ArvadosFile):
1594 afile.add_segment(blocks, pos, size)
1596 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1599 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# Freshly imported content matches its manifest by construction.
1605 self.set_committed(True)
# Root-level notify: deliver the event to the local callback only — there is
# no parent to propagate to (contrast with the base-class notify above).
# NOTE(review): the `if self._callback:` guard line appears elided here.
1608 def notify(self, event, collection, name, item):
1610 self._callback(event, collection, name, item)
# A directory node inside a Collection: it has no API record of its own and
# delegates locking, clients, and the block manager to the root collection.
1613 class Subcollection(RichCollectionBase):
1614 """This is a subdirectory within a collection that doesn't have its own API
1617 Subcollection locking falls under the umbrella lock of its root collection.
1621 def __init__(self, parent, name):
1622 super(Subcollection, self).__init__(parent)
# Share the root's lock so the whole tree is guarded by one RLock.
1623 self.lock = self.root_collection().lock
1624 self._manifest_text = None
1626 self.num_retries = parent.num_retries
1628 def root_collection(self):
1629 return self.parent.root_collection()
# The following delegating accessors forward to the root collection.
1632 return self.root_collection().writable()
1635 return self.root_collection()._my_api()
1638 return self.root_collection()._my_keep()
1640 def _my_block_manager(self):
1641 return self.root_collection()._my_block_manager()
1643 def stream_name(self):
1644 return os.path.join(self.parent.stream_name(), self.name)
1647 def clone(self, new_parent, new_name):
1648 c = Subcollection(new_parent, new_name)
# Move this subtree under a new parent (used by rename operations).
# NOTE(review): excerpt elides the name reassignment and re-add to newparent.
1654 def _reparent(self, newparent, newname):
1655 self.set_committed(False)
1657 self.parent.remove(self.name, recursive=True)
1658 self.parent = newparent
1660 self.lock = self.parent.root_collection().lock
# Read-only view: after __init__ completes, the lock is replaced by a no-op
# lock since the content can never change. `_in_init` lets writable() return
# True only during construction (so manifest import can build the tree).
1663 class CollectionReader(Collection):
1664 """A read-only collection object.
1666 Initialize from a collection UUID or portable data hash, or raw
1667 manifest text. See `Collection` constructor for detailed options.
1670 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1671 self._in_init = True
1672 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1673 self._in_init = False
1675 # Forego any locking since it should never change once initialized.
1676 self.lock = NoopLock()
1678 # Backwards compatability with old CollectionReader
1679 # all_streams() and all_files()
1680 self._streams = None
# NOTE(review): the `def writable(self):` line preceding this return is
# elided in this excerpt.
1683 return self._in_init
# Decorator used by the legacy stream API: lazily builds self._streams from
# the manifest text the first time a decorated method is called, since the
# stream list is a (potentially large) copy of the manifest.
1685 def _populate_streams(orig_func):
1686 @functools.wraps(orig_func)
1687 def populate_streams_wrapper(self, *args, **kwargs):
1688 # Defer populating self._streams until needed since it creates a copy of the manifest.
1689 if self._streams is None:
1690 if self._manifest_text:
1691 self._streams = [sline.split()
1692 for sline in self._manifest_text.split("\n")
1696 return orig_func(self, *args, **kwargs)
1697 return populate_streams_wrapper
# Legacy-API normalization: regroup all file segments by stream directory
# and filename, resolve them to locator ranges, and rebuild self._streams in
# normalized (sorted) form. Affects only all_streams()/all_files().
1700 def normalize(self):
1701 """Normalize the streams returned by `all_streams`.
1703 This method is kept for backwards compatability and only affects the
1704 behavior of `all_streams()` and `all_files()`
# Build {streamname: {filename: [LocatorAndRange, ...]}} from every file.
1710 for s in self.all_streams():
1711 for f in s.all_files():
1712 streamname, filename = split(s.name() + "/" + f.name())
1713 if streamname not in streams:
1714 streams[streamname] = {}
1715 if filename not in streams[streamname]:
1716 streams[streamname][filename] = []
1717 for r in f.segments:
1718 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1720 self._streams = [normalize_stream(s, streams[s])
1721 for s in sorted(streams)]
# Legacy API: wrap each parsed stream line in a StreamReader bound to this
# collection's Keep client (decorated with _populate_streams elsewhere).
1723 def all_streams(self):
1724 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1725 for s in self._streams]
1728 def all_files(self):
1729 for s in self.all_streams():
1730 for f in s.all_files():