1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
23 from collections import deque
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
35 import arvados.events as events
36 from arvados.retry import retry_method
# Module-level logger shared by all collection machinery in this file.
_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Abstract base class for Collection classes.

    Provides lazy Keep-client construction and manifest stripping shared by
    CollectionWriter and the rich Collection classes below.
    """

    # NOTE(review): this listing is elided (embedded original line numbers
    # jump); several statements and method headers are missing from view.
    # Code lines below are left unchanged; only comments were added.

    def __exit__(self, exc_type, exc_value, traceback):
        # (method body elided in this view)

        # Lazily build and cache a KeepClient sharing this collection's API
        # client and retry count. The enclosing `def` line is elided here --
        # presumably `def _my_keep(self):`; TODO confirm against full source.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Get the manifest with locator hints stripped.

        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        # NOTE(review): accumulator initialization and the per-line
        # `fields = line.split()` statement are elided in this view.
        for line in raw.split("\n"):
            # Keep field 0 (the stream name) untouched; for every token that
            # matches the Keep locator pattern, strip any "+hint" whose first
            # character is not a digit (i.e. everything except size hints).
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(arvados.util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """File-like handle returned by CollectionWriter.open().

    Forwards all writes to the owning CollectionWriter (``self.dest``).
    """

    # NOTE(review): elided excerpt -- the `close()` def line and the bodies
    # of write()/writelines() are missing from this view.

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

        # Presumably the tail of close(): after the base class marks this
        # object closed, tell the writer the current file is finished --
        # TODO confirm (the enclosing `def close` line is elided).
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        # (body elided in this view)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        # (body elided in this view)

    @_FileLikeObjectBase._before_close
        # NOTE(review): the decorated `def flush(self):` line is elided here.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    """Deprecated, use Collection instead."""

    # NOTE(review): elided excerpt -- embedded original line numbers jump,
    # so scattered statements (loop headers, else:/try: lines, returns) are
    # missing throughout. Code lines left unchanged; comments only.

    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados

        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data buffered in memory, not yet flushed to Keep, plus its length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the manifest stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work() for the queue protocol.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        # Tracks the file handle from the most recent open() call.
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # (body elided in this view)

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        # NOTE(review): the enclosing loop header and the _work_*() calls in
        # each branch are elided in this view.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:

    def _work_file(self):
        # NOTE(review): the chunk-upload loop around this read is elided.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        # NOTE(review): presumably guarded by `if self._close_file:`
        # (elided) so caller-supplied file objects stay open -- TODO confirm.
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # Switch streams if the queued directory belongs to a new one.
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectories become their own queued streams; depth
                # decreases by one per level.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                # NOTE(review): the `else:` introducing this branch and a
                # following `break` are elided in this view.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_depth=None lists the whole subtree (collapsed into one
        # stream); max_depth=0 lists only the immediate entries.
        d = arvados.util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # NOTE(review): the `if d:` / `else:` lines around these two
        # statements are elided in this view.
            self._queue_dirents(stream_name, d)
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        # Accept either a path (opened here, closed by _work_file) or an
        # already-open file-like object (left open for the caller).
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
            # NOTE(review): `else:` elided before this line.
            self._close_file = False
            # NOTE(review): `if filename is None:` elided before this line.
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        # Sorted for deterministic manifest ordering.
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        # Queue a single file and drain the queue synchronously.
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        # Queue a directory tree and drain the queue synchronously.
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Accepts bytes as-is, str (encoded to bytes), or an iterable whose
        # items are written recursively (iterable branch body elided here).
        if isinstance(newdata, bytes):
        elif isinstance(newdata, str):
            newdata = newdata.encode()
        elif hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as soon as enough data is buffered.
        # NOTE(review): the loop body (presumably self.flush_data()) is
        # elided in this view.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): presumably guarded by `if filename is None:`
        # (elided) so the two-argument form skips the split -- TODO confirm.
        streampath, filename = split(streampath)
        # Enforce the one-open-file-at-a-time contract documented above.
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                u"can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Write the first KEEP_BLOCK_SIZE bytes of buffered data to Keep,
        # keep the remainder buffered.
        data_buffer = b''.join(self._data_buffer)
        # NOTE(review): an `if data_buffer:` guard and the
        # `self._my_keep().put(` call line are elided in this view.
        self._current_stream_locators.append(
                data_buffer[0:config.KEEP_BLOCK_SIZE],
                copies=self.replication))
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        # Close out the current file, then begin a new one.
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Manifest format uses whitespace and NUL as delimiters, so reject
        # names containing them.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            # Zero bytes written since the last finish: nothing to record.
            # NOTE(review): the `return` and the enclosing else-path
            # structure are elided in this view.
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record (offset, length, name) for the just-finished file.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace: '%s'" %
        # Empty name means the root stream, spelled '.' in manifests.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # Nothing written to this stream: don't record it.
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # A stream with files but no data still needs a locator; use the
        # well-known empty-block locator.
        if not self._current_stream_locators:
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

        # NOTE(review): the enclosing `def finish(self):` line is elided
        # in this view -- TODO confirm against full source.
        """Store the manifest in Keep and return its locator.

        This is useful for storing manifest fragments (task outputs)
        temporarily in Keep during a Crunch job.

        In other cases you should make a collection instead, by
        sending manifest_text() to the API server's "create
        collection" endpoint.
        """
        return self._my_keep().put(self.manifest_text().encode(),
                                   copies=self.replication)

    def portable_data_hash(self):
        # PDH = md5 hex digest of the stripped manifest, "+", its length.
        stripped = self.stripped_manifest().encode()
        return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        # NOTE(review): accumulator initialization and the stream-name
        # validation raise are elided in this view.
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # One manifest line per stream: name, locators, then
            # pos:len:filename triples (spaces escaped as \040).
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])

    def data_locators(self):
        # NOTE(review): result-list construction around this loop is elided.
        for name, locators, files in self._finished_streams:

    def save_new(self, name=None):
        # Create a brand-new collection record on the API server from the
        # accumulated manifest. NOTE(review): the `body={...}` opening of
        # the request dict is elided in this view.
        return self._api_client.collections().create(
            ensure_unique_name=True,
                'manifest_text': self.manifest_text(),
                }).execute(num_retries=self.num_retries)
class ResumableCollectionWriter(CollectionWriter):
    """Deprecated, use Collection instead."""

    # NOTE(review): elided excerpt -- try:/else: lines, returns, and a
    # presumably-present @classmethod decorator are missing from this view.
    # Code lines left unchanged; comments only.

    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Map of file path -> stat tuple recorded when the file was queued;
        # used by check_dependencies() to detect staleness on resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    def from_state(cls, state, *init_args, **init_kwargs):
        # NOTE(review): a @classmethod decorator line appears to be elided
        # above this def -- TODO confirm against full source.
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
                # NOTE(review): the enclosing `try:` line is elided.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    u"failed to reopen active file {}: {}".format(path, error))

    def check_dependencies(self):
        # Verify every recorded dependency is still a regular file with the
        # same mtime and size; otherwise resuming would corrupt the upload.
        for path, orig_stat in listitems(self._dependencies):
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(u"{} not file".format(path))
                # NOTE(review): the enclosing `try:` line is elided.
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    u"failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError(u"{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # Snapshot resumable state; copy_func lets callers deep-copy values.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            # NOTE(review): `else:` and a trailing `return state` are
            # elided around these lines.
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())

    def _queue_file(self, source, filename=None):
        # NOTE(review): surrounding try:/except structure is elided; the
        # visible raise/stat lines belong to those elided handlers.
        src_path = os.path.realpath(source)
            raise errors.AssertionError(u"{} not a file path".format(source))
            path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                u"could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            # Inode mismatch means the path was replaced between open()
            # and stat(): unsafe to record as a dependency.
            raise errors.AssertionError(
                u"{} changed between open and stat calls".format(source))
            # NOTE(review): `else:` elided before this line.
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Resumable writes must be tied to a queued source file, otherwise
        # the data could not be regenerated on resume.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
# Sentinel for find_or_create(): create a Subcollection at the target path.
# NOTE(review): the companion ADD/DEL/MOD/TOK/FILE constants are elided
# from this view.
COLLECTION = "collection"
514 class RichCollectionBase(CollectionBase):
515 """Base class for Collections and Subcollections.
517 Implements the majority of functionality relating to accessing items in the
522 def __init__(self, parent=None):
524 self._committed = False
525 self._has_remote_blocks = False
526 self._callback = None
530 raise NotImplementedError()
533 raise NotImplementedError()
535 def _my_block_manager(self):
536 raise NotImplementedError()
539 raise NotImplementedError()
541 def root_collection(self):
542 raise NotImplementedError()
544 def notify(self, event, collection, name, item):
545 raise NotImplementedError()
547 def stream_name(self):
548 raise NotImplementedError()
552 def has_remote_blocks(self):
553 """Recursively check for a +R segment locator signature."""
555 if self._has_remote_blocks:
558 if self[item].has_remote_blocks():
563 def set_has_remote_blocks(self, val):
564 self._has_remote_blocks = val
566 self.parent.set_has_remote_blocks(val)
570 def find_or_create(self, path, create_type):
571 """Recursively search the specified file path.
573 May return either a `Collection` or `ArvadosFile`. If not found, will
574 create a new item at the specified path based on `create_type`. Will
575 create intermediate subcollections needed to contain the final item in
579 One of `arvados.collection.FILE` or
580 `arvados.collection.COLLECTION`. If the path is not found, and value
581 of create_type is FILE then create and return a new ArvadosFile for
582 the last path component. If COLLECTION, then create and return a new
583 Collection for the last path component.
587 pathcomponents = path.split("/", 1)
588 if pathcomponents[0]:
589 item = self._items.get(pathcomponents[0])
590 if len(pathcomponents) == 1:
593 if create_type == COLLECTION:
594 item = Subcollection(self, pathcomponents[0])
596 item = ArvadosFile(self, pathcomponents[0])
597 self._items[pathcomponents[0]] = item
598 self.set_committed(False)
599 self.notify(ADD, self, pathcomponents[0], item)
603 # create new collection
604 item = Subcollection(self, pathcomponents[0])
605 self._items[pathcomponents[0]] = item
606 self.set_committed(False)
607 self.notify(ADD, self, pathcomponents[0], item)
608 if isinstance(item, RichCollectionBase):
609 return item.find_or_create(pathcomponents[1], create_type)
611 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
616 def find(self, path):
617 """Recursively search the specified file path.
619 May return either a Collection or ArvadosFile. Return None if not
621 If path is invalid (ex: starts with '/'), an IOError exception will be
626 raise errors.ArgumentError("Parameter 'path' is empty.")
628 pathcomponents = path.split("/", 1)
629 if pathcomponents[0] == '':
630 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
632 item = self._items.get(pathcomponents[0])
635 elif len(pathcomponents) == 1:
638 if isinstance(item, RichCollectionBase):
639 if pathcomponents[1]:
640 return item.find(pathcomponents[1])
644 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
647 def mkdirs(self, path):
648 """Recursive subcollection create.
650 Like `os.makedirs()`. Will create intermediate subcollections needed
651 to contain the leaf subcollection path.
655 if self.find(path) != None:
656 raise IOError(errno.EEXIST, "Directory or file exists", path)
658 return self.find_or_create(path, COLLECTION)
660 def open(self, path, mode="r", encoding=None):
661 """Open a file-like object for access.
664 path to a file in the collection
666 a string consisting of "r", "w", or "a", optionally followed
667 by "b" or "t", optionally followed by "+".
669 binary mode: write() accepts bytes, read() returns bytes.
671 text mode (default): write() accepts strings, read() returns strings.
675 opens for reading and writing. Reads/writes share a file pointer.
677 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
679 opens for reading and writing. All writes are appended to
680 the end of the file. Writing does not affect the file pointer for
685 if not re.search(r'^[rwa][bt]?\+?$', mode):
686 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
688 if mode[0] == 'r' and '+' not in mode:
689 fclass = ArvadosFileReader
690 arvfile = self.find(path)
691 elif not self.writable():
692 raise IOError(errno.EROFS, "Collection is read only")
694 fclass = ArvadosFileWriter
695 arvfile = self.find_or_create(path, FILE)
698 raise IOError(errno.ENOENT, "File not found", path)
699 if not isinstance(arvfile, ArvadosFile):
700 raise IOError(errno.EISDIR, "Is a directory", path)
705 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
706 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
708 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
709 f = io.TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
713 """Determine if the collection has been modified since last commited."""
714 return not self.committed()
718 """Determine if the collection has been committed to the API server."""
719 return self._committed
722 def set_committed(self, value=True):
723 """Recursively set committed flag.
725 If value is True, set committed to be True for this and all children.
727 If value is False, set committed to be False for this and all parents.
729 if value == self._committed:
732 for k,v in listitems(self._items):
733 v.set_committed(True)
734 self._committed = True
736 self._committed = False
737 if self.parent is not None:
738 self.parent.set_committed(False)
742 """Iterate over names of files and collections contained in this collection."""
743 return iter(viewkeys(self._items))
746 def __getitem__(self, k):
747 """Get a file or collection that is directly contained by this collection.
749 If you want to search a path, use `find()` instead.
752 return self._items[k]
755 def __contains__(self, k):
756 """Test if there is a file or collection a directly contained by this collection."""
757 return k in self._items
761 """Get the number of items directly contained in this collection."""
762 return len(self._items)
766 def __delitem__(self, p):
767 """Delete an item by name which is directly contained by this collection."""
769 self.set_committed(False)
770 self.notify(DEL, self, p, None)
774 """Get a list of names of files and collections directly contained in this collection."""
775 return self._items.keys()
779 """Get a list of files and collection objects directly contained in this collection."""
780 return listvalues(self._items)
784 """Get a list of (name, object) tuples directly contained in this collection."""
785 return listitems(self._items)
787 def exists(self, path):
788 """Test if there is a file or collection at `path`."""
789 return self.find(path) is not None
793 def remove(self, path, recursive=False):
794 """Remove the file or subcollection (directory) at `path`.
797 Specify whether to remove non-empty subcollections (True), or raise an error (False).
801 raise errors.ArgumentError("Parameter 'path' is empty.")
803 pathcomponents = path.split("/", 1)
804 item = self._items.get(pathcomponents[0])
806 raise IOError(errno.ENOENT, "File not found", path)
807 if len(pathcomponents) == 1:
808 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
809 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
810 deleteditem = self._items[pathcomponents[0]]
811 del self._items[pathcomponents[0]]
812 self.set_committed(False)
813 self.notify(DEL, self, pathcomponents[0], deleteditem)
815 item.remove(pathcomponents[1], recursive=recursive)
817 def _clonefrom(self, source):
818 for k,v in listitems(source):
819 self._items[k] = v.clone(self, k)
822 raise NotImplementedError()
826 def add(self, source_obj, target_name, overwrite=False, reparent=False):
827 """Copy or move a file or subcollection to this collection.
830 An ArvadosFile, or Subcollection object
833 Destination item name. If the target name already exists and is a
834 file, this will raise an error unless you specify `overwrite=True`.
837 Whether to overwrite target file if it already exists.
840 If True, source_obj will be moved from its parent collection to this collection.
841 If False, source_obj will be copied and the parent collection will be
846 if target_name in self and not overwrite:
847 raise IOError(errno.EEXIST, "File already exists", target_name)
850 if target_name in self:
851 modified_from = self[target_name]
853 # Actually make the move or copy.
855 source_obj._reparent(self, target_name)
858 item = source_obj.clone(self, target_name)
860 self._items[target_name] = item
861 self.set_committed(False)
862 if not self._has_remote_blocks and source_obj.has_remote_blocks():
863 self.set_has_remote_blocks(True)
866 self.notify(MOD, self, target_name, (modified_from, item))
868 self.notify(ADD, self, target_name, item)
870 def _get_src_target(self, source, target_path, source_collection, create_dest):
871 if source_collection is None:
872 source_collection = self
875 if isinstance(source, basestring):
876 source_obj = source_collection.find(source)
877 if source_obj is None:
878 raise IOError(errno.ENOENT, "File not found", source)
879 sourcecomponents = source.split("/")
882 sourcecomponents = None
884 # Find parent collection the target path
885 targetcomponents = target_path.split("/")
887 # Determine the name to use.
888 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
891 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
894 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
896 if len(targetcomponents) > 1:
897 target_dir = self.find("/".join(targetcomponents[0:-1]))
901 if target_dir is None:
902 raise IOError(errno.ENOENT, "Target directory not found", target_name)
904 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
905 target_dir = target_dir[target_name]
906 target_name = sourcecomponents[-1]
908 return (source_obj, target_dir, target_name)
912 def copy(self, source, target_path, source_collection=None, overwrite=False):
913 """Copy a file or subcollection to a new path in this collection.
916 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
919 Destination file or path. If the target path already exists and is a
920 subcollection, the item will be placed inside the subcollection. If
921 the target path already exists and is a file, this will raise an error
922 unless you specify `overwrite=True`.
925 Collection to copy `source_path` from (default `self`)
928 Whether to overwrite target file if it already exists.
931 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
932 target_dir.add(source_obj, target_name, overwrite, False)
936 def rename(self, source, target_path, source_collection=None, overwrite=False):
937 """Move a file or subcollection from `source_collection` to a new path in this collection.
940 A string with a path to source file or subcollection.
943 Destination file or path. If the target path already exists and is a
944 subcollection, the item will be placed inside the subcollection. If
945 the target path already exists and is a file, this will raise an error
946 unless you specify `overwrite=True`.
949 Collection to copy `source_path` from (default `self`)
952 Whether to overwrite target file if it already exists.
955 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
956 if not source_obj.writable():
957 raise IOError(errno.EROFS, "Source collection is read only", source)
958 target_dir.add(source_obj, target_name, overwrite, True)
960 def portable_manifest_text(self, stream_name="."):
961 """Get the manifest text for this collection, sub collections and files.
963 This method does not flush outstanding blocks to Keep. It will return
964 a normalized manifest with access tokens stripped.
967 Name to use for this stream (directory)
970 return self._get_manifest_text(stream_name, True, True)
973 def manifest_text(self, stream_name=".", strip=False, normalize=False,
974 only_committed=False):
975 """Get the manifest text for this collection, sub collections and files.
977 This method will flush outstanding blocks to Keep. By default, it will
978 not normalize an unmodified manifest or strip access tokens.
981 Name to use for this stream (directory)
984 If True, remove signing tokens from block locators if present.
985 If False (default), block locators are left unchanged.
988 If True, always export the manifest text in normalized form
989 even if the Collection is not modified. If False (default) and the collection
990 is not modified, return the original manifest text even if it is not
994 If True, don't commit pending blocks.
998 if not only_committed:
999 self._my_block_manager().commit_all()
1000 return self._get_manifest_text(stream_name, strip, normalize,
1001 only_committed=only_committed)
1004 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1005 """Get the manifest text for this collection, sub collections and files.
1008 Name to use for this stream (directory)
1011 If True, remove signing tokens from block locators if present.
1012 If False (default), block locators are left unchanged.
1015 If True, always export the manifest text in normalized form
1016 even if the Collection is not modified. If False (default) and the collection
1017 is not modified, return the original manifest text even if it is not
1021 If True, only include blocks that were already committed to Keep.
1025 if not self.committed() or self._manifest_text is None or normalize:
1028 sorted_keys = sorted(self.keys())
1029 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1030 # Create a stream per file `k`
1031 arvfile = self[filename]
1033 for segment in arvfile.segments():
1034 loc = segment.locator
1035 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1038 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1040 loc = KeepLocator(loc).stripped()
1041 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1042 segment.segment_offset, segment.range_size))
1043 stream[filename] = filestream
1045 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1046 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1047 buf.append(self[dirname].manifest_text(
1048 stream_name=os.path.join(stream_name, dirname),
1049 strip=strip, normalize=True, only_committed=only_committed))
1053 return self.stripped_manifest()
1055 return self._manifest_text
1058 def _copy_remote_blocks(self, remote_blocks={}):
1059 """Scan through the entire collection and ask Keep to copy remote blocks.
1061 When accessing a remote collection, blocks will have a remote signature
1062 (+R instead of +A). Collect these signatures and request Keep to copy the
1063 blocks to the local cluster, returning local (+A) signatures.
1066 Shared cache of remote to local block mappings. This is used to avoid
1067 doing extra work when blocks are shared by more than one file in
1068 different subdirectories.
1072 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1073 return remote_blocks
    def diff(self, end_collection, prefix=".", holding_collection=None):
        """Generate list of add/modify/delete actions.

        When given to `apply`, will change `self` to match `end_collection`

        """
        if holding_collection is None:
            # Scratch collection used to hold detached clones of the items
            # referenced by the change tuples.
            holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
            # Present here but absent from end_collection: record a deletion.
            if k not in end_collection:
                changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
        for k in end_collection:
                # Directory on both sides: recurse and collect nested changes.
                if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
                    changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
                elif end_collection[k] != self[k]:
                    # Same path, different content: MOD carries clones of both
                    # the old and the new item.
                    changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                    # Items compare equal (elided branch): TOK records both
                    # sides without a content change.
                    changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
                # Not present locally (elided branch): record an addition.
                changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
    def apply(self, changes):
        """Apply changes from `diff`.

        If a change conflicts with a local change, it will be saved to an
        alternate path indicating the conflict.

        """
            # Any incoming change means in-memory state diverges from the
            # last committed manifest.
            self.set_committed(False)
        for change in changes:
            # Change tuples start with the event type (ADD/MOD/TOK/DEL).
            event_type = change[0]
            # Item currently at the target path, if any.
            local = self.find(path)
            # Timestamped path used to preserve conflicting local content.
            conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
            if event_type == ADD:
                    # No local file at path, safe to copy over new file
                    self.copy(initial, path)
                elif local is not None and local != initial:
                    # There is already local file and it is different:
                    # save change to conflict file.
                    self.copy(initial, conflictpath)
            elif event_type == MOD or event_type == TOK:
                if local == initial:
                    # Local matches the "initial" item so it has not
                    # changed locally and is safe to update.
                    if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
                        # Replace contents of local file with new contents
                        local.replace_contents(final)
                        # Overwrite path with new item; this can happen if
                        # path was a file and is now a collection or vice versa
                        self.copy(final, path, overwrite=True)
                    # Local is missing (presumably deleted) or local doesn't
                    # match the "start" value, so save change to conflict file
                    self.copy(final, conflictpath)
            elif event_type == DEL:
                if local == initial:
                    # Local item matches "initial" value, so it is safe to remove.
                    self.remove(path, recursive=True)
                # else, the file is modified or already removed, in either
                # case we don't want to try to remove it.
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        if self._manifest_locator and self.committed():
            # If the collection is already saved on the API server, and it's committed
            # then return API server's PDH response.
            return self._portable_data_hash
            # Otherwise compute it locally (elided else branch): md5 of the
            # stripped, unsigned manifest plus its length in bytes.
            stripped = self.portable_manifest_text().encode()
            return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
    def subscribe(self, callback):
        # Register a single change callback; notify() invokes it as
        # callback(event, collection, name, item).
        if self._callback is None:
            self._callback = callback
            # (elided else branch) Only one subscriber is supported at a time.
            raise errors.ArgumentError("A callback is already set on this collection.")
