1 # Copyright (C) The Arvados Authors. All rights reserved.
3 # SPDX-License-Identifier: Apache-2.0
5 from __future__ import absolute_import
6 from future.utils import listitems, listvalues, viewkeys
7 from builtins import str
8 from past.builtins import basestring
9 from builtins import object
23 from collections import deque
26 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
27 from .keep import KeepLocator, KeepClient
28 from .stream import StreamReader
29 from ._normalize_stream import normalize_stream, escape
30 from ._ranges import Range, LocatorAndRange
31 from .safeapi import ThreadSafeApiCache
32 import arvados.config as config
33 import arvados.errors as errors
35 import arvados.events as events
36 from arvados.retry import retry_method
38 _logger = logging.getLogger('arvados.collection')
41 if sys.version_info >= (3, 0):
42 TextIOWrapper = io.TextIOWrapper
44 class TextIOWrapper(io.TextIOWrapper):
45 """To maintain backward compatibility, cast str to unicode in
49 def write(self, data):
50 if isinstance(data, basestring):
52 return super(TextIOWrapper, self).write(data)
55 class CollectionBase(object):
56 """Abstract base class for Collection classes."""
61 def __exit__(self, exc_type, exc_value, traceback):
65 if self._keep_client is None:
66 self._keep_client = KeepClient(api_client=self._api_client,
67 num_retries=self.num_retries)
68 return self._keep_client
70 def stripped_manifest(self):
71 """Get the manifest with locator hints stripped.
73 Return the manifest for the current collection with all
74 non-portable hints (i.e., permission signatures and other
75 hints other than size hints) removed from the locators.
77 raw = self.manifest_text()
79 for line in raw.split("\n"):
82 clean_fields = fields[:1] + [
83 (re.sub(r'\+[^\d][^\+]*', '', x)
84 if re.match(arvados.util.keep_locator_pattern, x)
87 clean += [' '.join(clean_fields), "\n"]
91 class _WriterFile(_FileLikeObjectBase):
92 def __init__(self, coll_writer, name):
93 super(_WriterFile, self).__init__(name, 'wb')
94 self.dest = coll_writer
97 super(_WriterFile, self).close()
98 self.dest.finish_current_file()
100 @_FileLikeObjectBase._before_close
101 def write(self, data):
102 self.dest.write(data)
104 @_FileLikeObjectBase._before_close
105 def writelines(self, seq):
109 @_FileLikeObjectBase._before_close
111 self.dest.flush_data()
114 class CollectionWriter(CollectionBase):
115 """Deprecated, use Collection instead."""
117 def __init__(self, api_client=None, num_retries=0, replication=None):
118 """Instantiate a CollectionWriter.
120 CollectionWriter lets you build a new Arvados Collection from scratch.
121 Write files to it. The CollectionWriter will upload data to Keep as
122 appropriate, and provide you with the Collection manifest text when
126 * api_client: The API client to use to look up Collections. If not
127 provided, CollectionReader will build one from available Arvados
129 * num_retries: The default number of times to retry failed
130 service requests. Default 0. You may change this value
131 after instantiation, but note those changes may not
132 propagate to related objects like the Keep client.
133 * replication: The number of copies of each block to store.
134 If this argument is None or not supplied, replication is
135 the server-provided default if available, otherwise 2.
137 self._api_client = api_client
138 self.num_retries = num_retries
139 self.replication = (2 if replication is None else replication)
140 self._keep_client = None
141 self._data_buffer = []
142 self._data_buffer_len = 0
143 self._current_stream_files = []
144 self._current_stream_length = 0
145 self._current_stream_locators = []
146 self._current_stream_name = '.'
147 self._current_file_name = None
148 self._current_file_pos = 0
149 self._finished_streams = []
150 self._close_file = None
151 self._queued_file = None
152 self._queued_dirents = deque()
153 self._queued_trees = deque()
154 self._last_open = None
156 def __exit__(self, exc_type, exc_value, traceback):
160 def do_queued_work(self):
161 # The work queue consists of three pieces:
162 # * _queued_file: The file object we're currently writing to the
164 # * _queued_dirents: Entries under the current directory
165 # (_queued_trees[0]) that we want to write or recurse through.
166 # This may contain files from subdirectories if
167 # max_manifest_depth == 0 for this directory.
168 # * _queued_trees: Directories that should be written as separate
169 # streams to the Collection.
170 # This function handles the smallest piece of work currently queued
171 # (current file, then current directory, then next directory) until
172 # no work remains. The _work_THING methods each do a unit of work on
173 # THING. _queue_THING methods add a THING to the work queue.
175 if self._queued_file:
177 elif self._queued_dirents:
179 elif self._queued_trees:
184 def _work_file(self):
186 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
190 self.finish_current_file()
192 self._queued_file.close()
193 self._close_file = None
194 self._queued_file = None
196 def _work_dirents(self):
197 path, stream_name, max_manifest_depth = self._queued_trees[0]
198 if stream_name != self.current_stream_name():
199 self.start_new_stream(stream_name)
200 while self._queued_dirents:
201 dirent = self._queued_dirents.popleft()
202 target = os.path.join(path, dirent)
203 if os.path.isdir(target):
204 self._queue_tree(target,
205 os.path.join(stream_name, dirent),
206 max_manifest_depth - 1)
208 self._queue_file(target, dirent)
210 if not self._queued_dirents:
211 self._queued_trees.popleft()
213 def _work_trees(self):
214 path, stream_name, max_manifest_depth = self._queued_trees[0]
215 d = arvados.util.listdir_recursive(
216 path, max_depth = (None if max_manifest_depth == 0 else 0))
218 self._queue_dirents(stream_name, d)
220 self._queued_trees.popleft()
222 def _queue_file(self, source, filename=None):
223 assert (self._queued_file is None), "tried to queue more than one file"
224 if not hasattr(source, 'read'):
225 source = open(source, 'rb')
226 self._close_file = True
228 self._close_file = False
230 filename = os.path.basename(source.name)
231 self.start_new_file(filename)
232 self._queued_file = source
234 def _queue_dirents(self, stream_name, dirents):
235 assert (not self._queued_dirents), "tried to queue more than one tree"
236 self._queued_dirents = deque(sorted(dirents))
238 def _queue_tree(self, path, stream_name, max_manifest_depth):
239 self._queued_trees.append((path, stream_name, max_manifest_depth))
241 def write_file(self, source, filename=None):
242 self._queue_file(source, filename)
243 self.do_queued_work()
245 def write_directory_tree(self,
246 path, stream_name='.', max_manifest_depth=-1):
247 self._queue_tree(path, stream_name, max_manifest_depth)
248 self.do_queued_work()
250 def write(self, newdata):
251 if isinstance(newdata, bytes):
253 elif isinstance(newdata, str):
254 newdata = newdata.encode()
255 elif hasattr(newdata, '__iter__'):
259 self._data_buffer.append(newdata)
260 self._data_buffer_len += len(newdata)
261 self._current_stream_length += len(newdata)
262 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
265 def open(self, streampath, filename=None):
266 """open(streampath[, filename]) -> file-like object
268 Pass in the path of a file to write to the Collection, either as a
269 single string or as two separate stream name and file name arguments.
270 This method returns a file-like object you can write to add it to the
273 You may only have one file object from the Collection open at a time,
274 so be sure to close the object when you're done. Using the object in
275 a with statement makes that easy::
277 with cwriter.open('./doc/page1.txt') as outfile:
278 outfile.write(page1_data)
279 with cwriter.open('./doc/page2.txt') as outfile:
280 outfile.write(page2_data)
283 streampath, filename = split(streampath)
284 if self._last_open and not self._last_open.closed:
285 raise errors.AssertionError(
286 u"can't open '{}' when '{}' is still open".format(
287 filename, self._last_open.name))
288 if streampath != self.current_stream_name():
289 self.start_new_stream(streampath)
290 self.set_current_file_name(filename)
291 self._last_open = _WriterFile(self, filename)
292 return self._last_open
294 def flush_data(self):
295 data_buffer = b''.join(self._data_buffer)
297 self._current_stream_locators.append(
299 data_buffer[0:config.KEEP_BLOCK_SIZE],
300 copies=self.replication))
301 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
302 self._data_buffer_len = len(self._data_buffer[0])
304 def start_new_file(self, newfilename=None):
305 self.finish_current_file()
306 self.set_current_file_name(newfilename)
308 def set_current_file_name(self, newfilename):
309 if re.search(r'[\t\n]', newfilename):
310 raise errors.AssertionError(
311 "Manifest filenames cannot contain whitespace: %s" %
313 elif re.search(r'\x00', newfilename):
314 raise errors.AssertionError(
315 "Manifest filenames cannot contain NUL characters: %s" %
317 self._current_file_name = newfilename
319 def current_file_name(self):
320 return self._current_file_name
322 def finish_current_file(self):
323 if self._current_file_name is None:
324 if self._current_file_pos == self._current_stream_length:
326 raise errors.AssertionError(
327 "Cannot finish an unnamed file " +
328 "(%d bytes at offset %d in '%s' stream)" %
329 (self._current_stream_length - self._current_file_pos,
330 self._current_file_pos,
331 self._current_stream_name))
332 self._current_stream_files.append([
333 self._current_file_pos,
334 self._current_stream_length - self._current_file_pos,
335 self._current_file_name])
336 self._current_file_pos = self._current_stream_length
337 self._current_file_name = None
339 def start_new_stream(self, newstreamname='.'):
340 self.finish_current_stream()
341 self.set_current_stream_name(newstreamname)
343 def set_current_stream_name(self, newstreamname):
344 if re.search(r'[\t\n]', newstreamname):
345 raise errors.AssertionError(
346 "Manifest stream names cannot contain whitespace: '%s'" %
348 self._current_stream_name = '.' if newstreamname=='' else newstreamname
350 def current_stream_name(self):
351 return self._current_stream_name
353 def finish_current_stream(self):
354 self.finish_current_file()
356 if not self._current_stream_files:
358 elif self._current_stream_name is None:
359 raise errors.AssertionError(
360 "Cannot finish an unnamed stream (%d bytes in %d files)" %
361 (self._current_stream_length, len(self._current_stream_files)))
363 if not self._current_stream_locators:
364 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
365 self._finished_streams.append([self._current_stream_name,
366 self._current_stream_locators,
367 self._current_stream_files])
368 self._current_stream_files = []
369 self._current_stream_length = 0
370 self._current_stream_locators = []
371 self._current_stream_name = None
372 self._current_file_pos = 0
373 self._current_file_name = None
376 """Store the manifest in Keep and return its locator.
378 This is useful for storing manifest fragments (task outputs)
379 temporarily in Keep during a Crunch job.
381 In other cases you should make a collection instead, by
382 sending manifest_text() to the API server's "create
383 collection" endpoint.
385 return self._my_keep().put(self.manifest_text().encode(),
386 copies=self.replication)
388 def portable_data_hash(self):
389 stripped = self.stripped_manifest().encode()
390 return '{}+{}'.format(hashlib.md5(stripped).hexdigest(), len(stripped))
392 def manifest_text(self):
393 self.finish_current_stream()
396 for stream in self._finished_streams:
397 if not re.search(r'^\.(/.*)?$', stream[0]):
399 manifest += stream[0].replace(' ', '\\040')
400 manifest += ' ' + ' '.join(stream[1])
401 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
406 def data_locators(self):
408 for name, locators, files in self._finished_streams:
412 def save_new(self, name=None):
413 return self._api_client.collections().create(
414 ensure_unique_name=True,
417 'manifest_text': self.manifest_text(),
418 }).execute(num_retries=self.num_retries)
421 class ResumableCollectionWriter(CollectionWriter):
422 """Deprecated, use Collection instead."""
424 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
425 '_current_stream_locators', '_current_stream_name',
426 '_current_file_name', '_current_file_pos', '_close_file',
427 '_data_buffer', '_dependencies', '_finished_streams',
428 '_queued_dirents', '_queued_trees']
430 def __init__(self, api_client=None, **kwargs):
431 self._dependencies = {}
432 super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
435 def from_state(cls, state, *init_args, **init_kwargs):
436 # Try to build a new writer from scratch with the given state.
437 # If the state is not suitable to resume (because files have changed,
438 # been deleted, aren't predictable, etc.), raise a
439 # StaleWriterStateError. Otherwise, return the initialized writer.
440 # The caller is responsible for calling writer.do_queued_work()
441 # appropriately after it's returned.
442 writer = cls(*init_args, **init_kwargs)
443 for attr_name in cls.STATE_PROPS:
444 attr_value = state[attr_name]
445 attr_class = getattr(writer, attr_name).__class__
446 # Coerce the value into the same type as the initial value, if
448 if attr_class not in (type(None), attr_value.__class__):
449 attr_value = attr_class(attr_value)
450 setattr(writer, attr_name, attr_value)
451 # Check dependencies before we try to resume anything.
452 if any(KeepLocator(ls).permission_expired()
453 for ls in writer._current_stream_locators):
454 raise errors.StaleWriterStateError(
455 "locators include expired permission hint")
456 writer.check_dependencies()
457 if state['_current_file'] is not None:
458 path, pos = state['_current_file']
460 writer._queued_file = open(path, 'rb')
461 writer._queued_file.seek(pos)
462 except IOError as error:
463 raise errors.StaleWriterStateError(
464 u"failed to reopen active file {}: {}".format(path, error))
467 def check_dependencies(self):
468 for path, orig_stat in listitems(self._dependencies):
469 if not S_ISREG(orig_stat[ST_MODE]):
470 raise errors.StaleWriterStateError(u"{} not file".format(path))
472 now_stat = tuple(os.stat(path))
473 except OSError as error:
474 raise errors.StaleWriterStateError(
475 u"failed to stat {}: {}".format(path, error))
476 if ((not S_ISREG(now_stat[ST_MODE])) or
477 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
478 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
479 raise errors.StaleWriterStateError(u"{} changed".format(path))
481 def dump_state(self, copy_func=lambda x: x):
482 state = {attr: copy_func(getattr(self, attr))
483 for attr in self.STATE_PROPS}
484 if self._queued_file is None:
485 state['_current_file'] = None
487 state['_current_file'] = (os.path.realpath(self._queued_file.name),
488 self._queued_file.tell())
491 def _queue_file(self, source, filename=None):
493 src_path = os.path.realpath(source)
495 raise errors.AssertionError(u"{} not a file path".format(source))
497 path_stat = os.stat(src_path)
498 except OSError as stat_error:
500 super(ResumableCollectionWriter, self)._queue_file(source, filename)
501 fd_stat = os.fstat(self._queued_file.fileno())
502 if not S_ISREG(fd_stat.st_mode):
503 # We won't be able to resume from this cache anyway, so don't
504 # worry about further checks.
505 self._dependencies[source] = tuple(fd_stat)
506 elif path_stat is None:
507 raise errors.AssertionError(
508 u"could not stat {}: {}".format(source, stat_error))
509 elif path_stat.st_ino != fd_stat.st_ino:
510 raise errors.AssertionError(
511 u"{} changed between open and stat calls".format(source))
513 self._dependencies[src_path] = tuple(fd_stat)
515 def write(self, data):
516 if self._queued_file is None:
517 raise errors.AssertionError(
518 "resumable writer can't accept unsourced data")
519 return super(ResumableCollectionWriter, self).write(data)
527 COLLECTION = "collection"
529 class RichCollectionBase(CollectionBase):
530 """Base class for Collections and Subcollections.
532 Implements the majority of functionality relating to accessing items in the
537 def __init__(self, parent=None):
539 self._committed = False
540 self._has_remote_blocks = False
541 self._callback = None
545 raise NotImplementedError()
548 raise NotImplementedError()
550 def _my_block_manager(self):
551 raise NotImplementedError()
554 raise NotImplementedError()
556 def root_collection(self):
557 raise NotImplementedError()
559 def notify(self, event, collection, name, item):
560 raise NotImplementedError()
562 def stream_name(self):
563 raise NotImplementedError()
567 def has_remote_blocks(self):
568 """Recursively check for a +R segment locator signature."""
570 if self._has_remote_blocks:
573 if self[item].has_remote_blocks():
578 def set_has_remote_blocks(self, val):
579 self._has_remote_blocks = val
581 self.parent.set_has_remote_blocks(val)
585 def find_or_create(self, path, create_type):
586 """Recursively search the specified file path.
588 May return either a `Collection` or `ArvadosFile`. If not found, will
589 create a new item at the specified path based on `create_type`. Will
590 create intermediate subcollections needed to contain the final item in
594 One of `arvados.collection.FILE` or
595 `arvados.collection.COLLECTION`. If the path is not found, and value
596 of create_type is FILE then create and return a new ArvadosFile for
597 the last path component. If COLLECTION, then create and return a new
598 Collection for the last path component.
602 pathcomponents = path.split("/", 1)
603 if pathcomponents[0]:
604 item = self._items.get(pathcomponents[0])
605 if len(pathcomponents) == 1:
608 if create_type == COLLECTION:
609 item = Subcollection(self, pathcomponents[0])
611 item = ArvadosFile(self, pathcomponents[0])
612 self._items[pathcomponents[0]] = item
613 self.set_committed(False)
614 self.notify(ADD, self, pathcomponents[0], item)
618 # create new collection
619 item = Subcollection(self, pathcomponents[0])
620 self._items[pathcomponents[0]] = item
621 self.set_committed(False)
622 self.notify(ADD, self, pathcomponents[0], item)
623 if isinstance(item, RichCollectionBase):
624 return item.find_or_create(pathcomponents[1], create_type)
626 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
631 def find(self, path):
632 """Recursively search the specified file path.
634 May return either a Collection or ArvadosFile. Return None if not
636 If path is invalid (ex: starts with '/'), an IOError exception will be
641 raise errors.ArgumentError("Parameter 'path' is empty.")
643 pathcomponents = path.split("/", 1)
644 if pathcomponents[0] == '':
645 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
647 item = self._items.get(pathcomponents[0])
650 elif len(pathcomponents) == 1:
653 if isinstance(item, RichCollectionBase):
654 if pathcomponents[1]:
655 return item.find(pathcomponents[1])
659 raise IOError(errno.ENOTDIR, "Not a directory", pathcomponents[0])
662 def mkdirs(self, path):
663 """Recursive subcollection create.
665 Like `os.makedirs()`. Will create intermediate subcollections needed
666 to contain the leaf subcollection path.
670 if self.find(path) != None:
671 raise IOError(errno.EEXIST, "Directory or file exists", path)
673 return self.find_or_create(path, COLLECTION)
675 def open(self, path, mode="r", encoding=None):
676 """Open a file-like object for access.
679 path to a file in the collection
681 a string consisting of "r", "w", or "a", optionally followed
682 by "b" or "t", optionally followed by "+".
684 binary mode: write() accepts bytes, read() returns bytes.
686 text mode (default): write() accepts strings, read() returns strings.
690 opens for reading and writing. Reads/writes share a file pointer.
692 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
694 opens for reading and writing. All writes are appended to
695 the end of the file. Writing does not affect the file pointer for
700 if not re.search(r'^[rwa][bt]?\+?$', mode):
701 raise errors.ArgumentError("Invalid mode {!r}".format(mode))
703 if mode[0] == 'r' and '+' not in mode:
704 fclass = ArvadosFileReader
705 arvfile = self.find(path)
706 elif not self.writable():
707 raise IOError(errno.EROFS, "Collection is read only")
709 fclass = ArvadosFileWriter
710 arvfile = self.find_or_create(path, FILE)
713 raise IOError(errno.ENOENT, "File not found", path)
714 if not isinstance(arvfile, ArvadosFile):
715 raise IOError(errno.EISDIR, "Is a directory", path)
720 binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
721 f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
723 bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
724 f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
728 """Determine if the collection has been modified since last commited."""
729 return not self.committed()
733 """Determine if the collection has been committed to the API server."""
734 return self._committed
737 def set_committed(self, value=True):
738 """Recursively set committed flag.
740 If value is True, set committed to be True for this and all children.
742 If value is False, set committed to be False for this and all parents.
744 if value == self._committed:
747 for k,v in listitems(self._items):
748 v.set_committed(True)
749 self._committed = True
751 self._committed = False
752 if self.parent is not None:
753 self.parent.set_committed(False)
757 """Iterate over names of files and collections contained in this collection."""
758 return iter(viewkeys(self._items))
761 def __getitem__(self, k):
762 """Get a file or collection that is directly contained by this collection.
764 If you want to search a path, use `find()` instead.
767 return self._items[k]
770 def __contains__(self, k):
771 """Test if there is a file or collection a directly contained by this collection."""
772 return k in self._items
776 """Get the number of items directly contained in this collection."""
777 return len(self._items)
781 def __delitem__(self, p):
782 """Delete an item by name which is directly contained by this collection."""
784 self.set_committed(False)
785 self.notify(DEL, self, p, None)
789 """Get a list of names of files and collections directly contained in this collection."""
790 return self._items.keys()
794 """Get a list of files and collection objects directly contained in this collection."""
795 return listvalues(self._items)
799 """Get a list of (name, object) tuples directly contained in this collection."""
800 return listitems(self._items)
802 def exists(self, path):
803 """Test if there is a file or collection at `path`."""
804 return self.find(path) is not None
808 def remove(self, path, recursive=False):
809 """Remove the file or subcollection (directory) at `path`.
812 Specify whether to remove non-empty subcollections (True), or raise an error (False).
816 raise errors.ArgumentError("Parameter 'path' is empty.")
818 pathcomponents = path.split("/", 1)
819 item = self._items.get(pathcomponents[0])
821 raise IOError(errno.ENOENT, "File not found", path)
822 if len(pathcomponents) == 1:
823 if isinstance(self._items[pathcomponents[0]], RichCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
824 raise IOError(errno.ENOTEMPTY, "Directory not empty", path)
825 deleteditem = self._items[pathcomponents[0]]
826 del self._items[pathcomponents[0]]
827 self.set_committed(False)
828 self.notify(DEL, self, pathcomponents[0], deleteditem)
830 item.remove(pathcomponents[1], recursive=recursive)
832 def _clonefrom(self, source):
833 for k,v in listitems(source):
834 self._items[k] = v.clone(self, k)
837 raise NotImplementedError()
841 def add(self, source_obj, target_name, overwrite=False, reparent=False):
842 """Copy or move a file or subcollection to this collection.
845 An ArvadosFile, or Subcollection object
848 Destination item name. If the target name already exists and is a
849 file, this will raise an error unless you specify `overwrite=True`.
852 Whether to overwrite target file if it already exists.
855 If True, source_obj will be moved from its parent collection to this collection.
856 If False, source_obj will be copied and the parent collection will be
861 if target_name in self and not overwrite:
862 raise IOError(errno.EEXIST, "File already exists", target_name)
865 if target_name in self:
866 modified_from = self[target_name]
868 # Actually make the move or copy.
870 source_obj._reparent(self, target_name)
873 item = source_obj.clone(self, target_name)
875 self._items[target_name] = item
876 self.set_committed(False)
877 if not self._has_remote_blocks and source_obj.has_remote_blocks():
878 self.set_has_remote_blocks(True)
881 self.notify(MOD, self, target_name, (modified_from, item))
883 self.notify(ADD, self, target_name, item)
885 def _get_src_target(self, source, target_path, source_collection, create_dest):
886 if source_collection is None:
887 source_collection = self
890 if isinstance(source, basestring):
891 source_obj = source_collection.find(source)
892 if source_obj is None:
893 raise IOError(errno.ENOENT, "File not found", source)
894 sourcecomponents = source.split("/")
897 sourcecomponents = None
899 # Find parent collection the target path
900 targetcomponents = target_path.split("/")
902 # Determine the name to use.
903 target_name = targetcomponents[-1] if targetcomponents[-1] else sourcecomponents[-1]
906 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
909 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
911 if len(targetcomponents) > 1:
912 target_dir = self.find("/".join(targetcomponents[0:-1]))
916 if target_dir is None:
917 raise IOError(errno.ENOENT, "Target directory not found", target_name)
919 if target_name in target_dir and isinstance(target_dir[target_name], RichCollectionBase) and sourcecomponents:
920 target_dir = target_dir[target_name]
921 target_name = sourcecomponents[-1]
923 return (source_obj, target_dir, target_name)
927 def copy(self, source, target_path, source_collection=None, overwrite=False):
928 """Copy a file or subcollection to a new path in this collection.
931 A string with a path to source file or subcollection, or an actual ArvadosFile or Subcollection object.
934 Destination file or path. If the target path already exists and is a
935 subcollection, the item will be placed inside the subcollection. If
936 the target path already exists and is a file, this will raise an error
937 unless you specify `overwrite=True`.
940 Collection to copy `source_path` from (default `self`)
943 Whether to overwrite target file if it already exists.
946 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, True)
947 target_dir.add(source_obj, target_name, overwrite, False)
951 def rename(self, source, target_path, source_collection=None, overwrite=False):
952 """Move a file or subcollection from `source_collection` to a new path in this collection.
955 A string with a path to source file or subcollection.
958 Destination file or path. If the target path already exists and is a
959 subcollection, the item will be placed inside the subcollection. If
960 the target path already exists and is a file, this will raise an error
961 unless you specify `overwrite=True`.
964 Collection to copy `source_path` from (default `self`)
967 Whether to overwrite target file if it already exists.
970 source_obj, target_dir, target_name = self._get_src_target(source, target_path, source_collection, False)
971 if not source_obj.writable():
972 raise IOError(errno.EROFS, "Source collection is read only", source)
973 target_dir.add(source_obj, target_name, overwrite, True)
975 def portable_manifest_text(self, stream_name="."):
976 """Get the manifest text for this collection, sub collections and files.
978 This method does not flush outstanding blocks to Keep. It will return
979 a normalized manifest with access tokens stripped.
982 Name to use for this stream (directory)
985 return self._get_manifest_text(stream_name, True, True)
988 def manifest_text(self, stream_name=".", strip=False, normalize=False,
989 only_committed=False):
990 """Get the manifest text for this collection, sub collections and files.
992 This method will flush outstanding blocks to Keep. By default, it will
993 not normalize an unmodified manifest or strip access tokens.
996 Name to use for this stream (directory)
999 If True, remove signing tokens from block locators if present.
1000 If False (default), block locators are left unchanged.
1003 If True, always export the manifest text in normalized form
1004 even if the Collection is not modified. If False (default) and the collection
1005 is not modified, return the original manifest text even if it is not
1009 If True, don't commit pending blocks.
1013 if not only_committed:
1014 self._my_block_manager().commit_all()
1015 return self._get_manifest_text(stream_name, strip, normalize,
1016 only_committed=only_committed)
1019 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1020 """Get the manifest text for this collection, sub collections and files.
1023 Name to use for this stream (directory)
1026 If True, remove signing tokens from block locators if present.
1027 If False (default), block locators are left unchanged.
1030 If True, always export the manifest text in normalized form
1031 even if the Collection is not modified. If False (default) and the collection
1032 is not modified, return the original manifest text even if it is not
1036 If True, only include blocks that were already committed to Keep.
1040 if not self.committed() or self._manifest_text is None or normalize:
1043 sorted_keys = sorted(self.keys())
1044 for filename in [s for s in sorted_keys if isinstance(self[s], ArvadosFile)]:
1045 # Create a stream per file `k`
1046 arvfile = self[filename]
1048 for segment in arvfile.segments():
1049 loc = segment.locator
1050 if arvfile.parent._my_block_manager().is_bufferblock(loc):
1053 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
1055 loc = KeepLocator(loc).stripped()
1056 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
1057 segment.segment_offset, segment.range_size))
1058 stream[filename] = filestream
1060 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
1061 for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
1062 buf.append(self[dirname].manifest_text(
1063 stream_name=os.path.join(stream_name, dirname),
1064 strip=strip, normalize=True, only_committed=only_committed))
1068 return self.stripped_manifest()
1070 return self._manifest_text
1073 def _copy_remote_blocks(self, remote_blocks={}):
1074 """Scan through the entire collection and ask Keep to copy remote blocks.
1076 When accessing a remote collection, blocks will have a remote signature
1077 (+R instead of +A). Collect these signatures and request Keep to copy the
1078 blocks to the local cluster, returning local (+A) signatures.
1081 Shared cache of remote to local block mappings. This is used to avoid
1082 doing extra work when blocks are shared by more than one file in
1083 different subdirectories.
1087 remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
1088 return remote_blocks
# NOTE(review): the original numbering below is non-contiguous — lines are
# missing from this listing.  Code is kept byte-identical; comments only.
1091 def diff(self, end_collection, prefix=".", holding_collection=None):
1092 """Generate list of add/modify/delete actions.
1094 When given to `apply`, will change `self` to match `end_collection`
1098 if holding_collection is None:
1099 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
# Pass 1 (loop header not visible here; presumably `for k in self:`):
# anything present locally but absent from end_collection becomes a DEL.
1101 if k not in end_collection:
1102 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection, "")))
# Pass 2: walk end_collection — recurse into shared subdirectories, emit
# MOD for changed items, TOK for unchanged ones, ADD for new ones.
1103 for k in end_collection:
1105 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
1106 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
1107 elif end_collection[k] != self[k]:
# Clones are snapshotted into holding_collection so `apply` can compare
# against the state at diff time, not at apply time.
1108 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1110 changes.append((TOK, os.path.join(prefix, k), self[k].clone(holding_collection, ""), end_collection[k].clone(holding_collection, "")))
1112 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection, "")))
1117 def apply(self, changes):
1118 """Apply changes from `diff`.
1120 If a change conflicts with a local change, it will be saved to an
1121 alternate path indicating the conflict.
# Any applied change marks the collection as modified.
1125 self.set_committed(False)
1126 for change in changes:
# change is a tuple from diff(): (event_type, path, initial[, final]).
# The unpacking of `path`, `initial` and `final` is on lines not visible
# in this listing — verify against the full source.
1127 event_type = change[0]
1130 local = self.find(path)
# Conflicting updates are preserved under a timestamped "~conflict~" path
# rather than being discarded.
1131 conflictpath = "%s~%s~conflict~" % (path, time.strftime("%Y%m%d-%H%M%S",
1133 if event_type == ADD:
1135 # No local file at path, safe to copy over new file
1136 self.copy(initial, path)
1137 elif local is not None and local != initial:
1138 # There is already local file and it is different:
1139 # save change to conflict file.
1140 self.copy(initial, conflictpath)
1141 elif event_type == MOD or event_type == TOK:
1143 if local == initial:
1144 # Local matches the "initial" item so it has not
1145 # changed locally and is safe to update.
1146 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
1147 # Replace contents of local file with new contents
1148 local.replace_contents(final)
1150 # Overwrite path with new item; this can happen if
1151 # path was a file and is now a collection or vice versa
1152 self.copy(final, path, overwrite=True)
1154 # Local is missing (presumably deleted) or local doesn't
1155 # match the "start" value, so save change to conflict file
1156 self.copy(final, conflictpath)
1157 elif event_type == DEL:
1158 if local == initial:
1159 # Local item matches "initial" value, so it is safe to remove.
1160 self.remove(path, recursive=True)
1161 # else, the file is modified or already removed, in either
1162 # case we don't want to try to remove it.
def portable_data_hash(self):
    """Get the portable data hash for this collection's manifest.

    Returns the server-reported PDH when the collection has been saved and
    is committed; otherwise derives it locally as md5(manifest)+length.
    """
    if self._manifest_locator and self.committed():
        # Saved on the API server and committed: trust the server's PDH.
        return self._portable_data_hash
    # Uncommitted (or never saved): compute from the stripped manifest.
    manifest_bytes = self.portable_manifest_text().encode()
    digest = hashlib.md5(manifest_bytes).hexdigest()
    return '{}+{}'.format(digest, len(manifest_bytes))
def subscribe(self, callback):
    """Register a single change callback for this collection.

    Raises errors.ArgumentError if a callback is already registered.
    """
    # Guard-clause form of the original if/else: reject when occupied,
    # otherwise install the callback.
    if self._callback is not None:
        raise errors.ArgumentError("A callback is already set on this collection.")
    self._callback = callback
def unsubscribe(self):
    """Remove the registered change callback, if any."""
    # Nothing to do when no callback is set.
    if self._callback is None:
        return
    self._callback = None
# notify(): forward a change event to the registered callback (the guard
# line testing self._callback is not visible in this listing) and propagate
# the event to the root collection.
1187 def notify(self, event, collection, name, item):
1189 self._callback(event, collection, name, item)
1190 self.root_collection().notify(event, collection, name, item)
# __eq__(): collections are equal when `other` is a RichCollectionBase of
# the same length whose items all compare equal.  The early `return`
# branches are on lines not visible in this listing.
1193 def __eq__(self, other):
1196 if not isinstance(other, RichCollectionBase):
1198 if len(self._items) != len(other):
1200 for k in self._items:
1203 if self._items[k] != other[k]:
1207 def __ne__(self, other):
1208 return not self.__eq__(other)
# Fragment of flush(): the enclosing `def flush(self)` line is not visible
# in this listing.  Iterates the child items and presumably flushes each —
# TODO confirm against the full source.
1212 """Flush bufferblocks to Keep."""
1213 for e in listvalues(self):
1217 class Collection(RichCollectionBase):
1218 """Represents the root of an Arvados Collection.
1220 This class is threadsafe. The root collection object, all subcollections
1221 and files are protected by a single lock (i.e. each access locks the entire
1227 :To read an existing file:
1228 `c.open("myfile", "r")`
1230 :To write a new file:
1231 `c.open("myfile", "w")`
1233 :To determine if a file exists:
1234 `c.find("myfile") is not None`
1237 `c.copy("source", "dest")`
1240 `c.remove("myfile")`
1242 :To save to an existing collection record:
1245 :To save a new collection record:
1248 :To merge remote changes into this object:
1251 Must be associated with an API server Collection record (during
1252 initialization, or using `save_new`) to use `save` or `update`
# NOTE(review): numbering below is non-contiguous — several parameter lines
# and docstring section headers are missing from this listing.  Code kept
# byte-identical; comments only.
1256 def __init__(self, manifest_locator_or_text=None,
1263 replication_desired=None,
1264 storage_classes_desired=None,
1266 """Collection constructor.
1268 :manifest_locator_or_text:
1269 An Arvados collection UUID, portable data hash, raw manifest
1270 text, or (if creating an empty collection) None.
1273 the parent Collection, may be None.
1276 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1277 Prefer this over supplying your own api_client and keep_client (except in testing).
1278 Will use default config settings if not specified.
1281 The API client object to use for requests. If not specified, create one using `apiconfig`.
1284 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1287 the number of retries for API and Keep requests.
1290 the block manager to use. If not specified, create one.
1292 :replication_desired:
1293 How many copies should Arvados maintain. If None, API server default
1294 configuration applies. If not None, this value will also be used
1295 for determining the number of block copies being written.
1297 :storage_classes_desired:
1298 A list of storage class names where to upload the data. If None,
1299 the keep client is expected to store the data into the cluster's
1300 default storage class(es).
# Validate eagerly so a bad type fails at construction, not at save time.
1304 if storage_classes_desired and type(storage_classes_desired) is not list:
1305 raise errors.ArgumentError("storage_classes_desired must be list type.")
1307 super(Collection, self).__init__(parent)
1308 self._api_client = api_client
1309 self._keep_client = keep_client
1311 # Use the keep client from ThreadSafeApiCache
1312 if self._keep_client is None and isinstance(self._api_client, ThreadSafeApiCache):
1313 self._keep_client = self._api_client.keep
1315 self._block_manager = block_manager
1316 self.replication_desired = replication_desired
1317 self._storage_classes_desired = storage_classes_desired
1318 self.put_threads = put_threads
1321 self._config = apiconfig
# Fall back to the process-wide Arvados configuration when no apiconfig
# was supplied (the guarding if/else lines are not visible here).
1323 self._config = config.settings()
1325 self.num_retries = num_retries
1326 self._manifest_locator = None
1327 self._manifest_text = None
1328 self._portable_data_hash = None
1329 self._api_response = None
1330 self._past_versions = set()
# A single re-entrant lock protects the whole collection tree.
1332 self.lock = threading.RLock()
# Dispatch on the form of the argument: Keep locator, collection UUID,
# or raw manifest text.
1335 if manifest_locator_or_text:
1336 if re.match(arvados.util.keep_locator_pattern, manifest_locator_or_text):
1337 self._manifest_locator = manifest_locator_or_text
1338 elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
1339 self._manifest_locator = manifest_locator_or_text
# A UUID from another cluster implies blocks signed remotely (+R).
1340 if not self._has_local_collection_uuid():
1341 self._has_remote_blocks = True
1342 elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
1343 self._manifest_text = manifest_locator_or_text
1344 if '+R' in self._manifest_text:
1345 self._has_remote_blocks = True
1347 raise errors.ArgumentError(
1348 "Argument to CollectionReader is not a manifest or a collection UUID")
# Manifest parse errors are surfaced as ArgumentError (the enclosing
# try: line is not visible in this listing).
1352 except errors.SyntaxError as e:
1353 raise errors.ArgumentError("Error processing manifest text: %s", str(e)) from None
def storage_classes_desired(self):
    """Return the storage classes requested for this collection ([] when unset)."""
    requested = self._storage_classes_desired
    return requested if requested else []
# Accessor fragments — decorator lines (@synchronized/@property) and the
# fall-through `return` branches are not visible in this listing.
1358 def root_collection(self):
# get_properties(): the API record's "properties" dict, when present.
1361 def get_properties(self):
1362 if self._api_response and self._api_response["properties"]:
1363 return self._api_response["properties"]
# get_trash_at(): parse the record's "trash_at" timestamp via ciso8601.
1367 def get_trash_at(self):
1368 if self._api_response and self._api_response["trash_at"]:
1370 return ciso8601.parse_datetime(self._api_response["trash_at"])
1376 def stream_name(self):
def known_past_version(self, modified_at_and_portable_data_hash):
    """True when this (modified_at, PDH) pair matches a previously seen record."""
    seen_versions = self._past_versions
    return modified_at_and_portable_data_hash in seen_versions
1388 def update(self, other=None, num_retries=None):
1389 """Merge the latest collection on the API server with the current collection."""
# When no explicit `other` is given, fetch our own record from the API
# server (the enclosing `if other is None:` line is not visible here).
1392 if self._manifest_locator is None:
1393 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1394 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1395 if (self.known_past_version((response.get("modified_at"), response.get("portable_data_hash"))) and
1396 response.get("portable_data_hash") != self.portable_data_hash()):
1397 # The record on the server is different from our current one, but we've seen it before,
1398 # so ignore it because it's already been merged.
1399 # However, if it's the same as our current record, proceed with the update, because we want to update
1403 self._remember_api_response(response)
1404 other = CollectionReader(response["manifest_text"])
# Three-way merge: diff our baseline against the server copy, then apply
# the resulting changes to this (possibly locally modified) collection.
1405 baseline = CollectionReader(self._manifest_text)
1406 self.apply(baseline.diff(other))
1407 self._manifest_text = self.manifest_text()
# Fragments of _my_api()/_my_keep() — the `def` lines are not visible in
# this listing.  Both clients are constructed lazily on first use.
1411 if self._api_client is None:
1412 self._api_client = ThreadSafeApiCache(self._config, version='v1')
# A freshly built API cache brings its own Keep client; reuse it.
1413 if self._keep_client is None:
1414 self._keep_client = self._api_client.keep
1415 return self._api_client
1419 if self._keep_client is None:
1420 if self._api_client is None:
# When an API client already exists, bind the KeepClient to it.
1423 self._keep_client = KeepClient(api_client=self._api_client)
1424 return self._keep_client
1427 def _my_block_manager(self):
# Lazily build the block manager; replication falls back to the API
# server's defaultCollectionReplication (default argument line missing
# from this listing).
1428 if self._block_manager is None:
1429 copies = (self.replication_desired or
1430 self._my_api()._rootDesc.get('defaultCollectionReplication',
1432 self._block_manager = _BlockManager(self._my_keep(),
1434 put_threads=self.put_threads,
1435 num_retries=self.num_retries,
# Passed as a callable so the block manager always sees current classes.
1436 storage_classes_func=self.storage_classes_desired)
1437 return self._block_manager
1439 def _remember_api_response(self, response):
1440 self._api_response = response
1441 self._past_versions.add((response.get("modified_at"), response.get("portable_data_hash")))
1443 def _populate_from_api_server(self):
1444 # As in KeepClient itself, we must wait until the last
1445 # possible moment to instantiate an API client, in order to
1446 # avoid tripping up clients that don't have access to an API
1447 # server. If we do build one, make sure our Keep client uses
1448 # it. If instantiation fails, we'll fall back to the except
1449 # clause, just like any other Collection lookup
1450 # failure. Return an exception, or None if successful.
1451 self._remember_api_response(self._my_api().collections().get(
1452 uuid=self._manifest_locator).execute(
1453 num_retries=self.num_retries))
1454 self._manifest_text = self._api_response['manifest_text']
1455 self._portable_data_hash = self._api_response['portable_data_hash']
1456 # If not overriden via kwargs, we should try to load the
1457 # replication_desired and storage_classes_desired from the API server
1458 if self.replication_desired is None:
1459 self.replication_desired = self._api_response.get('replication_desired', None)
1460 if self._storage_classes_desired is None:
1461 self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
1463 def _populate(self):
# Fetch manifest text from the API server when we only have a locator
# (the `return`/`else` lines between these branches are not visible in
# this listing), then import it into the in-memory tree.
1464 if self._manifest_text is None:
1465 if self._manifest_locator is None:
1468 self._populate_from_api_server()
# Keep a pristine copy for later three-way merges in update().
1469 self._baseline_manifest = self._manifest_text
1470 self._import_manifest(self._manifest_text)
1472 def _has_collection_uuid(self):
1473 return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
1475 def _has_local_collection_uuid(self):
1476 return self._has_collection_uuid and \
1477 self._my_api()._rootDesc['uuidPrefix'] == self._manifest_locator.split('-')[0]
# __enter__ body (presumably `return self`) is not visible in this listing.
1479 def __enter__(self):
1482 def __exit__(self, exc_type, exc_value, traceback):
1483 """Support scoped auto-commit in a with: block."""
# Only auto-save on clean exit, and only when the collection is writable
# and already bound to an API record (the save() call itself is on a line
# not visible here).
1484 if exc_type is None:
1485 if self.writable() and self._has_collection_uuid():
def stop_threads(self):
    """Shut down background threads owned by the block manager, if one exists."""
    manager = self._block_manager
    if manager is not None:
        manager.stop_threads()
# NOTE(review): decorator lines (@property/@synchronized) and some branch
# lines are missing from this listing.  Code kept byte-identical.
1494 def manifest_locator(self):
1495 """Get the manifest locator, if any.
1497 The manifest locator will be set when the collection is loaded from an
1498 API server record or the portable data hash of a manifest.
1500 The manifest locator will be None if the collection is newly created or
1501 was created directly from manifest text. The method `save_new()` will
1502 assign a manifest locator.
1505 return self._manifest_locator
# clone(): copy this collection into a new root object; a read-only clone
# becomes a CollectionReader (the `if readonly:`/`else:` lines are not
# visible here).
1508 def clone(self, new_parent=None, new_name=None, readonly=False, new_config=None):
1509 if new_config is None:
1510 new_config = self._config
1512 newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
1514 newcollection = Collection(parent=new_parent, apiconfig=new_config)
1516 newcollection._clonefrom(self)
1517 return newcollection
1520 def api_response(self):
1521 """Returns information about this Collection fetched from the API server.
1523 If the Collection exists in Keep but not the API server, currently
1524 returns None. Future versions may provide a synthetic response.
1527 return self._api_response
# The three overrides below normalize leading "./" before delegating to
# the RichCollectionBase implementations.
1529 def find_or_create(self, path, create_type):
1530 """See `RichCollectionBase.find_or_create`"""
1534 return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)
1536 def find(self, path):
1537 """See `RichCollectionBase.find`"""
1541 return super(Collection, self).find(path[2:] if path.startswith("./") else path)
1543 def remove(self, path, recursive=False):
1544 """See `RichCollectionBase.remove`"""
# Removing the root itself is not allowed.
1546 raise errors.ArgumentError("Cannot remove '.'")
1548 return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
# Fragment of save(): the `def save(self, ...)` line and several keyword
# parameters are on lines not visible in this listing.
1555 storage_classes=None,
1559 preserve_version=False):
1560 """Save collection to an existing collection record.
1562 Commit pending buffer blocks to Keep, merge with remote record (if
1563 merge=True, the default), and update the collection record. Returns
1564 the current manifest text.
1566 Will raise AssertionError if not associated with a collection record on
1567 the API server. If you want to save a manifest to Keep only, see
1571 Additional properties of collection. This value will replace any existing
1572 properties of collection.
1575 Specify desirable storage classes to be used when writing data to Keep.
1578 A collection is *expiring* when it has a *trash_at* time in the future.
1579 An expiring collection can be accessed as normal,
1580 but is scheduled to be trashed automatically at the *trash_at* time.
1583 Update and merge remote changes before saving. Otherwise, any
1584 remote changes will be ignored and overwritten.
1587 Retry count on API calls (if None, use the collection default)
1590 If True, indicate that the collection content being saved right now
1591 should be preserved in a version snapshot if the collection record is
1592 updated in the future. Requires that the API server has
1593 Collections.CollectionVersioning enabled, if not, setting this will
# Argument validation before any network traffic.
1597 if properties and type(properties) is not dict:
1598 raise errors.ArgumentError("properties must be dictionary type.")
1600 if storage_classes and type(storage_classes) is not list:
1601 raise errors.ArgumentError("storage_classes must be list type.")
1603 self._storage_classes_desired = storage_classes
1605 if trash_at and type(trash_at) is not datetime.datetime:
1606 raise errors.ArgumentError("trash_at must be datetime type.")
1608 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1609 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
# Build the request body from the optional arguments (the body dict
# initialization line is not visible in this listing).
1613 body["properties"] = properties
1614 if self.storage_classes_desired():
1615 body["storage_classes_desired"] = self.storage_classes_desired()
1617 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1618 body["trash_at"] = t
1619 if preserve_version:
1620 body["preserve_version"] = preserve_version
# Dirty collection: localize remote blocks, sanity-check the locator,
# then commit buffered blocks before updating the record.
1622 if not self.committed():
1623 if self._has_remote_blocks:
1624 # Copy any remote blocks to the local cluster.
1625 self._copy_remote_blocks(remote_blocks={})
1626 self._has_remote_blocks = False
1627 if not self._has_collection_uuid():
1628 raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
1629 elif not self._has_local_collection_uuid():
1630 raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
1632 self._my_block_manager().commit_all()
1637 text = self.manifest_text(strip=False)
1638 body['manifest_text'] = text
1640 self._remember_api_response(self._my_api().collections().update(
1641 uuid=self._manifest_locator,
1643 ).execute(num_retries=num_retries))
1644 self._manifest_text = self._api_response["manifest_text"]
1645 self._portable_data_hash = self._api_response["portable_data_hash"]
1646 self.set_committed(True)
# Already-committed path (enclosing elif/else line not visible): still
# push the metadata-only body so properties/trash_at changes are saved.
1648 self._remember_api_response(self._my_api().collections().update(
1649 uuid=self._manifest_locator,
1651 ).execute(num_retries=num_retries))
1653 return self._manifest_text
# NOTE(review): numbering below is non-contiguous — several parameter and
# docstring lines are missing from this listing.  Code kept byte-identical.
1659 def save_new(self, name=None,
1660 create_collection_record=True,
1663 storage_classes=None,
1665 ensure_unique_name=False,
1667 preserve_version=False):
1668 """Save collection to a new collection record.
1670 Commit pending buffer blocks to Keep and, when create_collection_record
1671 is True (default), create a new collection record. After creating a
1672 new collection record, this Collection object will be associated with
1673 the new record used by `save()`. Returns the current manifest text.
1676 The collection name.
1678 :create_collection_record:
1679 If True, create a collection record on the API server.
1680 If False, only commit blocks to Keep and return the manifest text.
1683 the user, or project uuid that will own this collection.
1684 If None, defaults to the current user.
1687 Additional properties of collection. This value will replace any existing
1688 properties of collection.
1691 Specify desirable storage classes to be used when writing data to Keep.
1694 A collection is *expiring* when it has a *trash_at* time in the future.
1695 An expiring collection can be accessed as normal,
1696 but is scheduled to be trashed automatically at the *trash_at* time.
1698 :ensure_unique_name:
1699 If True, ask the API server to rename the collection
1700 if it conflicts with a collection with the same name and owner. If
1701 False, a name conflict will result in an error.
1704 Retry count on API calls (if None, use the collection default)
1707 If True, indicate that the collection content being saved right now
1708 should be preserved in a version snapshot if the collection record is
1709 updated in the future. Requires that the API server has
1710 Collections.CollectionVersioning enabled, if not, setting this will
# Argument validation before any network traffic.
1714 if properties and type(properties) is not dict:
1715 raise errors.ArgumentError("properties must be dictionary type.")
1717 if storage_classes and type(storage_classes) is not list:
1718 raise errors.ArgumentError("storage_classes must be list type.")
1720 if trash_at and type(trash_at) is not datetime.datetime:
1721 raise errors.ArgumentError("trash_at must be datetime type.")
1723 if preserve_version and not self._my_api().config()['Collections'].get('CollectionVersioning', False):
1724 raise errors.ArgumentError("preserve_version is not supported when CollectionVersioning is not enabled.")
# Localize remote (+R) blocks before committing.
1726 if self._has_remote_blocks:
1727 # Copy any remote blocks to the local cluster.
1728 self._copy_remote_blocks(remote_blocks={})
1729 self._has_remote_blocks = False
1732 self._storage_classes_desired = storage_classes
1734 self._my_block_manager().commit_all()
1735 text = self.manifest_text(strip=False)
1737 if create_collection_record:
# Default-name branch (the `if name is None:` line is not visible here):
# auto-named collections also force ensure_unique_name.
1739 name = "New collection"
1740 ensure_unique_name = True
1742 body = {"manifest_text": text,
1744 "replication_desired": self.replication_desired}
1746 body["owner_uuid"] = owner_uuid
1748 body["properties"] = properties
1749 if self.storage_classes_desired():
1750 body["storage_classes_desired"] = self.storage_classes_desired()
1752 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
1753 body["trash_at"] = t
1754 if preserve_version:
1755 body["preserve_version"] = preserve_version
1757 self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
1758 text = self._api_response["manifest_text"]
# Bind this object to the newly created record so save() works next time.
1760 self._manifest_locator = self._api_response["uuid"]
1761 self._portable_data_hash = self._api_response["portable_data_hash"]
1763 self._manifest_text = text
1764 self.set_committed(True)
# Pre-compiled manifest tokenizer patterns (class attributes shared by all
# instances): whitespace-delimited tokens, 32-hex-digit block locators with
# size/hints, and pos:size:name file segment descriptors.
1768 _token_re = re.compile(r'(\S+)(\s+|$)')
1769 _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
1770 _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
1772 def _unescape_manifest_path(self, path):
1773 return re.sub(r'\\([0-3][0-7][0-7])', lambda m: chr(int(m.group(1), 8)), path)
# NOTE(review): this tokenizer state machine is missing many lines in this
# listing (state constants, state transitions, try/else branches).  Code is
# kept byte-identical; comments only.
1776 def _import_manifest(self, manifest_text):
1777 """Import a manifest into a `Collection`.
1780 The manifest text to import from.
1784 raise ArgumentError("Can only import manifest into an empty collection")
# Tokenize the manifest; `sep` distinguishes token boundaries from the
# newline that ends a stream.
1793 for token_and_separator in self._token_re.finditer(manifest_text):
1794 tok = token_and_separator.group(1)
1795 sep = token_and_separator.group(2)
1797 if state == STREAM_NAME:
1798 # starting a new stream
1799 stream_name = self._unescape_manifest_path(tok)
1804 self.find_or_create(stream_name, COLLECTION)
# Block-locator state: accumulate block ranges with running offsets.
1808 block_locator = self._block_re.match(tok)
1810 blocksize = int(block_locator.group(1))
1811 blocks.append(Range(tok, streamoffset, blocksize, 0))
1812 streamoffset += blocksize
1816 if state == SEGMENTS:
1817 file_segment = self._segment_re.match(tok)
1819 pos = int(file_segment.group(1))
1820 size = int(file_segment.group(2))
1821 name = self._unescape_manifest_path(file_segment.group(3))
1822 if name.split('/')[-1] == '.':
1823 # placeholder for persisting an empty directory, not a real file
1825 self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
1827 filepath = os.path.join(stream_name, name)
1829 afile = self.find_or_create(filepath, FILE)
1830 except IOError as e:
1831 if e.errno == errno.ENOTDIR:
1832 raise errors.SyntaxError("Dir part of %s conflicts with file of the same name.", filepath) from None
1835 if isinstance(afile, ArvadosFile):
1836 afile.add_segment(blocks, pos, size)
1838 raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
1841 raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
# A freshly imported manifest matches its source text exactly.
1847 self.set_committed(True)
# Collection (root) override of notify(): invokes the registered callback
# directly; the guard line (presumably `if self._callback:`) is not visible
# in this listing.  Unlike RichCollectionBase.notify there is no further
# upward propagation — this is the root.
1850 def notify(self, event, collection, name, item):
1852 self._callback(event, collection, name, item)
1855 class Subcollection(RichCollectionBase):
1856 """This is a subdirectory within a collection that doesn't have its own API
1859 Subcollection locking falls under the umbrella lock of its root collection.
1863 def __init__(self, parent, name):
1864 super(Subcollection, self).__init__(parent)
# Subcollections share the root collection's lock (no per-directory lock).
1865 self.lock = self.root_collection().lock
1866 self._manifest_text = None
# The assignment of self.name appears to be on a line not visible in this
# listing — TODO confirm against the full source.
1868 self.num_retries = parent.num_retries
def root_collection(self):
    """Find the root Collection by walking up the parent chain."""
    parent = self.parent
    return parent.root_collection()
# Fragments of writable()/_my_api()/_my_keep() — the `def` lines are not
# visible in this listing.  All three delegate to the root collection,
# which owns the API/Keep clients and the writability state.
1874 return self.root_collection().writable()
1877 return self.root_collection()._my_api()
1880 return self.root_collection()._my_keep()
1882 def _my_block_manager(self):
1883 return self.root_collection()._my_block_manager()
def stream_name(self):
    """This directory's full stream path within the root collection."""
    parent_stream = self.parent.stream_name()
    return os.path.join(parent_stream, self.name)
# clone(): copy this subdirectory under a new parent; the _clonefrom/return
# lines are not visible in this listing.
1889 def clone(self, new_parent, new_name):
1890 c = Subcollection(new_parent, new_name)
# _reparent(): detach from the current parent and re-attach under a new
# one; marks the tree modified first so the move is saved.
1896 def _reparent(self, newparent, newname):
1897 self.set_committed(False)
1899 self.parent.remove(self.name, recursive=True)
1900 self.parent = newparent
# Adopt the new root's lock (name reassignment line not visible here).
1902 self.lock = self.parent.root_collection().lock
1905 def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
1906 """Encode empty directories by using an \056-named (".") empty file"""
# Empty directory: emit a zero-length segment on the empty-block locator
# whose filename is "\056" (escaped "."), which _import_manifest treats
# as a directory placeholder.
1907 if len(self._items) == 0:
1908 return "%s %s 0:0:\\056\n" % (
1909 escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
# Non-empty: defer to the base implementation (the remaining call
# arguments are on lines not visible in this listing).
1910 return super(Subcollection, self)._get_manifest_text(stream_name,
1915 class CollectionReader(Collection):
1916 """A read-only collection object.
1918 Initialize from a collection UUID or portable data hash, or raw
1919 manifest text. See `Collection` constructor for detailed options.
1922 def __init__(self, manifest_locator_or_text, *args, **kwargs):
# _in_init brackets the base-class constructor so writable() (below) can
# report True during initialization and False afterwards.
1923 self._in_init = True
1924 super(CollectionReader, self).__init__(manifest_locator_or_text, *args, **kwargs)
1925 self._in_init = False
1927 # Forego any locking since it should never change once initialized.
1928 self.lock = NoopLock()
1930 # Backwards compatibility with old CollectionReader
1931 # all_streams() and all_files()
1932 self._streams = None
# Fragment of writable() — its `def` line is not visible in this listing;
# a reader is only "writable" while the constructor is populating it.
1935 return self._in_init
# Decorator for the legacy stream API: lazily splits the manifest into
# token lists the first time a decorated method is called.
1937 def _populate_streams(orig_func):
1938 @functools.wraps(orig_func)
1939 def populate_streams_wrapper(self, *args, **kwargs):
1940 # Defer populating self._streams until needed since it creates a copy of the manifest.
1941 if self._streams is None:
1942 if self._manifest_text:
1943 self._streams = [sline.split()
1944 for sline in self._manifest_text.split("\n")
# The filter for empty lines and the empty-manifest fallback are on lines
# not visible in this listing.
1948 return orig_func(self, *args, **kwargs)
1949 return populate_streams_wrapper
1952 def normalize(self):
1953 """Normalize the streams returned by `all_streams`.
1955 This method is kept for backwards compatibility and only affects the
1956 behavior of `all_streams()` and `all_files()`
# Regroup every file's segments by (stream, file); the `streams = {}`
# initialization is on a line not visible in this listing.
1962 for s in self.all_streams():
1963 for f in s.all_files():
1964 streamname, filename = split(s.name() + "/" + f.name())
1965 if streamname not in streams:
1966 streams[streamname] = {}
1967 if filename not in streams[streamname]:
1968 streams[streamname][filename] = []
1969 for r in f.segments:
# Translate each file segment into absolute locator ranges in its stream.
1970 streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
# Rebuild the cached token lists in normalized, sorted-stream form.
1972 self._streams = [normalize_stream(s, streams[s])
1973 for s in sorted(streams)]
def all_streams(self):
    """Wrap each cached manifest stream in a StreamReader (legacy API)."""
    readers = []
    for stream_tokens in self._streams:
        readers.append(StreamReader(stream_tokens, self._my_keep(),
                                    num_retries=self.num_retries))
    return readers
1980 def all_files(self):
1981 for s in self.all_streams():
1982 for f in s.all_files():