1 from __future__ import absolute_import
11 from collections import deque
14 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
15 from .keep import KeepLocator, KeepClient
16 from .stream import StreamReader
17 from ._normalize_stream import normalize_stream
18 from ._ranges import Range, LocatorAndRange
19 from .safeapi import ThreadSafeApiCache
20 import arvados.config as config
21 import arvados.errors as errors
23 import arvados.events as events
24 from arvados.retry import retry_method
26 _logger = logging.getLogger('arvados.collection')
28 class CollectionBase(object):
32 def __exit__(self, exc_type, exc_value, traceback):
36 if self._keep_client is None:
37 self._keep_client = KeepClient(api_client=self._api_client,
38 num_retries=self.num_retries)
39 return self._keep_client
41 def stripped_manifest(self):
42 """Get the manifest with locator hints stripped.
44 Return the manifest for the current collection with all
45 non-portable hints (i.e., permission signatures and any
46 hints other than size hints) removed from the locators.
48 raw = self.manifest_text()
50 for line in raw.split("\n"):
53 clean_fields = fields[:1] + [
54 (re.sub(r'\+[^\d][^\+]*', '', x)
55 if re.match(arvados.util.keep_locator_pattern, x)
58 clean += [' '.join(clean_fields), "\n"]
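# A minimal sketch (not part of the original module) of what the hint-stripping
# regex above does to one signed locator; the signature and timestamp are made up.
def _example_strip_locator_hints():
    signed = ('acbd18db4cc2f85cedef654fccc4a4d8+3'
              '+A0123456789abcdef0123456789abcdef01234567@565f7809')
    stripped = re.sub(r'\+[^\d][^\+]*', '', signed)
    # The '+3' size hint survives; the '+A...@...' permission hint is removed,
    # leaving 'acbd18db4cc2f85cedef654fccc4a4d8+3'.
    return stripped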
62 class _WriterFile(_FileLikeObjectBase):
63 def __init__(self, coll_writer, name):
64 super(_WriterFile, self).__init__(name, 'wb')
65 self.dest = coll_writer
68 super(_WriterFile, self).close()
69 self.dest.finish_current_file()
71 @_FileLikeObjectBase._before_close
72 def write(self, data):
75 @_FileLikeObjectBase._before_close
76 def writelines(self, seq):
80 @_FileLikeObjectBase._before_close
82 self.dest.flush_data()
85 class CollectionWriter(CollectionBase):
86 def __init__(self, api_client=None, num_retries=0, replication=None):
87 """Instantiate a CollectionWriter.
89 CollectionWriter lets you build a new Arvados Collection from scratch.
90 Write files to it. The CollectionWriter will upload data to Keep as
91 appropriate, and provide you with the Collection manifest text when
95 * api_client: The API client to use to look up Collections. If not
96 provided, CollectionWriter will build one from available Arvados
98 * num_retries: The default number of times to retry failed
99 service requests. Default 0. You may change this value
100 after instantiation, but note those changes may not
101 propagate to related objects like the Keep client.
102 * replication: The number of copies of each block to store.
103 If this argument is None or not supplied, replication is
104 the server-provided default if available, otherwise 2.
106 self._api_client = api_client
107 self.num_retries = num_retries
108 self.replication = (2 if replication is None else replication)
109 self._keep_client = None
110 self._data_buffer = []
111 self._data_buffer_len = 0
112 self._current_stream_files = []
113 self._current_stream_length = 0
114 self._current_stream_locators = []
115 self._current_stream_name = '.'
116 self._current_file_name = None
117 self._current_file_pos = 0
118 self._finished_streams = []
119 self._close_file = None
120 self._queued_file = None
121 self._queued_dirents = deque()
122 self._queued_trees = deque()
123 self._last_open = None
125 def __exit__(self, exc_type, exc_value, traceback):
129 def do_queued_work(self):
130 # The work queue consists of three pieces:
131 # * _queued_file: The file object we're currently writing to the
133 # * _queued_dirents: Entries under the current directory
134 # (_queued_trees[0]) that we want to write or recurse through.
135 # This may contain files from subdirectories if
136 # max_manifest_depth == 0 for this directory.
137 # * _queued_trees: Directories that should be written as separate
138 # streams to the Collection.
139 # This function handles the smallest piece of work currently queued
140 # (current file, then current directory, then next directory) until
141 # no work remains. The _work_THING methods each do a unit of work on
142 # THING. _queue_THING methods add a THING to the work queue.
144 if self._queued_file:
146 elif self._queued_dirents:
148 elif self._queued_trees:
153 def _work_file(self):
155 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
159 self.finish_current_file()
161 self._queued_file.close()
162 self._close_file = None
163 self._queued_file = None
165 def _work_dirents(self):
166 path, stream_name, max_manifest_depth = self._queued_trees[0]
167 if stream_name != self.current_stream_name():
168 self.start_new_stream(stream_name)
169 while self._queued_dirents:
170 dirent = self._queued_dirents.popleft()
171 target = os.path.join(path, dirent)
172 if os.path.isdir(target):
173 self._queue_tree(target,
174 os.path.join(stream_name, dirent),
175 max_manifest_depth - 1)
177 self._queue_file(target, dirent)
179 if not self._queued_dirents:
180 self._queued_trees.popleft()
182 def _work_trees(self):
183 path, stream_name, max_manifest_depth = self._queued_trees[0]
184 d = arvados.util.listdir_recursive(
185 path, max_depth=(None if max_manifest_depth == 0 else 0))
187 self._queue_dirents(stream_name, d)
189 self._queued_trees.popleft()
191 def _queue_file(self, source, filename=None):
192 assert (self._queued_file is None), "tried to queue more than one file"
193 if not hasattr(source, 'read'):
194 source = open(source, 'rb')
195 self._close_file = True
197 self._close_file = False
199 filename = os.path.basename(source.name)
200 self.start_new_file(filename)
201 self._queued_file = source
203 def _queue_dirents(self, stream_name, dirents):
204 assert (not self._queued_dirents), "tried to queue more than one tree"
205 self._queued_dirents = deque(sorted(dirents))
207 def _queue_tree(self, path, stream_name, max_manifest_depth):
208 self._queued_trees.append((path, stream_name, max_manifest_depth))
210 def write_file(self, source, filename=None):
211 self._queue_file(source, filename)
212 self.do_queued_work()
214 def write_directory_tree(self,
215 path, stream_name='.', max_manifest_depth=-1):
216 self._queue_tree(path, stream_name, max_manifest_depth)
217 self.do_queued_work()
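# Sketch of typical use of write_file() and write_directory_tree(); the local
# paths are hypothetical and a configured Arvados client environment is assumed.
def _example_write_files_and_trees():
    cwriter = CollectionWriter()
    cwriter.write_file('/tmp/report.csv', 'report.csv')
    cwriter.write_directory_tree('/tmp/results', stream_name='./results')
    return cwriter.manifest_text()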
219 def write(self, newdata):
220 if hasattr(newdata, '__iter__'):
224 self._data_buffer.append(newdata)
225 self._data_buffer_len += len(newdata)
226 self._current_stream_length += len(newdata)
227 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
230 def open(self, streampath, filename=None):
231 """open(streampath[, filename]) -> file-like object
233 Pass in the path of a file to write to the Collection, either as a
234 single string or as two separate stream name and file name arguments.
235 This method returns a file-like object you can write to add it to the
238 You may only have one file object from the Collection open at a time,
239 so be sure to close the object when you're done. Using the object in
240 a with statement makes that easy::
242 with cwriter.open('./doc/page1.txt') as outfile:
243 outfile.write(page1_data)
244 with cwriter.open('./doc/page2.txt') as outfile:
245 outfile.write(page2_data)
248 streampath, filename = split(streampath)
249 if self._last_open and not self._last_open.closed:
250 raise errors.AssertionError(
251 "can't open '{}' when '{}' is still open".format(
252 filename, self._last_open.name))
253 if streampath != self.current_stream_name():
254 self.start_new_stream(streampath)
255 self.set_current_file_name(filename)
256 self._last_open = _WriterFile(self, filename)
257 return self._last_open
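# Sketch expanding on the docstring above: only one writer file may be open at a
# time, so each file gets its own 'with' block. Assumes a configured Arvados
# client environment; the contents are illustrative.
def _example_collection_writer_open():
    cwriter = CollectionWriter()
    with cwriter.open('./doc/page1.txt') as outfile:
        outfile.write('first page\n')
    with cwriter.open('./doc/page2.txt') as outfile:
        outfile.write('second page\n')
    return cwriter.manifest_text()  # both files end up in the './doc' stream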
259 def flush_data(self):
260 data_buffer = ''.join(self._data_buffer)
262 self._current_stream_locators.append(
264 data_buffer[0:config.KEEP_BLOCK_SIZE],
265 copies=self.replication))
266 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
267 self._data_buffer_len = len(self._data_buffer[0])
269 def start_new_file(self, newfilename=None):
270 self.finish_current_file()
271 self.set_current_file_name(newfilename)
273 def set_current_file_name(self, newfilename):
274 if re.search(r'[\t\n]', newfilename):
275 raise errors.AssertionError(
276 "Manifest filenames cannot contain whitespace: %s" %
278 elif re.search(r'\x00', newfilename):
279 raise errors.AssertionError(
280 "Manifest filenames cannot contain NUL characters: %s" %
282 self._current_file_name = newfilename
284 def current_file_name(self):
285 return self._current_file_name
287 def finish_current_file(self):
288 if self._current_file_name is None:
289 if self._current_file_pos == self._current_stream_length:
291 raise errors.AssertionError(
292 "Cannot finish an unnamed file " +
293 "(%d bytes at offset %d in '%s' stream)" %
294 (self._current_stream_length - self._current_file_pos,
295 self._current_file_pos,
296 self._current_stream_name))
297 self._current_stream_files.append([
298 self._current_file_pos,
299 self._current_stream_length - self._current_file_pos,
300 self._current_file_name])
301 self._current_file_pos = self._current_stream_length
302 self._current_file_name = None
304 def start_new_stream(self, newstreamname='.'):
305 self.finish_current_stream()
306 self.set_current_stream_name(newstreamname)
308 def set_current_stream_name(self, newstreamname):
309 if re.search(r'[\t\n]', newstreamname):
310 raise errors.AssertionError(
311 "Manifest stream names cannot contain whitespace: '%s'" %
313 self._current_stream_name = '.' if newstreamname == '' else newstreamname
315 def current_stream_name(self):
316 return self._current_stream_name
318 def finish_current_stream(self):
319 self.finish_current_file()
321 if not self._current_stream_files:
323 elif self._current_stream_name is None:
324 raise errors.AssertionError(
325 "Cannot finish an unnamed stream (%d bytes in %d files)" %
326 (self._current_stream_length, len(self._current_stream_files)))
328 if not self._current_stream_locators:
329 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
330 self._finished_streams.append([self._current_stream_name,
331 self._current_stream_locators,
332 self._current_stream_files])
333 self._current_stream_files = []
334 self._current_stream_length = 0
335 self._current_stream_locators = []
336 self._current_stream_name = None
337 self._current_file_pos = 0
338 self._current_file_name = None
341 """Store the manifest in Keep and return its locator.
343 This is useful for storing manifest fragments (task outputs)
344 temporarily in Keep during a Crunch job.
346 In other cases you should make a collection instead, by
347 sending manifest_text() to the API server's "create
348 collection" endpoint.
350 return self._my_keep().put(self.manifest_text(), copies=self.replication)
352 def portable_data_hash(self):
353 stripped = self.stripped_manifest()
354 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
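# Sketch of the portable data hash format produced above: the hex md5 of the
# stripped manifest, '+', then the manifest's length in bytes.
def _example_portable_data_hash_format():
    cwriter = CollectionWriter()
    pdh = cwriter.portable_data_hash()
    # With nothing written the stripped manifest is empty, so this is the
    # well-known empty-collection hash 'd41d8cd98f00b204e9800998ecf8427e+0'.
    return pdh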
356 def manifest_text(self):
357 self.finish_current_stream()
360 for stream in self._finished_streams:
361 if not re.search(r'^\.(/.*)?$', stream[0]):
363 manifest += stream[0].replace(' ', '\\040')
364 manifest += ' ' + ' '.join(stream[1])
365 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
370 def data_locators(self):
372 for name, locators, files in self._finished_streams:
377 class ResumableCollectionWriter(CollectionWriter):
378 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
379 '_current_stream_locators', '_current_stream_name',
380 '_current_file_name', '_current_file_pos', '_close_file',
381 '_data_buffer', '_dependencies', '_finished_streams',
382 '_queued_dirents', '_queued_trees']
384 def __init__(self, api_client=None, **kwargs):
385 self._dependencies = {}
386 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
389 def from_state(cls, state, *init_args, **init_kwargs):
390 # Try to build a new writer from scratch with the given state.
391 # If the state is not suitable to resume (because files have changed,
392 # been deleted, aren't predictable, etc.), raise a
393 # StaleWriterStateError. Otherwise, return the initialized writer.
394 # The caller is responsible for calling writer.do_queued_work()
395 # appropriately after it's returned.
396 writer = cls(*init_args, **init_kwargs)
397 for attr_name in cls.STATE_PROPS:
398 attr_value = state[attr_name]
399 attr_class = getattr(writer, attr_name).__class__
400 # Coerce the value into the same type as the initial value, if
402 if attr_class not in (type(None), attr_value.__class__):
403 attr_value = attr_class(attr_value)
404 setattr(writer, attr_name, attr_value)
405 # Check dependencies before we try to resume anything.
406 if any(KeepLocator(ls).permission_expired()
407 for ls in writer._current_stream_locators):
408 raise errors.StaleWriterStateError(
409 "locators include expired permission hint")
410 writer.check_dependencies()
411 if state['_current_file'] is not None:
412 path, pos = state['_current_file']
414 writer._queued_file = open(path, 'rb')
415 writer._queued_file.seek(pos)
416 except IOError as error:
417 raise errors.StaleWriterStateError(
418 "failed to reopen active file {}: {}".format(path, error))
421 def check_dependencies(self):
422 for path, orig_stat in self._dependencies.items():
423 if not S_ISREG(orig_stat[ST_MODE]):
424 raise errors.StaleWriterStateError("{} not file".format(path))
426 now_stat = tuple(os.stat(path))
427 except OSError as error:
428 raise errors.StaleWriterStateError(
429 "failed to stat {}: {}".format(path, error))
430 if ((not S_ISREG(now_stat[ST_MODE])) or
431 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
432 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
433 raise errors.StaleWriterStateError("{} changed".format(path))
435 def dump_state(self, copy_func=lambda x: x):
436 state = {attr: copy_func(getattr(self, attr))
437 for attr in self.STATE_PROPS}
438 if self._queued_file is None:
439 state['_current_file'] = None
441 state['_current_file'] = (os.path.realpath(self._queued_file.name),
442 self._queued_file.tell())
445 def _queue_file(self, source, filename=None):
447 src_path = os.path.realpath(source)
449 raise errors.AssertionError("{} not a file path".format(source))
451 path_stat = os.stat(src_path)
452 except OSError as stat_error:
454 super(ResumableCollectionWriter, self)._queue_file(source, filename)
455 fd_stat = os.fstat(self._queued_file.fileno())
456 if not S_ISREG(fd_stat.st_mode):
457 # We won't be able to resume from this cache anyway, so don't
458 # worry about further checks.
459 self._dependencies[source] = tuple(fd_stat)
460 elif path_stat is None:
461 raise errors.AssertionError(
462 "could not stat {}: {}".format(source, stat_error))
463 elif path_stat.st_ino != fd_stat.st_ino:
464 raise errors.AssertionError(
465 "{} changed between open and stat calls".format(source))
467 self._dependencies[src_path] = tuple(fd_stat)
469 def write(self, data):
470 if self._queued_file is None:
471 raise errors.AssertionError(
472 "resumable writer can't accept unsourced data")
473 return super(ResumableCollectionWriter, self).write(data)
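# Sketch of the checkpoint/resume cycle this class supports; the local path is
# hypothetical and a configured Arvados client environment is assumed.
def _example_resume_writer():
    import copy
    writer = ResumableCollectionWriter()
    writer.write_file('/tmp/large_input.dat')
    state = writer.dump_state(copy.deepcopy)     # snapshot suitable for persisting
    # ...later, possibly after a crash or in a new process...
    resumed = ResumableCollectionWriter.from_state(state)
    resumed.do_queued_work()                     # caller must finish queued work
    return resumed.manifest_text()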
481 COLLECTION = "collection"
483 class RichCollectionBase(CollectionBase):
484 """Base class for Collections and Subcollections.
486 Implements the majority of functionality relating to accessing items in the
491 def __init__(self, parent=None):
493 self._committed = False
494 self._callback = None
498 raise NotImplementedError()
501 raise NotImplementedError()
503 def _my_block_manager(self):
504 raise NotImplementedError()
507 raise NotImplementedError()
509 def root_collection(self):
510 raise NotImplementedError()
512 def notify(self, event, collection, name, item):
513 raise NotImplementedError()
515 def stream_name(self):
516 raise NotImplementedError()
520 def find_or_create(self, path, create_type):
521 """Recursively search the specified file path.
523 May return either a `Collection` or `ArvadosFile`. If not found, will
524 create a new item at the specified path based on `create_type`. Will
525 create intermediate subcollections needed to contain the final item in
529 One of `arvados.collection.FILE` or
530 `arvados.collection.COLLECTION`. If the path is not found, and value
531 of create_type is FILE then create and return a new ArvadosFile for
532 the last path component. If COLLECTION, then create and return a new
533 Collection for the last path component.
537 pathcomponents = path.split("/", 1)
538 if pathcomponents[0]:
539 item = self._items.get(pathcomponents[0])
540 if len(pathcomponents) == 1:
543 if create_type == COLLECTION:
544 item = Subcollection(self, pathcomponents[0])
546 item = ArvadosFile(self, pathcomponents[0])
547 self._items[pathcomponents[0]] = item
548 self.set_committed(False)
549 self.notify(ADD, self, pathcomponents[0], item)
553 # create new collection
554 item = Subcollection(self, pathcomponents[0])
555 self._items[pathcomponents[0]] = item
556 self.set_committed(False)
557 self.notify(ADD, self, pathcomponents[0], item)
558 if isinstance(item, RichCollectionBase):
559 return item.find_or_create(pathcomponents[1], create_type)
561 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
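# Sketch of find_or_create() versus find(); FILE and COLLECTION are the module
# constants referenced in the docstring above.
def _example_find_or_create():
    c = Collection()
    f = c.find_or_create('a/b/hello.txt', FILE)   # creates subcollections 'a' and 'a/b'
    d = c.find_or_create('a/data', COLLECTION)    # creates an empty subcollection
    assert c.find('a/b/hello.txt') is f           # find() returns existing items
    assert c.find('a/missing.txt') is None        # ...or None when nothing is there
    return d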
566 def find(self, path):
567 """Recursively search the specified file path.
569 May return either a Collection or ArvadosFile. Return None if not
571 If path is invalid (ex: starts with '/'), an IOError exception will be
576 raise errors.ArgumentError("Parameter 'path' is empty.")
578 pathcomponents = path.split("/", 1)
579 if pathcomponents[0] == '':
580 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
582 item = self._items.get(pathcomponents[0])
585 elif len(pathcomponents) == 1:
588 if isinstance(item, RichCollectionBase):
589 if pathcomponents[1]:
590 return item.find(pathcomponents[1])
594 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
597 def mkdirs(self, path):
598 """Recursive subcollection create.
600 Like `os.makedirs()`. Will create intermediate subcollections needed
601 to contain the leaf subcollection path.
605 if self.find(path) is not None:
606 raise IOError(errno.EEXIST, "Directory or file exists", path)
608 return self.find_or_create(path, COLLECTION)
610 def open(self, path, mode="r"):
611 """Open a file-like object for access.
614 path to a file in the collection
616 one of "r", "r+", "w", "w+", "a", "a+"
620 opens for reading and writing. Reads/writes share a file pointer.
622 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
624 opens for reading and writing. All writes are appended to
625 the end of the file. Writing does not affect the file pointer for
628 mode = mode.replace("b", "")
629 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
630 raise errors.ArgumentError("Bad mode '%s'" % mode)
631 create = (mode != "r")
633 if create and not self.writable():
634 raise IOError(errno.EROFS, "Collection is read only")
637 arvfile = self.find_or_create(path, FILE)
639 arvfile = self.find(path)
642 raise IOError(errno.ENOENT, "File not found", path)
643 if not isinstance(arvfile, ArvadosFile):
644 raise IOError(errno.EISDIR, "Is a directory", path)
649 name = os.path.basename(path)
652 return ArvadosFileReader(arvfile, num_retries=self.num_retries)
654 return ArvadosFileWriter(arvfile, mode, num_retries=self.num_retries)
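# Sketch of writing and then reading a file through open() on a writable
# collection; assumes a configured Arvados client environment.
def _example_collection_open():
    c = Collection()
    with c.open('docs/readme.txt', 'w') as f:
        f.write('hello\n')
    with c.open('docs/readme.txt', 'r') as f:
        contents = f.read()
    return contents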
657 """Determine if the collection has been modified since last commited."""
658 return not self.committed()
662 """Determine if the collection has been committed to the API server."""
663 return self._committed
666 def set_committed(self, value=True):
667 """Recursively set committed flag.
669 If value is True, set committed to be True for this and all children.
671 If value is False, set committed to be False for this and all parents.
673 if value == self._committed:
676 for k,v in self._items.items():
677 v.set_committed(True)
678 self._committed = True
680 self._committed = False
681 if self.parent is not None:
682 self.parent.set_committed(False)
686 """Iterate over names of files and collections contained in this collection."""
687 return iter(self._items.keys())
690 def __getitem__(self, k):
691 """Get a file or collection that is directly contained by this collection.
693 If you want to search a path, use `find()` instead.
696 return self._items[k]
699 def __contains__(self, k):
700 """Test if there is a file or collection a directly contained by this collection."""
701 return k in self._items
705 """Get the number of items directly contained in this collection."""
706 return len(self._items)
710 def __delitem__(self, p):
711 """Delete an item by name which is directly contained by this collection."""
713 self.set_committed(False)
714 self.notify(DEL, self, p, None)
718 """Get a list of names of files and collections directly contained in this collection."""
719 return self._items.keys()
723 """Get a list of files and collection objects directly contained in this collection."""
724 return self._items.values()
728 """Get a list of (name, object) tuples directly contained in this collection."""
729 return self._items.items()
731 def exists(self, path):
732 """Test if there is a file or collection at `path`."""
733 return self.find(path) is not None
737 def remove(self, path, recursive=False):
738 """Remove the file or subcollection (directory) at `path`.
741 Specify whether to remove non-empty subcollections (True), or raise an error (False).
745 raise errors.ArgumentError("Parameter 'path' is empty.")
747 pathcomponents = path.split("/", 1)
748 item = self._items.get(pathcomponents[0])
750 raise IOError(errno.ENOENT, "File not found", path)
751 if len(pathcomponents) == 1:
752 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
753 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
754 deleteditem = self._items[pathcomponents[0]]
755 del self._items[pathcomponents[0]]
756 self.set_committed(False)
757 self.notify(DEL, self, pathcomponents[0], deleteditem)
759 item.remove(pathcomponents[1])
761 def _clonefrom(self, source):
762 for k,v in source.items():
763 self._items[k] = v.clone(self, k)
766 raise NotImplementedError()
770 def add(self, source_obj, target_name, overwrite=False, reparent=False):
771 """Copy or move a file or subcollection to this collection.
774 An ArvadosFile, or Subcollection object
777 Destination item name. If the target name already exists and is a
778 file, this will raise an error unless you specify `overwrite=True`.
781 Whether to overwrite target file if it already exists.
784 If True, source_obj will be moved from its parent collection to this collection.
785 If False, source_obj will be copied and the parent collection will be
790 if target_name in self and not overwrite:
791 raise IOError(errno.EEXIST, "File already exists", target_name)
794 if target_name in self:
795 modified_from = self[target_name]
797 # Actually make the move or copy.
799 source_obj._reparent(self, target_name)
802 item = source_obj.clone(self, target_name)
804 self._items[target_name] = item
805 self.set_committed(False)
808 self.notify(MOD, self, target_name, (modified_from, item))
810 self.notify(ADD, self, target_name, item)
812 def _get_src_target(self, source, target_path, source_collection, create_dest):
813 if source_collection is None:
814 source_collection = self
817 if isinstance(source, basestring):
818 source_obj = source_collection.find(source)
819 if source_obj is None:
820 raise IOError(errno.ENOENT, "File not found", source)
821 sourcecomponents = source.split("/")
824 sourcecomponents = None
826 # Find the parent collection of the target path
827 targetcomponents = target_path.split("/")
829 # Determine the name to use.
830 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
833 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
836 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
838 if len(targetcomponents) > 1:
839 target_dir = self.find("/".join(targetcomponents[0:-1]))
843 if target_dir is None:
844 raise IOError(errno.ENOENT, "Target directory not found", target_name)
846 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
847 target_dir = target_dir[target_name]
848 target_name = sourcecomponents[-1]
850 return (source_obj, target_dir, target_name)
854 def copy(self, source, target_path, source_collection=None, overwrite=False):
855 """Copy a file or subcollection to a new path in this collection.
858 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
861 Destination file or path. If the target path already exists and is a
862 subcollection, the item will be placed inside the subcollection. If
863 the target path already exists and is a file, this will raise an error
864 unless you specify `overwrite=True`.
867 Collection to copy `source_path` from (default `self`)
870 Whether to overwrite target file if it already exists.
873 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
874 target_dir.add(source_obj, target_name, overwrite, False)
878 def rename(self, source, target_path, source_collection=None, overwrite=False):
879 """Move a file or subcollection from `source_collection` to a new path in this collection.
882 A string with a path to source file or subcollection.
885 Destination file or path. If the target path already exists and is a
886 subcollection, the item will be placed inside the subcollection. If
887 the target path already exists and is a file, this will raise an error
888 unless you specify `overwrite=True`.
891 Collection to move `source_path` from (default `self`)
894 Whether to overwrite target file if it already exists.
897 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
898 if not source_obj.writable():
899 raise IOError(errno.EROFS, "Source collection is read only", source)
900 target_dir.add(source_obj, target_name, overwrite, True)
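# Sketch of copy() and rename() path semantics: a target ending in '/' (or naming
# an existing subcollection) keeps the source's basename. Assumes a configured
# Arvados client environment.
def _example_copy_and_rename():
    c = Collection()
    with c.open('docs/readme.txt', 'w') as f:
        f.write('hello\n')
    c.copy('docs/readme.txt', 'backup/')            # copied to 'backup/readme.txt'
    c.rename('docs/readme.txt', 'docs/intro.txt')   # moved within the collection
    return c.exists('docs/intro.txt') and c.exists('backup/readme.txt')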
902 def portable_manifest_text(self, stream_name="."):
903 """Get the manifest text for this collection, sub collections and files.
905 This method does not flush outstanding blocks to Keep. It will return
906 a normalized manifest with access tokens stripped.
909 Name to use for this stream (directory)
912 return self._get_manifest_text(stream_name, True, True)
915 def manifest_text(self, stream_name=".", strip=False, normalize=False,
916 only_committed=False):
917 """Get the manifest text for this collection, sub collections and files.
919 This method will flush outstanding blocks to Keep. By default, it will
920 not normalize an unmodified manifest or strip access tokens.
923 Name to use for this stream (directory)
926 If True, remove signing tokens from block locators if present.
927 If False (default), block locators are left unchanged.
930 If True, always export the manifest text in normalized form
931 even if the Collection is not modified. If False (default) and the collection
932 is not modified, return the original manifest text even if it is not
936 If True, don't commit pending blocks.
940 if not only_committed:
941 self._my_block_manager().commit_all()
942 return self._get_manifest_text(stream_name, strip, normalize,
943 only_committed=only_committed)
946 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
947 """Get the manifest text for this collection, sub collections and files.
950 Name to use for this stream (directory)
953 If True, remove signing tokens from block locators if present.
954 If False (default), block locators are left unchanged.
957 If True, always export the manifest text in normalized form
958 even if the Collection is not modified. If False (default) and the collection
959 is not modified, return the original manifest text even if it is not
963 If True, only include blocks that were already committed to Keep.
967 if not self.committed() or self._manifest_text is None or normalize:
970 sorted_keys = sorted(self.keys())
971 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
972 # Build the list of segments for this file
973 arvfile = self[filename]
975 for segment in arvfile.segments():
976 loc = segment.locator
977 if arvfile.parent._my_block_manager().is_bufferblock(loc):
980 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
982 loc = KeepLocator(loc).stripped()
983 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
984 segment.segment_offset, segment.range_size))
985 stream[filename] = filestream
987 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
988 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
989 buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
993 return self.stripped_manifest()
995 return self._manifest_text
998 def diff(self, end_collection, prefix=".", holding_collection=None):
999 """Generate list of add/modify/delete actions.
1001 When given to `apply`, will change `self` to match `end_collection`
1005 if holding_collection is None:
1006 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
1008 if k not in end_collection:
1009 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
1010 for k in end_collection:
1012 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1013 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1014 elif end_collection[k] != self[k]:
1015 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1017 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1019 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1024 def apply(self, changes):
1025 """Apply changes from `diff`.
1027 If a change conflicts with a local change, it will be saved to an
1028 alternate path indicating the conflict.
1032 self.set_committed(False)
1033 for change in changes:
1034 event_type = change[0]
1037 local = self.find(path)
1038 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1040 if event_type == ADD:
1042 # No local file at path, safe to copy over new file
1043 self.copy(initial, path)
1044 elif local is not None and local != initial:
1045 # There is already local file and it is different:
1046 # save change to conflict file.
1047 self.copy(initial, conflictpath)
1048 elif event_type == MOD or event_type == TOK:
1050 if local == initial:
1051 # Local matches the "initial" item so it has not
1052 # changed locally and is safe to update.
1053 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1054 # Replace contents of local file with new contents
1055 local.replace_contents(final)
1057 # Overwrite path with new item; this can happen if
1058 # path was a file and is now a collection or vice versa
1059 self.copy(final, path, overwrite=True)
1061 # Local is missing (presumably deleted) or local doesn't
1062 # match the "start" value, so save change to conflict file
1063 self.copy(final, conflictpath)
1064 elif event_type == DEL:
1065 if local == initial:
1066 # Local item matches "initial" value, so it is safe to remove.
1067 self.remove(path, recursive=True)
1068 # else, the file is modified or already removed, in either
1069 # case we don't want to try to remove it.
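# Sketch of a diff/apply round trip between two in-memory collections; apply()
# keeps conflicting local edits under a '<path>~<timestamp>~conflict~' name.
# Assumes a configured Arvados client environment.
def _example_diff_apply():
    local = Collection()
    remote = Collection()
    with remote.open('notes.txt', 'w') as f:
        f.write('remote content\n')
    changes = local.diff(remote)    # list of (ADD/MOD/TOK/DEL, path, ...) tuples
    local.apply(changes)            # local now matches remote
    return local.exists('notes.txt')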
1071 def portable_data_hash(self):
1072 """Get the portable data hash for this collection's manifest."""
1073 if self._manifest_locator and self.committed():
1074 # If the collection is already saved on the API server and it's committed,
1075 # return the API server's PDH response.
1076 return self._portable_data_hash
1078 stripped = self.portable_manifest_text()
1079 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
1082 def subscribe(self, callback):
1083 if self._callback is None:
1084 self._callback = callback
1086 raise errors.ArgumentError("A callback is already set on this collection.")
1089 def unsubscribe(self):
1090 if self._callback is not None:
1091 self._callback = None
1094 def notify(self, event, collection, name, item):
1096 self._callback(event, collection, name, item)
1097 self.root_collection().notify(event, collection, name, item)
1100 def __eq__(self, other):
1103 if not isinstance(other, RichCollectionBase):
1105 if len(self._items) != len(other):
1107 for k in self._items:
1110 if self._items[k] != other[k]:
1114 def __ne__(self, other):
1115 return not self.__eq__(other)
1119 """Flush bufferblocks to Keep."""
1120 for e in self.values():
1124 class Collection(RichCollectionBase):
1125 """Represents the root of an Arvados Collection.
1127 This class is threadsafe. The root collection object, all subcollections
1128 and files are protected by a single lock (i.e. each access locks the entire
1134 :To read an existing file:
1135 `c.open("myfile", "r")`
1137 :To write a new file:
1138 `c.open("myfile", "w")`
1140 :To determine if a file exists:
1141 `c.find("myfile") is not None`
1144 `c.copy("source", "dest")`
1147 `c.remove("myfile")`
1149 :To save to an existing collection record:
1152 :To save a new collection record:
1155 :To merge remote changes into this object:
1158 Must be associated with an API server Collection record (during
1159 initialization, or using `save_new`) to use `save` or `update`
1163 def __init__(self, manifest_locator_or_text=None,
1170 replication_desired=None,
1172 """Collection constructor.
1174 :manifest_locator_or_text:
1175 One of Arvados collection UUID, block locator of
1176 a manifest, raw manifest text, or None (to create an empty collection).
1178 the parent Collection, may be None.
1181 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1182 Prefer this over supplying your own api_client and keep_client (except in testing).
1183 Will use default config settings if not specified.
1186 The API client object to use for requests. If not specified, create one using `apiconfig`.
1189 The Keep client to use for requests. If not specified, create one using `apiconfig`.
1192 The number of retries for API and Keep requests.
1195 The block manager to use. If not specified, create one.
1197 :replication_desired:
1198 How many copies should Arvados maintain. If None, API server default
1199 configuration applies. If not None, this value will also be used
1200 for determining the number of block copies being written.
1203 super(Collection, self).__init__(parent)
1204 self._api_client = api_client
1205 self._keep_client = keep_client
1206 self._block_manager = block_manager
1207 self.replication_desired = replication_desired
1208 self.put_threads = put_threads
1211 self._config = apiconfig
1213 self._config = config.settings()
1215 self.num_retries = num_retries if num_retries is not None else 0
1216 self._manifest_locator = None
1217 self._manifest_text = None
1218 self._portable_data_hash = None
1219 self._api_response = None
1220 self._past_versions = set()
1222 self.lock = threading.RLock()
1225 if manifest_locator_or_text:
1226 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1227 self._manifest_locator = manifest_locator_or_text
1228 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1229 self._manifest_locator = manifest_locator_or_text
1230 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1231 self._manifest_text = manifest_locator_or_text
1233 raise errors.ArgumentError(
1234 "Argument to CollectionReader is not a manifest or a collection UUID")
1238 except (IOError, errors.SyntaxError) as e:
1239 raise errors.ArgumentError("Error processing manifest text: %s" % e)
1241 def root_collection(self):
1244 def stream_name(self):
1251 def known_past_version(self, modified_at_and_portable_data_hash):
1252 return modified_at_and_portable_data_hash in self._past_versions
1256 def update(self, other=None, num_retries=None):
1257 """Merge the latest collection on the API server with the current collection."""
1260 if self._manifest_locator is None:
1261 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1262 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1263 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1264 response.get("portable_data_hash") != self.portable_data_hash()):
1265 # The record on the server is different from our current one, but we've seen it before,
1266 # so ignore it because it's already been merged.
1267 # However, if it's the same as our current record, proceed with the update, because we want to update
1271 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1272 other = CollectionReader(response["manifest_text"])
1273 baseline = CollectionReader(self._manifest_text)
1274 self.apply(baseline.diff(other))
1275 self._manifest_text = self.manifest_text()
1279 if self._api_client is None:
1280 self._api_client = ThreadSafeApiCache(self._config)
1281 if self._keep_client is None:
1282 self._keep_client = self._api_client.keep
1283 return self._api_client
1287 if self._keep_client is None:
1288 if self._api_client is None:
1291 self._keep_client = KeepClient(api_client=self._api_client)
1292 return self._keep_client
1295 def _my_block_manager(self):
1296 if self._block_manager is None:
1297 copies = (self.replication_desired or
1298 self._my_api()._rootDesc.get('defaultCollectionReplication',
1300 self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
1301 return self._block_manager
1303 def _remember_api_response(self, response):
1304 self._api_response = response
1305 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1307 def _populate_from_api_server(self):
1308 # As in KeepClient itself, we must wait until the last
1309 # possible moment to instantiate an API client, in order to
1310 # avoid tripping up clients that don't have access to an API
1311 # server. If we do build one, make sure our Keep client uses
1312 # it. If instantiation fails, we'll fall back to the except
1313 # clause, just like any other Collection lookup
1314 # failure. Return an exception, or None if successful.
1316 self._remember_api_response(self._my_api().collections().get(
1317 uuid=self._manifest_locator).execute(
1318 num_retries=self.num_retries))
1319 self._manifest_text = self._api_response['manifest_text']
1320 self._portable_data_hash = self._api_response['portable_data_hash']
1321 # If not overridden via kwargs, we should try to load the
1322 # replication_desired from the API server
1323 if self.replication_desired is None:
1324 self.replication_desired = self._api_response.get('replication_desired', None)
1326 except Exception as e:
1329 def _populate_from_keep(self):
1330 # Retrieve a manifest directly from Keep. This has a chance of
1331 # working if [a] the locator includes a permission signature
1332 # or [b] the Keep services are operating in world-readable
1333 # mode. Return an exception, or None if successful.
1335 self._manifest_text = self._my_keep().get(
1336 self._manifest_locator, num_retries=self.num_retries)
1337 except Exception as e:
1340 def _populate(self):
1341 if self._manifest_locator is None and self._manifest_text is None:
1343 error_via_api = None
1344 error_via_keep = None
1345 should_try_keep = ((self._manifest_text is None) and
1346 arvados.util.keep_locator_pattern.match(
1347 self._manifest_locator))
1348 if ((self._manifest_text is None) and
1349 arvados.util.signed_locator_pattern.match(self._manifest_locator)):
1350 error_via_keep = self._populate_from_keep()
1351 if self._manifest_text is None:
1352 error_via_api = self._populate_from_api_server()
1353 if error_via_api is not None and not should_try_keep:
1355 if ((self._manifest_text is None) and
1356 not error_via_keep and
1358 # Looks like a keep locator, and we didn't already try keep above
1359 error_via_keep = self._populate_from_keep()
1360 if self._manifest_text is None:
1362 raise errors.NotFoundError(
1363 ("Failed to retrieve collection '{}' " +
1364 "from either API server ({}) or Keep ({})."
1366 self._manifest_locator,
1370 self._baseline_manifest = self._manifest_text
1371 self._import_manifest(self._manifest_text)
1374 def _has_collection_uuid(self):
1375 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1377 def __enter__(self):
1380 def __exit__(self, exc_type, exc_value, traceback):
1381 """Support scoped auto-commit in a with: block."""
1382 if exc_type is None:
1383 if self.writable() and self._has_collection_uuid():
1387 def stop_threads(self):
1388 if self._block_manager is not None:
1389 self._block_manager.stop_threads()
1392 def manifest_locator(self):
1393 """Get the manifest locator, if any.
1395 The manifest locator will be set when the collection is loaded from an
1396 API server record or the portable data hash of a manifest.
1398 The manifest locator will be None if the collection is newly created or
1399 was created directly from manifest text. The method `save_new()` will
1400 assign a manifest locator.
1403 return self._manifest_locator
1406 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1407 if new_config is None:
1408 new_config = self._config
1410 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1412 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1414 newcollection._clonefrom(self)
1415 return newcollection
1418 def api_response(self):
1419 """Returns information about this Collection fetched from the API server.
1421 If the Collection exists in Keep but not the API server, currently
1422 returns None. Future versions may provide a synthetic response.
1425 return self._api_response
1427 def find_or_create(self, path, create_type):
1428 """See `RichCollectionBase.find_or_create`"""
1432 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1434 def find(self, path):
1435 """See `RichCollectionBase.find`"""
1439 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1441 def remove(self, path, recursive=False):
1442 """See `RichCollectionBase.remove`"""
1444 raise errors.ArgumentError("Cannot remove '.'")
1446 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
1451 def save(self, merge=True, num_retries=None):
1452 """Save collection to an existing collection record.
1454 Commit pending buffer blocks to Keep, merge with remote record (if
1455 merge=True, the default), and update the collection record. Returns
1456 the current manifest text.
1458 Will raise AssertionError if not associated with a collection record on
1459 the API server. If you want to save a manifest to Keep only, see `save_new()`.
1463 Update and merge remote changes before saving. Otherwise, any
1464 remote changes will be ignored and overwritten.
1467 Retry count on API calls (if None, use the collection default)
1470 if not self.committed():
1471 if not self._has_collection_uuid():
1472 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1474 self._my_block_manager().commit_all()
1479 text = self.manifest_text(strip=False)
1480 self._remember_api_response(self._my_api().collections().update(
1481 uuid=self._manifest_locator,
1482 body={'manifest_text': text}
1484 num_retries=num_retries))
1485 self._manifest_text = self._api_response["manifest_text"]
1486 self._portable_data_hash = self._api_response["portable_data_hash"]
1487 self.set_committed(True)
1489 return self._manifest_text
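# Sketch of editing and saving an existing collection record; the UUID is a
# placeholder and a configured Arvados client environment is assumed.
def _example_save_existing():
    c = Collection('zzzzz-4zz18-zzzzzzzzzzzzzzz')   # hypothetical collection UUID
    with c.open('notes.txt', 'a') as f:
        f.write('appended line\n')
    return c.save()   # merge remote changes first (merge=True), then update the record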
1495 def save_new(self, name=None,
1496 create_collection_record=True,
1498 ensure_unique_name=False,
1500 """Save collection to a new collection record.
1502 Commit pending buffer blocks to Keep and, when create_collection_record
1503 is True (default), create a new collection record. After creating a
1504 new collection record, this Collection object will be associated with
1505 the new record, which later calls to `save()` will update. Returns the current manifest text.
1508 The collection name.
1510 :create_collection_record:
1511 If True, create a collection record on the API server.
1512 If False, only commit blocks to Keep and return the manifest text.
1515 The user or project uuid that will own this collection.
1516 If None, defaults to the current user.
1518 :ensure_unique_name:
1519 If True, ask the API server to rename the collection
1520 if it conflicts with a collection with the same name and owner. If
1521 False, a name conflict will result in an error.
1524 Retry count on API calls (if None, use the collection default)
1527 self._my_block_manager().commit_all()
1528 text = self.manifest_text(strip=False)
1530 if create_collection_record:
1532 name = "New collection"
1533 ensure_unique_name = True
1535 body = {"manifest_text": text,
1537 "replication_desired": self.replication_desired}
1539 body["owner_uuid"] = owner_uuid
1541 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1542 text = self._api_response["manifest_text"]
1544 self._manifest_locator = self._api_response["uuid"]
1545 self._portable_data_hash = self._api_response["portable_data_hash"]
1547 self._manifest_text = text
1548 self.set_committed(True)
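# Sketch of creating a brand-new collection record with save_new(); the name is
# illustrative and a configured Arvados client environment is assumed.
def _example_save_new():
    c = Collection()
    with c.open('hello.txt', 'w') as f:
        f.write('hello world\n')
    c.save_new(name='example collection', ensure_unique_name=True)
    return c.manifest_locator()   # now set to the new record's UUID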
1553 def _import_manifest(self, manifest_text):
1554 """Import a manifest into a `Collection`.
1557 The manifest text to import from.
1561 raise errors.ArgumentError("Can only import manifest into an empty collection")
1570 for token_and_separator in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1571 tok = token_and_separator.group(1)
1572 sep = token_and_separator.group(2)
1574 if state == STREAM_NAME:
1575 # starting a new stream
1576 stream_name = tok.replace('\\040', ' ')
1581 self.find_or_create(stream_name, COLLECTION)
1585 block_locator = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1587 blocksize = long(block_locator.group(1))
1588 blocks.append(Range(tok, streamoffset, blocksize, 0))
1589 streamoffset += blocksize
1593 if state == SEGMENTS:
1594 file_segment = re.search(r'^(\d+):(\d+):(\S+)', tok)
1596 pos = long(file_segment.group(1))
1597 size = long(file_segment.group(2))
1598 name = file_segment.group(3).replace('\\040', ' ')
1599 filepath = os.path.join(stream_name, name)
1600 afile = self.find_or_create(filepath, FILE)
1601 if isinstance(afile, ArvadosFile):
1602 afile.add_segment(blocks, pos, size)
1604 raise errors.SyntaxError("File %s conflicts with stream of the same name." % filepath)
1607 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
1613 self.set_committed(True)
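# Sketch of the manifest format the parser above consumes: each line is a stream
# name, one or more block locators ('<md5>+<size>[+hints]'), then file segments
# ('<pos>:<size>:<name>'), with spaces escaped as '\040'. The locator below is
# the md5 of the 3-byte block "foo".
def _example_import_manifest():
    manifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
    c = Collection(manifest)
    return c.exists('foo.txt')   # the file is now addressable in the collection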
1616 def notify(self, event, collection, name, item):
1618 self._callback(event, collection, name, item)
1621 class Subcollection(RichCollectionBase):
1622 """This is a subdirectory within a collection that doesn't have its own API
1625 Subcollection locking falls under the umbrella lock of its root collection.
1629 def __init__(self, parent, name):
1630 super(Subcollection, self).__init__(parent)
1631 self.lock = self.root_collection().lock
1632 self._manifest_text = None
1634 self.num_retries = parent.num_retries
1636 def root_collection(self):
1637 return self.parent.root_collection()
1640 return self.root_collection().writable()
1643 return self.root_collection()._my_api()
1646 return self.root_collection()._my_keep()
1648 def _my_block_manager(self):
1649 return self.root_collection()._my_block_manager()
1651 def stream_name(self):
1652 return os.path.join(self.parent.stream_name(), self.name)
1655 def clone(self, new_parent, new_name):
1656 c = Subcollection(new_parent, new_name)
1662 def _reparent(self, newparent, newname):
1663 self.set_committed(False)
1665 self.parent.remove(self.name, recursive=True)
1666 self.parent = newparent
1668 self.lock = self.parent.root_collection().lock
1671 class CollectionReader(Collection):
1672 """A read-only collection object.
1674 Initialize from an api collection record locator, a portable data hash of a
1675 manifest, or raw manifest text. See `Collection` constructor for detailed
1679 def __init__(self, manifest_locator_or_text, *args, **kwargs):
1680 self._in_init = True
1681 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1682 self._in_init = False
1684 # Forgo any locking since it should never change once initialized.
1685 self.lock = NoopLock()
1687 # Backwards compatibility with old CollectionReader
1688 # all_streams() and all_files()
1689 self._streams = None
1692 return self._in_init
1694 def _populate_streams(orig_func):
1695 @functools.wraps(orig_func)
1696 def populate_streams_wrapper(self, *args, **kwargs):
1697 # Defer populating self._streams until needed since it creates a copy of the manifest.
1698 if self._streams is None:
1699 if self._manifest_text:
1700 self._streams = [sline.split()
1701 for sline in self._manifest_text.split("\n")
1705 return orig_func(self, *args, **kwargs)
1706 return populate_streams_wrapper
1709 def normalize(self):
1710 """Normalize the streams returned by `all_streams`.
1712 This method is kept for backwards compatibility and only affects the
1713 behavior of `all_streams()` and `all_files()`
1719 for s in self.all_streams():
1720 for f in s.all_files():
1721 streamname, filename = split(s.name() + "/" + f.name())
1722 if streamname not in streams:
1723 streams[streamname] = {}
1724 if filename not in streams[streamname]:
1725 streams[streamname][filename] = []
1726 for r in f.segments:
1727 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
1729 self._streams = [normalize_stream(s, streams[s])
1730 for s in sorted(streams)]
1732 def all_streams(self):
1733 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1734 for s in self._streams]
1737 def all_files(self):
1738 for s in self.all_streams():
1739 for f in s.all_files():