1167 def unsubscribe(self):
1168 if self._callback is not None:
1169 self._callback = None
    def notify(self, event, collection, name, item):
        # Invoke the local callback (guard elided above), then propagate the
        # event up to the root collection's subscribers.
            self._callback(event, collection, name, item)
        self.root_collection().notify(event, collection, name, item)
    def __eq__(self, other):
        # Structural equality: same collection type family, same number of
        # entries, and every named item compares equal (elided early-outs
        # return True for identity and False for each failed check).
        if not isinstance(other, RichCollectionBase):
        if len(self._items) != len(other):
        for k in self._items:
            if self._items[k] != other[k]:
1192 def __ne__(self, other):
1193 return not self.__eq__(other)
        """Flush bufferblocks to Keep."""
        # Flush every child (file or subcollection) so their buffered blocks
        # are committed to Keep.
        for e in listvalues(self):
class Collection(RichCollectionBase):
    """Represents the root of an Arvados Collection.

    This class is threadsafe. The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    Must be associated with an API server Collection record (during
    initialization, or using `save_new`) to use `save` or `update`
    def __init__(self, manifest_locator_or_text=None,
                 replication_desired=None,
                 storage_classes_desired=None,
        """Collection constructor.

        :manifest_locator_or_text:
          An Arvados collection UUID, portable data hash, raw manifest
          text, or (if creating an empty collection) None.

        :parent:
          the parent Collection, may be None.

        :apiconfig:
          A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.

        :api_client:
          The API client object to use for requests. If not specified, create one using `apiconfig`.

        :keep_client:
          the Keep client to use for requests. If not specified, create one using `apiconfig`.

        :num_retries:
          the number of retries for API and Keep requests.

        :block_manager:
          the block manager to use. If not specified, create one.

        :replication_desired:
          How many copies should Arvados maintain. If None, API server default
          configuration applies. If not None, this value will also be used
          for determining the number of block copies being written.

        :storage_classes_desired:
          A list of storage class names where to upload the data. If None,
          the keep client is expected to store the data into the cluster's
          default storage class(es).

        """
        if storage_classes_desired and type(storage_classes_desired) is not list:
            raise errors.ArgumentError("storage_classes_desired must be list type.")

        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client

        # Use the keep client from ThreadSafeApiCache
        if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
            self._keep_client = self._api_client.keep

        self._block_manager = block_manager
        self.replication_desired = replication_desired
        self._storage_classes_desired = storage_classes_desired
        self.put_threads = put_threads

            self._config = apiconfig
            self._config = config.settings()

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._portable_data_hash = None
        self._api_response = None
        self._past_versions = set()

        self.lock = threading.RLock()

        # Decide how to interpret the argument: a Keep locator (PDH), a
        # collection UUID, or raw manifest text.
        if manifest_locator_or_text:
            if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
                # UUID from another cluster: its blocks carry remote (+R)
                # signatures and must be copied locally before save().
                if not self._has_local_collection_uuid():
                    self._has_remote_blocks = True
            elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                if '+R' in self._manifest_text:
                    self._has_remote_blocks = True
                raise errors.ArgumentError(
                    "Argument to CollectionReader is not a manifest or a collection UUID")

        except errors.SyntaxError as e:
            raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
1340 def storage_classes_desired(self):
1341 return self._storage_classes_desired or []
    def root_collection(self):
        # (body elided) Presumably returns self: a root Collection is its own
        # root — TODO confirm against the full source.

    def get_properties(self):
        # Return the "properties" dict from the cached API record, when set.
        if self._api_response and self._api_response["properties"]:
            return self._api_response["properties"]

    def get_trash_at(self):
        # Parse the API record's trash_at timestamp into a datetime, when set.
        if self._api_response and self._api_response["trash_at"]:
            return ciso8601.parse_datetime(self._api_response["trash_at"])

    def stream_name(self):
        # (body elided) Presumably "." for the root stream — TODO confirm.

    def known_past_version(self, modified_at_and_portable_data_hash):
        # True when this (modified_at, portable_data_hash) pair was already
        # seen/merged by this object (see _remember_api_response).
        return modified_at_and_portable_data_hash in self._past_versions
    def update(self, other=None, num_retries=None):
        """Merge the latest collection on the API server with the current collection."""

            if self._manifest_locator is None:
                raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
            # Fetch the current server-side record for this collection.
            response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
            if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
                response.get("portable_data_hash") != self.portable_data_hash()):
                # The record on the server is different from our current one, but we've seen it before,
                # so ignore it because it's already been merged.
                # However, if it's the same as our current record, proceed with the update, because we want to update
            self._remember_api_response(response)
            other = CollectionReader(response["manifest_text"])
        # Three-way merge: diff the saved baseline against the incoming
        # manifest, then apply those changes to the live collection.
        baseline = CollectionReader(self._manifest_text)
        self.apply(baseline.diff(other))
        self._manifest_text = self.manifest_text()
        # _my_api (def elided): lazily build a ThreadSafeApiCache and adopt
        # its Keep client so both share credentials and configuration.
        if self._api_client is None:
            self._api_client = ThreadSafeApiCache(self._config, version='v1')
            if self._keep_client is None:
                self._keep_client = self._api_client.keep
        return self._api_client

        # _my_keep (def elided): lazily build a Keep client; when the API
        # client is also missing, _my_api() creates both (elided branch).
        if self._keep_client is None:
            if self._api_client is None:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    def _my_block_manager(self):
        # Lazily create the shared _BlockManager used for buffered writes.
        if self._block_manager is None:
            # Desired replication: explicit setting, else the server default.
            copies = (self.replication_desired or
                      self._my_api()._rootDesc.get('defaultCollectionReplication',
            self._block_manager = _BlockManager(self._my_keep(),
                                                put_threads=self.put_threads,
                                                num_retries=self.num_retries,
                                                storage_classes_func=self.storage_classes_desired)
        return self._block_manager
1424 def _remember_api_response(self, response):
1425 self._api_response = response
1426 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        self._remember_api_response(self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries))
        self._manifest_text = self._api_response['manifest_text']
        self._portable_data_hash = self._api_response['portable_data_hash']
        # If not overriden via kwargs, we should try to load the
        # replication_desired and storage_classes_desired from the API server
        if self.replication_desired is None:
            self.replication_desired = self._api_response.get('replication_desired', None)
        if self._storage_classes_desired is None:
            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)

    def _populate(self):
        # Nothing to do when manifest text is already loaded.
        if self._manifest_text is None:
            if self._manifest_locator is None:
                # (elided else branch) Fetch manifest and metadata by locator.
                self._populate_from_api_server()
        # Remember what was loaded as the merge baseline, then build the
        # in-memory tree from the manifest.
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
1457 def _has_collection_uuid(self):
1458 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1460 def _has_local_collection_uuid(self):
1461 return self._has_collection_uuid and \
1462 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
    def __enter__(self):
        # (body elided) Presumably returns self so `with Collection(...) as c:`
        # binds the collection — TODO confirm.

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        # Only auto-save when the block exited cleanly and this collection is
        # writable and bound to an API record (save/stop_threads elided).
        if exc_type is None:
            if self.writable() and self._has_collection_uuid():
1474 def stop_threads(self):
1475 if self._block_manager is not None:
1476 self._block_manager.stop_threads()
    def manifest_locator(self):
        """Get the manifest locator, if any.

        The manifest locator will be set when the collection is loaded from an
        API server record or the portable data hash of a manifest.

        The manifest locator will be None if the collection is newly created or
        was created directly from manifest text. The method `save_new()` will
        assign a manifest locator.

        """
        return self._manifest_locator

    def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
        if new_config is None:
            new_config = self._config
            # (elided branch) Readonly clones become a CollectionReader ...
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
            # ... writable ones a full Collection.
            newcollection = Collection(parent=new_parent, apiconfig=new_config)
        newcollection._clonefrom(self)
        return newcollection

    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.

        """
        return self._api_response

    def find_or_create(self, path, create_type):
        """See `RichCollectionBase.find_or_create`"""
        # Strip a leading "./" so lookups are relative to the root.
        return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path):
        """See `RichCollectionBase.find`"""
        return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path, recursive=False):
        """See `RichCollectionBase.remove`"""
            # (guard elided) The root itself cannot be removed.
            raise errors.ArgumentError("Cannot remove '.'")
        return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
             storage_classes=None,
             preserve_version=False):
        """Save collection to an existing collection record.

        Commit pending buffer blocks to Keep, merge with remote record (if
        merge=True, the default), and update the collection record. Returns
        the current manifest text.

        Will raise AssertionError if not associated with a collection record on
        the API server. If you want to save a manifest to Keep only, see

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :merge:
          Update and merge remote changes before saving. Otherwise, any
          remote changes will be ignored and overwritten.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled, if not, setting this will

        """
        # Validate argument types up front, before any API calls.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")
            self._storage_classes_desired = storage_classes

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        # Build the update request body from the supplied metadata.
            body["properties"] = properties
        if self.storage_classes_desired():
            body["storage_classes_desired"] = self.storage_classes_desired()
            t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
            body["trash_at"] = t
        if preserve_version:
            body["preserve_version"] = preserve_version

        if not self.committed():
            if self._has_remote_blocks:
                # Copy any remote blocks to the local cluster.
                self._copy_remote_blocks(remote_blocks={})
                self._has_remote_blocks = False
            if not self._has_collection_uuid():
                raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
            elif not self._has_local_collection_uuid():
                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")

            # Commit outstanding buffer blocks before serializing the manifest.
            self._my_block_manager().commit_all()

            text = self.manifest_text(strip=False)
            body['manifest_text'] = text

            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                ).execute(num_retries=num_retries))
            self._manifest_text = self._api_response["manifest_text"]
            self._portable_data_hash = self._api_response["portable_data_hash"]
            self.set_committed(True)
            # (elided else branch) Already committed: still push metadata-only
            # changes (properties, storage classes, trash_at, preserve_version).
            self._remember_api_response(self._my_api().collections().update(
                uuid=self._manifest_locator,
                ).execute(num_retries=num_retries))

        return self._manifest_text
    def save_new(self, name=None,
                 create_collection_record=True,
                 storage_classes=None,
                 ensure_unique_name=False,
                 preserve_version=False):
        """Save collection to a new collection record.

        Commit pending buffer blocks to Keep and, when create_collection_record
        is True (default), create a new collection record. After creating a
        new collection record, this Collection object will be associated with
        the new record used by `save()`. Returns the current manifest text.

        :name:
          The collection name.

        :create_collection_record:
          If True, create a collection record on the API server.
          If False, only commit blocks to Keep and return the manifest text.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :properties:
          Additional properties of collection. This value will replace any existing
          properties of collection.

        :storage_classes:
          Specify desirable storage classes to be used when writing data to Keep.

        :trash_at:
          A collection is *expiring* when it has a *trash_at* time in the future.
          An expiring collection can be accessed as normal,
          but is scheduled to be trashed automatically at the *trash_at* time.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner. If
          False, a name conflict will result in an error.

        :num_retries:
          Retry count on API calls (if None, use the collection default)

        :preserve_version:
          If True, indicate that the collection content being saved right now
          should be preserved in a version snapshot if the collection record is
          updated in the future. Requires that the API server has
          Collections.CollectionVersioning enabled, if not, setting this will

        """
        # Validate argument types up front, before any API calls.
        if properties and type(properties) is not dict:
            raise errors.ArgumentError("properties must be dictionary type.")

        if storage_classes and type(storage_classes) is not list:
            raise errors.ArgumentError("storage_classes must be list type.")

        if trash_at and type(trash_at) is not datetime.datetime:
            raise errors.ArgumentError("trash_at must be datetime type.")

        if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
            raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")

        if self._has_remote_blocks:
            # Copy any remote blocks to the local cluster.
            self._copy_remote_blocks(remote_blocks={})
            self._has_remote_blocks = False

            self._storage_classes_desired = storage_classes

        # Flush buffered writes to Keep before serializing the manifest.
        self._my_block_manager().commit_all()
        text = self.manifest_text(strip=False)

        if create_collection_record:
                # (elided guard) No name given: use a default and let the API
                # server uniquify it so repeated saves don't collide.
                name = "New collection"
                ensure_unique_name = True

            body = {"manifest_text": text,
                    "replication_desired": self.replication_desired}
                body["owner_uuid"] = owner_uuid
                body["properties"] = properties
            if self.storage_classes_desired():
                body["storage_classes_desired"] = self.storage_classes_desired()
                t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                body["trash_at"] = t
            if preserve_version:
                body["preserve_version"] = preserve_version

            self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
            text = self._api_response["manifest_text"]

            # Bind this object to the newly created record.
            self._manifest_locator = self._api_response["uuid"]
            self._portable_data_hash = self._api_response["portable_data_hash"]

            self._manifest_text = text
            self.set_committed(True)
    # One whitespace-delimited manifest token plus its trailing separator.
    _token_re = re.compile(r'(\S+)(\s+|$)')
    # A Keep block locator: 32-hex md5, "+size", then optional "+hint" parts.
    _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
    # A file segment token: position:size:filename.
    _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1757 def _unescape_manifest_path(self, path):
1758 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        """
            # NOTE(review): `ArgumentError` is not defined in this module;
            # this was probably meant to be `errors.ArgumentError` — confirm.
            raise ArgumentError("Can only import manifest into an empty collection")

        # Tokenize with a small state machine: each stream line is a stream
        # name, then block locators, then file segments.
        for token_and_separator in self._token_re.finditer(manifest_text):
            tok = token_and_separator.group(1)
            sep = token_and_separator.group(2)

            if state == STREAM_NAME:
                # starting a new stream
                stream_name = self._unescape_manifest_path(tok)
                self.find_or_create(stream_name, COLLECTION)

                block_locator = self._block_re.match(tok)
                    # Record this block and advance the stream offset.
                    blocksize = int(block_locator.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize, 0))
                    streamoffset += blocksize

            if state == SEGMENTS:
                file_segment = self._segment_re.match(tok)
                    pos = int(file_segment.group(1))
                    size = int(file_segment.group(2))
                    name = self._unescape_manifest_path(file_segment.group(3))
                    if name.split('/')[-1] == '.':
                        # placeholder for persisting an empty directory, not a real file
                        self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                        filepath = os.path.join(stream_name, name)
                            afile = self.find_or_create(filepath, FILE)
                        except IOError as e:
                            if e.errno == errno.ENOTDIR:
                                raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None

                        if isinstance(afile, ArvadosFile):
                            afile.add_segment(blocks, pos, size)
                            # (elided else) Path already exists as a directory.
                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                    # Token matched neither a block nor a file segment.
                    raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)

        # Freshly imported content matches the manifest it came from.
        self.set_committed(True)
    def notify(self, event, collection, name, item):
        # Root-level notify: invoke the subscribed callback, if any
        # (guard elided above).
            self._callback(event, collection, name, item)
class Subcollection(RichCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    Subcollection locking falls under the umbrella lock of its root collection.

    """

    def __init__(self, parent, name):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock: one lock guards the whole tree.
        self.lock = self.root_collection().lock
        self._manifest_text = None
        self.num_retries = parent.num_retries
1855 def root_collection(self):
1856 return self.parent.root_collection()
        # writable() (def elided): delegated to the root collection.
        return self.root_collection().writable()
        # _my_api() (def elided): delegated to the root collection.
        return self.root_collection()._my_api()
        # _my_keep() (def elided): delegated to the root collection.
        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        # Delegate to the root collection's shared block manager.
        return self.root_collection()._my_block_manager()
1870 def stream_name(self):
1871 return os.path.join(self.parent.stream_name(), self.name)
    def clone(self, new_parent, new_name):
        # Build a fresh Subcollection copy of this subtree (copy step elided).
        c = Subcollection(new_parent, new_name)

    def _reparent(self, newparent, newname):
        # Detach this subtree from its current parent and attach it under
        # newparent; the in-memory tree is modified, so mark uncommitted.
        self.set_committed(False)
        self.parent.remove(self.name, recursive=True)
        self.parent = newparent
        # Re-acquire the (possibly different) root collection's lock.
        self.lock = self.parent.root_collection().lock

    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
        """Encode empty directories by using an \056-named (".") empty file"""
        if len(self._items) == 0:
            # Empty directory: emit a zero-length file named "\056" (".")
            # against the well-known empty block locator so the directory
            # survives a manifest round-trip.
            return "%s %s 0:0:\\056\n" % (
                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
        return super(Subcollection, self)._get_manifest_text(stream_name,
class CollectionReader(Collection):
    """A read-only collection object.

    Initialize from a collection UUID or portable data hash, or raw
    manifest text. See `Collection` constructor for detailed options.

    """
    def __init__(self, manifest_locator_or_text, *args, **kwargs):
        # _in_init allows writable() to return True only while the base
        # constructor is populating the collection.
        self._in_init = True
        super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
        self._in_init = False

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()

        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None

        # writable() (def elided): writable only during __init__, see above.
        return self._in_init
    def _populate_streams(orig_func):
        # Decorator: make sure self._streams is built before the wrapped
        # legacy accessor runs.
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    # One token list per non-empty manifest line.
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatability and only affects the
        behavior of `all_streams()` and `all_files()`

        """
        # First pass: group every file's data ranges by (stream, filename).
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        # Second pass: rebuild each stream in normalized, sorted form.
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
1960 def all_streams(self):
1961 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1962 for s in self._streams]
1965 def all_files(self):
1966 for s in self.all_streams():
1967 for f in s.all_files():