10 from collections import deque
13 from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, NoopLock
14 from keep import KeepLocator, KeepClient
15 from .stream import StreamReader
16 from ._normalize_stream import normalize_stream
17 from ._ranges import Range, LocatorAndRange
18 from .safeapi import ThreadSafeApiCache
23 from arvados.retry import retry_method
25 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    # Common helpers shared by the collection reader/writer classes.
    # NOTE(review): interior lines of this class are elided in this view;
    # the fragments below are preserved byte-for-byte.

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit; body not visible here — presumably releases
        # held resources on leaving a `with` block — TODO confirm.
        # NOTE(review): the lines below belong to a lazy Keep-client accessor
        # whose `def` line is elided: build a KeepClient on first use, passing
        # through this object's API client and retry count, and cache it.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
40 def stripped_manifest(self):
42 Return the manifest for the current collection with all
43 non-portable hints (i.e., permission signatures and other
44 hints other than size hints) removed from the locators.
46 raw = self.manifest_text()
48 for line in raw.split("\n"):
51 clean_fields = fields[:1] + [
52 (re.sub(r'\+[^\d][^\+]*', '', x)
53 if re.match(util.keep_locator_pattern, x)
56 clean += [' '.join(clean_fields), "\n"]
class _WriterFile(_FileLikeObjectBase):
    """Writable file-like object returned by CollectionWriter.open().

    Forwards all writes to the owning CollectionWriter and finishes the
    current file in the writer when closed.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @_FileLikeObjectBase._before_close
    def write(self, data):
        self.dest.write(data)

    @_FileLikeObjectBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @_FileLikeObjectBase._before_close
    def flush(self):
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0, replication=None):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is None or not supplied, replication is
          the server-provided default if available, otherwise 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (2 if replication is None else replication)
        self._keep_client = None
        # Data buffered in memory, not yet written to Keep, and its length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream (directory) currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state driven by write_file()/write_directory_tree().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
123 def __exit__(self, exc_type, exc_value, traceback):
127 def do_queued_work(self):
128 # The work queue consists of three pieces:
129 # * _queued_file: The file object we're currently writing to the
131 # * _queued_dirents: Entries under the current directory
132 # (_queued_trees[0]) that we want to write or recurse through.
133 # This may contain files from subdirectories if
134 # max_manifest_depth == 0 for this directory.
135 # * _queued_trees: Directories that should be written as separate
136 # streams to the Collection.
137 # This function handles the smallest piece of work currently queued
138 # (current file, then current directory, then next directory) until
139 # no work remains. The _work_THING methods each do a unit of work on
140 # THING. _queue_THING methods add a THING to the work queue.
142 if self._queued_file:
144 elif self._queued_dirents:
146 elif self._queued_trees:
151 def _work_file(self):
153 buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
157 self.finish_current_file()
159 self._queued_file.close()
160 self._close_file = None
161 self._queued_file = None
163 def _work_dirents(self):
164 path, stream_name, max_manifest_depth = self._queued_trees[0]
165 if stream_name != self.current_stream_name():
166 self.start_new_stream(stream_name)
167 while self._queued_dirents:
168 dirent = self._queued_dirents.popleft()
169 target = os.path.join(path, dirent)
170 if os.path.isdir(target):
171 self._queue_tree(target,
172 os.path.join(stream_name, dirent),
173 max_manifest_depth - 1)
175 self._queue_file(target, dirent)
177 if not self._queued_dirents:
178 self._queued_trees.popleft()
180 def _work_trees(self):
181 path, stream_name, max_manifest_depth = self._queued_trees[0]
182 d = util.listdir_recursive(
183 path, max_depth = (None if max_manifest_depth == 0 else 0))
185 self._queue_dirents(stream_name, d)
187 self._queued_trees.popleft()
189 def _queue_file(self, source, filename=None):
190 assert (self._queued_file is None), "tried to queue more than one file"
191 if not hasattr(source, 'read'):
192 source = open(source, 'rb')
193 self._close_file = True
195 self._close_file = False
197 filename = os.path.basename(source.name)
198 self.start_new_file(filename)
199 self._queued_file = source
201 def _queue_dirents(self, stream_name, dirents):
202 assert (not self._queued_dirents), "tried to queue more than one tree"
203 self._queued_dirents = deque(sorted(dirents))
205 def _queue_tree(self, path, stream_name, max_manifest_depth):
206 self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        """Write one file (a path or a readable file object) into the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Recursively write a local directory tree into the collection.

        # NOTE(review): max_manifest_depth semantics follow _work_dirents/
        # _work_trees: 0 flattens subdirectories into one stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
217 def write(self, newdata):
218 if hasattr(newdata, '__iter__'):
222 self._data_buffer.append(newdata)
223 self._data_buffer_len += len(newdata)
224 self._current_stream_length += len(newdata)
225 while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
228 def open(self, streampath, filename=None):
229 """open(streampath[, filename]) -> file-like object
231 Pass in the path of a file to write to the Collection, either as a
232 single string or as two separate stream name and file name arguments.
233 This method returns a file-like object you can write to add it to the
236 You may only have one file object from the Collection open at a time,
237 so be sure to close the object when you're done. Using the object in
238 a with statement makes that easy::
240 with cwriter.open('./doc/page1.txt') as outfile:
241 outfile.write(page1_data)
242 with cwriter.open('./doc/page2.txt') as outfile:
243 outfile.write(page2_data)
246 streampath, filename = split(streampath)
247 if self._last_open and not self._last_open.closed:
248 raise errors.AssertionError(
249 "can't open '{}' when '{}' is still open".format(
250 filename, self._last_open.name))
251 if streampath != self.current_stream_name():
252 self.start_new_stream(streampath)
253 self.set_current_file_name(filename)
254 self._last_open = _WriterFile(self, filename)
255 return self._last_open
257 def flush_data(self):
258 data_buffer = ''.join(self._data_buffer)
260 self._current_stream_locators.append(
262 data_buffer[0:config.KEEP_BLOCK_SIZE],
263 copies=self.replication))
264 self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
265 self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        """Finish the current file and start a new (optionally named) one."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)
271 def set_current_file_name(self, newfilename):
272 if re.search(r'[\t\n]', newfilename):
273 raise errors.AssertionError(
274 "Manifest filenames cannot contain whitespace: %s" %
276 elif re.search(r'\x00', newfilename):
277 raise errors.AssertionError(
278 "Manifest filenames cannot contain NUL characters: %s" %
280 self._current_file_name = newfilename
    def current_file_name(self):
        """Return the name of the file currently being written (or None)."""
        return self._current_file_name
285 def finish_current_file(self):
286 if self._current_file_name is None:
287 if self._current_file_pos == self._current_stream_length:
289 raise errors.AssertionError(
290 "Cannot finish an unnamed file " +
291 "(%d bytes at offset %d in '%s' stream)" %
292 (self._current_stream_length - self._current_file_pos,
293 self._current_file_pos,
294 self._current_stream_name))
295 self._current_stream_files.append([
296 self._current_file_pos,
297 self._current_stream_length - self._current_file_pos,
298 self._current_file_name])
299 self._current_file_pos = self._current_stream_length
300 self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and begin a new one with the given name."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
306 def set_current_stream_name(self, newstreamname):
307 if re.search(r'[\t\n]', newstreamname):
308 raise errors.AssertionError(
309 "Manifest stream names cannot contain whitespace")
310 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name
315 def finish_current_stream(self):
316 self.finish_current_file()
318 if not self._current_stream_files:
320 elif self._current_stream_name is None:
321 raise errors.AssertionError(
322 "Cannot finish an unnamed stream (%d bytes in %d files)" %
323 (self._current_stream_length, len(self._current_stream_files)))
325 if not self._current_stream_locators:
326 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
327 self._finished_streams.append([self._current_stream_name,
328 self._current_stream_locators,
329 self._current_stream_files])
330 self._current_stream_files = []
331 self._current_stream_length = 0
332 self._current_stream_locators = []
333 self._current_stream_name = None
334 self._current_file_pos = 0
335 self._current_file_name = None
338 """Store the manifest in Keep and return its locator.
340 This is useful for storing manifest fragments (task outputs)
341 temporarily in Keep during a Crunch job.
343 In other cases you should make a collection instead, by
344 sending manifest_text() to the API server's "create
345 collection" endpoint.
347 return self._my_keep().put(self.manifest_text(), copies=self.replication)
349 def portable_data_hash(self):
350 stripped = self.stripped_manifest()
351 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
353 def manifest_text(self):
354 self.finish_current_stream()
357 for stream in self._finished_streams:
358 if not re.search(r'^\.(/.*)?$', stream[0]):
360 manifest += stream[0].replace(' ', '\\040')
361 manifest += ' ' + ' '.join(stream[1])
362 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
367 def data_locators(self):
369 for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    # Attributes serialized by dump_state() and restored by from_state() to
    # checkpoint and resume an in-progress write.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        # Maps each queued source path to its os.stat() tuple, recorded at
        # queue time; used by check_dependencies() to detect changed files
        # before resuming.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
386 def from_state(cls, state, *init_args, **init_kwargs):
387 # Try to build a new writer from scratch with the given state.
388 # If the state is not suitable to resume (because files have changed,
389 # been deleted, aren't predictable, etc.), raise a
390 # StaleWriterStateError. Otherwise, return the initialized writer.
391 # The caller is responsible for calling writer.do_queued_work()
392 # appropriately after it's returned.
393 writer = cls(*init_args, **init_kwargs)
394 for attr_name in cls.STATE_PROPS:
395 attr_value = state[attr_name]
396 attr_class = getattr(writer, attr_name).__class__
397 # Coerce the value into the same type as the initial value, if
399 if attr_class not in (type(None), attr_value.__class__):
400 attr_value = attr_class(attr_value)
401 setattr(writer, attr_name, attr_value)
402 # Check dependencies before we try to resume anything.
403 if any(KeepLocator(ls).permission_expired()
404 for ls in writer._current_stream_locators):
405 raise errors.StaleWriterStateError(
406 "locators include expired permission hint")
407 writer.check_dependencies()
408 if state['_current_file'] is not None:
409 path, pos = state['_current_file']
411 writer._queued_file = open(path, 'rb')
412 writer._queued_file.seek(pos)
413 except IOError as error:
414 raise errors.StaleWriterStateError(
415 "failed to reopen active file {}: {}".format(path, error))
418 def check_dependencies(self):
419 for path, orig_stat in self._dependencies.items():
420 if not S_ISREG(orig_stat[ST_MODE]):
421 raise errors.StaleWriterStateError("{} not file".format(path))
423 now_stat = tuple(os.stat(path))
424 except OSError as error:
425 raise errors.StaleWriterStateError(
426 "failed to stat {}: {}".format(path, error))
427 if ((not S_ISREG(now_stat[ST_MODE])) or
428 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
429 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
430 raise errors.StaleWriterStateError("{} changed".format(path))
432 def dump_state(self, copy_func=lambda x: x):
433 state = {attr: copy_func(getattr(self, attr))
434 for attr in self.STATE_PROPS}
435 if self._queued_file is None:
436 state['_current_file'] = None
438 state['_current_file'] = (os.path.realpath(self._queued_file.name),
439 self._queued_file.tell())
442 def _queue_file(self, source, filename=None):
444 src_path = os.path.realpath(source)
446 raise errors.AssertionError("{} not a file path".format(source))
448 path_stat = os.stat(src_path)
449 except OSError as stat_error:
451 super(ResumableCollectionWriter, self)._queue_file(source, filename)
452 fd_stat = os.fstat(self._queued_file.fileno())
453 if not S_ISREG(fd_stat.st_mode):
454 # We won't be able to resume from this cache anyway, so don't
455 # worry about further checks.
456 self._dependencies[source] = tuple(fd_stat)
457 elif path_stat is None:
458 raise errors.AssertionError(
459 "could not stat {}: {}".format(source, stat_error))
460 elif path_stat.st_ino != fd_stat.st_ino:
461 raise errors.AssertionError(
462 "{} changed between open and stat calls".format(source))
464 self._dependencies[src_path] = tuple(fd_stat)
466 def write(self, data):
467 if self._queued_file is None:
468 raise errors.AssertionError(
469 "resumable writer can't accept unsourced data")
470 return super(ResumableCollectionWriter, self).write(data)
# Argument value for find_or_create(): create a subcollection at the path.
# NOTE(review): a sibling FILE constant is referenced elsewhere in this file
# but its definition is not visible in this view.
COLLECTION = "collection"
class SynchronizedCollectionBase(CollectionBase):
    """Base class for Collections and Subcollections.

    Implements the majority of functionality relating to accessing items in the
    Collection.
    """

    def __init__(self, parent=None):
        self.parent = parent
        self._modified = True
        self._items = {}

    # Abstract accessors: Collection resolves these against its own clients
    # and state; Subcollection delegates them to its root collection.

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def sync_mode(self):
        raise NotImplementedError()

    def root_collection(self):
        raise NotImplementedError()

    def notify(self, event, collection, name, item):
        raise NotImplementedError()

    def stream_name(self):
        raise NotImplementedError()
514 def find_or_create(self, path, create_type):
515 """Recursively search the specified file path.
517 May return either a `Collection` or `ArvadosFile`. If not found, will
518 create a new item at the specified path based on `create_type`. Will
519 create intermediate subcollections needed to contain the final item in
523 One of `arvado.collection.FILE` or
524 `arvado.collection.COLLECTION`. If the path is not found, and value
525 of create_type is FILE then create and return a new ArvadosFile for
526 the last path component. If COLLECTION, then create and return a new
527 Collection for the last path component.
531 pathcomponents = path.split("/")
533 if pathcomponents and pathcomponents[0]:
534 item = self._items.get(pathcomponents[0])
535 if len(pathcomponents) == 1:
536 # item must be a file
539 if create_type == COLLECTION:
540 item = Subcollection(self)
542 item = ArvadosFile(self)
543 self._items[pathcomponents[0]] = item
544 self._modified = True
545 self.notify(ADD, self, pathcomponents[0], item)
549 # create new collection
550 item = Subcollection(self)
551 self._items[pathcomponents[0]] = item
552 self._modified = True
553 self.notify(ADD, self, pathcomponents[0], item)
554 del pathcomponents[0]
555 if isinstance(item, SynchronizedCollectionBase):
556 return item.find_or_create("/".join(pathcomponents), create_type)
558 raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
563 def find(self, path):
564 """Recursively search the specified file path.
566 May return either a Collection or ArvadosFile. Return None if not
570 pathcomponents = path.split("/")
572 if pathcomponents and pathcomponents[0]:
573 item = self._items.get(pathcomponents[0])
574 if len(pathcomponents) == 1:
575 # item must be a file
578 del pathcomponents[0]
579 if isinstance(item, SynchronizedCollectionBase):
580 return item.find("/".join(pathcomponents))
582 raise IOError((errno.ENOTDIR, "Interior path components must be subcollection"))
587 """Recursive subcollection create.
589 Like `os.mkdirs()`. Will create intermediate subcollections needed to
590 contain the leaf subcollection path.
593 return self.find_or_create(path, COLLECTION)
595 def open(self, path, mode="r"):
596 """Open a file-like object for access.
599 path to a file in the collection
601 one of "r", "r+", "w", "w+", "a", "a+"
605 opens for reading and writing. Reads/writes share a file pointer.
607 truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
609 opens for reading and writing. All writes are appended to
610 the end of the file. Writing does not affect the file pointer for
613 mode = mode.replace("b", "")
614 if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
615 raise errors.ArgumentError("Bad mode '%s'" % mode)
616 create = (mode != "r")
618 if create and self.sync_mode() == SYNC_READONLY:
619 raise IOError((errno.EROFS, "Collection is read only"))
622 arvfile = self.find_or_create(path, FILE)
624 arvfile = self.find(path)
627 raise IOError((errno.ENOENT, "File not found"))
628 if not isinstance(arvfile, ArvadosFile):
629 raise IOError((errno.EISDIR, "Path must refer to a file."))
634 name = os.path.basename(path)
637 return ArvadosFileReader(arvfile, name, mode, num_retries=self.num_retries)
639 return ArvadosFileWriter(arvfile, name, mode, num_retries=self.num_retries)
643 """Test if the collection (or any subcollection or file) has been modified
644 since it was created."""
647 for k,v in self._items.items():
653 def set_unmodified(self):
654 """Recursively clear modified flag."""
655 self._modified = False
656 for k,v in self._items.items():
661 """Iterate over names of files and collections contained in this collection."""
662 return iter(self._items.keys())
666 """Iterate over names of files and collections directly contained in this collection."""
667 return self._items.keys()
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection.  If
        you want to search a path, use `find()` instead.
        """
        return self._items[k]
    def __contains__(self, k):
        """If there is a file or collection a directly contained by this collection
        with name `k`."""
        return k in self._items
684 """Get the number of items directly contained in this collection."""
685 return len(self._items)
689 def __delitem__(self, p):
690 """Delete an item by name which is directly contained by this collection."""
692 self._modified = True
693 self.notify(DEL, self, p, None)
697 """Get a list of names of files and collections directly contained in this collection."""
698 return self._items.keys()
702 """Get a list of files and collection objects directly contained in this collection."""
703 return self._items.values()
707 """Get a list of (name, object) tuples directly contained in this collection."""
708 return self._items.items()
710 def exists(self, path):
711 """Test if there is a file or collection at `path`."""
712 return self.find(path) != None
716 def remove(self, path, recursive=False):
717 """Remove the file or subcollection (directory) at `path`.
720 Specify whether to remove non-empty subcollections (True), or raise an error (False).
722 pathcomponents = path.split("/")
724 if len(pathcomponents) > 0:
725 item = self._items.get(pathcomponents[0])
727 raise IOError((errno.ENOENT, "File not found"))
728 if len(pathcomponents) == 1:
729 if isinstance(self._items[pathcomponents[0]], SynchronizedCollectionBase) and len(self._items[pathcomponents[0]]) > 0 and not recursive:
730 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
731 deleteditem = self._items[pathcomponents[0]]
732 del self._items[pathcomponents[0]]
733 self._modified = True
734 self.notify(DEL, self, pathcomponents[0], deleteditem)
736 del pathcomponents[0]
737 item.remove("/".join(pathcomponents))
739 raise IOError((errno.ENOENT, "File not found"))
741 def _cloneinto(self, target):
742 for k,v in self._items.items():
743 target._items[k] = v.clone(target)
    # Abstract stub: concrete subclasses return a copy of this item attached
    # to a new parent.  NOTE(review): the enclosing `def clone(...)` line is
    # elided in this view, so its signature cannot be confirmed here.
        raise NotImplementedError()
750 def copy(self, source, target_path, source_collection=None, overwrite=False):
751 """Copy a file or subcollection to a new path in this collection.
754 An ArvadosFile, Subcollection, or string with a path to source file or subcollection
757 Destination file or path. If the target path already exists and is a
758 subcollection, the item will be placed inside the subcollection. If
759 the target path already exists and is a file, this will raise an error
760 unless you specify `overwrite=True`.
763 Collection to copy `source_path` from (default `self`)
766 Whether to overwrite target file if it already exists.
768 if source_collection is None:
769 source_collection = self
771 # Find the object to copy
772 if isinstance(source, basestring):
773 source_obj = source_collection.find(source)
774 if source_obj is None:
775 raise IOError((errno.ENOENT, "File not found"))
776 sourcecomponents = source.split("/")
779 sourcecomponents = None
781 # Find parent collection the target path
782 targetcomponents = target_path.split("/")
784 # Determine the name to use.
785 target_name = targetcomponents[-1] if targetcomponents[-1] else (sourcecomponents[-1] if sourcecomponents else None)
788 raise errors.ArgumentError("Target path is empty and source is an object. Cannot determine destination filename to use.")
790 target_dir = self.find_or_create("/".join(targetcomponents[0:-1]), COLLECTION)
792 if target_name in target_dir:
793 if isinstance(target_dir[target_name], SynchronizedCollectionBase) and sourcecomponents:
794 target_dir = target_dir[target_name]
795 target_name = sourcecomponents[-1]
797 raise IOError((errno.EEXIST, "File already exists"))
800 if target_name in target_dir:
801 modified_from = target_dir[target_name]
803 # Actually make the copy.
804 dup = source_obj.clone(target_dir)
805 target_dir._items[target_name] = dup
806 target_dir._modified = True
809 self.notify(MOD, target_dir, target_name, (modified_from, dup))
811 self.notify(ADD, target_dir, target_name, dup)
814 def manifest_text(self, stream_name=".", strip=False, normalize=False):
815 """Get the manifest text for this collection, sub collections and files.
818 Name of the stream (directory)
821 If True, remove signing tokens from block locators if present.
822 If False (default), block locators are left unchanged.
825 If True, always export the manifest text in normalized form
826 even if the Collection is not modified. If False (default) and the collection
827 is not modified, return the original manifest text even if it is not
832 if self.modified() or self._manifest_text is None or normalize:
836 sorted_keys = sorted(item.keys())
837 for filename in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
838 # Create a stream per file `k`
839 arvfile = item[filename]
841 for segment in arvfile.segments():
842 loc = segment.locator
843 if arvfile.parent._my_block_manager().is_bufferblock(loc):
844 loc = arvfile.parent._my_block_manager().get_bufferblock(loc).locator()
846 loc = KeepLocator(loc).stripped()
847 filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
848 segment.segment_offset, segment.range_size))
849 stream[filename] = filestream
851 buf += ' '.join(normalize_stream(stream_name, stream))
853 for dirname in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
854 buf += item[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip)
858 return self.stripped_manifest()
860 return self._manifest_text
863 def diff(self, end_collection, prefix=".", holding_collection=None):
865 Generate list of add/modify/delete actions which, when given to `apply`, will
866 change `self` to match `end_collection`
869 if holding_collection is None:
870 holding_collection = Collection(api_client=self._my_api(), keep_client=self._my_keep())
872 if k not in end_collection:
873 changes.append((DEL, os.path.join(prefix, k), self[k].clone(holding_collection)))
874 for k in end_collection:
876 if isinstance(end_collection[k], Subcollection) and isinstance(self[k], Subcollection):
877 changes.extend(self[k].diff(end_collection[k], os.path.join(prefix, k), holding_collection))
878 elif end_collection[k] != self[k]:
879 changes.append((MOD, os.path.join(prefix, k), self[k].clone(holding_collection), end_collection[k].clone(holding_collection)))
881 changes.append((ADD, os.path.join(prefix, k), end_collection[k].clone(holding_collection)))
886 def apply(self, changes):
887 """Apply changes from `diff`.
889 If a change conflicts with a local change, it will be saved to an
890 alternate path indicating the conflict.
893 for change in changes:
894 event_type = change[0]
897 local = self.find(path)
898 conflictpath = "%s~conflict-%s~" % (path, time.strftime("%Y-%m-%d-%H:%M:%S",
900 if event_type == ADD:
902 # No local file at path, safe to copy over new file
903 self.copy(initial, path)
904 elif local is not None and local != initial:
905 # There is already local file and it is different:
906 # save change to conflict file.
907 self.copy(initial, conflictpath)
908 elif event_type == MOD:
911 # Local matches the "initial" item so it has not
912 # changed locally and is safe to update.
913 if isinstance(local, ArvadosFile) and isinstance(final, ArvadosFile):
914 # Replace contents of local file with new contents
915 local.replace_contents(final)
917 # Overwrite path with new item; this can happen if
918 # path was a file and is now a collection or vice versa
919 self.copy(final, path, overwrite=True)
921 # Local is missing (presumably deleted) or local doesn't
922 # match the "start" value, so save change to conflict file
923 self.copy(final, conflictpath)
924 elif event_type == DEL:
926 # Local item matches "initial" value, so it is safe to remove.
927 self.remove(path, recursive=True)
928 # else, the file is modified or already removed, in either
929 # case we don't want to try to remove it.
931 def portable_data_hash(self):
932 """Get the portable data hash for this collection's manifest."""
933 stripped = self.manifest_text(strip=True)
934 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
937 def __eq__(self, other):
940 if not isinstance(other, SynchronizedCollectionBase):
942 if len(self._items) != len(other):
944 for k in self._items:
947 if self._items[k] != other[k]:
951 def __ne__(self, other):
952 return not self.__eq__(other)
class Collection(SynchronizedCollectionBase):
    """Represents the root of an Arvados Collection, which may be associated with
    an API server Collection record.

    Brief summary of useful methods:

    :To read an existing file:
      `c.open("myfile", "r")`

    :To write a new file:
      `c.open("myfile", "w")`

    :To determine if a file exists:
      `c.find("myfile") is not None`

    :To copy a file:
      `c.copy("source", "dest")`

    :To delete a file:
      `c.remove("myfile")`

    :To save to an existing collection record:
      `c.save()`

    :To save a new collection record:
      `c.save_new()`

    :To merge remote changes into this object:
      `c.update()`

    This class is threadsafe.  The root collection object, all subcollections
    and files are protected by a single lock (i.e. each access locks the entire
    collection).
    """
991 def __init__(self, manifest_locator_or_text=None,
998 """Collection constructor.
1000 :manifest_locator_or_text:
1001 One of Arvados collection UUID, block locator of
1002 a manifest, raw manifest text, or None (to create an empty collection).
1004 the parent Collection, may be None.
1006 A dict containing keys for ARVADOS_API_HOST and ARVADOS_API_TOKEN.
1007 Prefer this over supplying your own api_client and keep_client (except in testing).
1008 Will use default config settings if not specified.
1010 The API client object to use for requests. If not specified, create one using `apiconfig`.
1012 the Keep client to use for requests. If not specified, create one using `apiconfig`.
1014 the number of retries for API and Keep requests.
1016 the block manager to use. If not specified, create one.
1019 super(Collection, self).__init__(parent)
1020 self._api_client = api_client
1021 self._keep_client = keep_client
1022 self._block_manager = block_manager
1025 self._config = apiconfig
1027 self._config = config.settings()
1029 self.num_retries = num_retries if num_retries is not None else 0
1030 self._manifest_locator = None
1031 self._manifest_text = None
1032 self._api_response = None
1034 self._sync = SYNC_EXPLICIT
1035 self.lock = threading.RLock()
1039 if manifest_locator_or_text:
1040 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1041 self._manifest_locator = manifest_locator_or_text
1042 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1043 self._manifest_locator = manifest_locator_or_text
1044 elif re.match(util.manifest_pattern, manifest_locator_or_text):
1045 self._manifest_text = manifest_locator_or_text
1047 raise errors.ArgumentError(
1048 "Argument to CollectionReader must be a manifest or a collection UUID")
1053 def root_collection(self):
1056 def stream_name(self):
1059 def sync_mode(self):
1064 def update(self, other=None, num_retries=None):
1065 """Fetch the latest collection record on the API server and merge it with the
1066 current collection contents.
1070 if self._manifest_locator is None:
1071 raise errors.ArgumentError("`other` is None but collection does not have a manifest_locator uuid")
1072 response = self._my_api().collections().get(uuid=self._manifest_locator).execute(num_retries=num_retries)
1073 other = CollectionReader(response["manifest_text"])
1074 baseline = CollectionReader(self._manifest_text)
1075 self.apply(baseline.diff(other))
1079 if self._api_client is None:
1080 self._api_client = ThreadSafeApiCache(self._config)
1081 self._keep_client = self._api_client.keep
1082 return self._api_client
1086 if self._keep_client is None:
1087 if self._api_client is None:
1090 self._keep_client = KeepClient(api_client=self._api_client)
1091 return self._keep_client
1094 def _my_block_manager(self):
1095 if self._block_manager is None:
1096 self._block_manager = _BlockManager(self._my_keep())
1097 return self._block_manager
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
            # Fetch the collection record; keep both the full API response
            # and its manifest text.
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
            # The locator itself names the manifest block in Keep.
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
    def _populate(self):
        """Load the manifest, trying the API server and/or Keep as appropriate.

        Raises arvados.errors.NotFoundError when the manifest cannot be
        retrieved from either source.
        """
        if self._manifest_locator is None and self._manifest_text is None:
        error_via_api = None
        error_via_keep = None
        # Keep is only worth trying when the locator looks like a Keep block
        # locator (as opposed to a collection UUID).
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        # A signed locator can be fetched from Keep directly; try that first.
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Neither source worked; report both failure modes to the caller.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 self._manifest_locator,
        # Remember the fetched text as the merge baseline, then build the
        # in-memory file tree from it.
        self._baseline_manifest = self._manifest_text
        self._import_manifest(self._manifest_text)
1161 def _has_collection_uuid(self):
1162 return self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator)
    def __enter__(self):

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        # Only writable collections bound to an API record can be saved
        # automatically on exit.
        if self._sync != SYNC_READONLY and self._has_collection_uuid():
        # Shut down any background block-manager threads before leaving.
        if self._block_manager is not None:
            self._block_manager.stop_threads()
    def clone(self, new_parent=None, readonly=False, new_config=None):
        """Return a deep copy of this collection.

        :readonly:
          When True the copy is a read-only CollectionReader; otherwise a
          writable Collection.
        :new_config:
          API configuration for the copy; defaults to this collection's.
        """
        if new_config is None:
            new_config = self._config
            newcollection = CollectionReader(parent=new_parent, apiconfig=new_config)
            newcollection = Collection(parent=new_parent, apiconfig=new_config)
        # Disable syncing while contents are copied over, then restore the
        # appropriate mode on the copy.
        newcollection._sync = None
        self._cloneinto(newcollection)
        newcollection._sync = SYNC_READONLY if readonly else SYNC_EXPLICIT
        return newcollection
    def api_response(self):
        """Returns information about this Collection fetched from the API server.

        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response
    def find_or_create(self, path, create_type):
        """See `SynchronizedCollectionBase.find_or_create`"""
        # Strip a leading "./" so lookups are relative to the root.
        return super(Collection, self).find_or_create(path[2:] if path.startswith("./") else path, create_type)

    def find(self, path):
        """See `SynchronizedCollectionBase.find`"""
        # Strip a leading "./" so lookups are relative to the root.
        return super(Collection, self).find(path[2:] if path.startswith("./") else path)

    def remove(self, path, recursive=False):
        """See `SynchronizedCollectionBase.remove`"""
            # Removing the collection root itself is not allowed.
            raise errors.ArgumentError("Cannot remove '.'")
        return super(Collection, self).remove(path[2:] if path.startswith("./") else path, recursive)
    def save(self, merge=True, num_retries=None):
        """Commit pending buffer blocks to Keep, merge with remote record (if
        update=True), write the manifest to Keep, and update the collection
        record on the API server.

        Will raise AssertionError if not associated with a collection record on
        the API server.  If you want to save a manifest to Keep only, see
        `save_new()`.

        :merge:
          Update and merge remote changes before saving.  Otherwise, any
          remote changes will be ignored and overwritten.
        """
        if not self._has_collection_uuid():
            raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_new() for new collections.")
        # Flush buffered writes to Keep before generating the manifest.
        self._my_block_manager().commit_all()
        # Store the stripped (unsigned) manifest in Keep...
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        # ...and send the full text to the API server.
        text = self.manifest_text(strip=False)
        self._api_response = self._my_api().collections().update(
            uuid=self._manifest_locator,
            body={'manifest_text': text}
            num_retries=num_retries)
        self._manifest_text = self._api_response["manifest_text"]
        self.set_unmodified()
    def save_new(self, name=None, create_collection_record=True, owner_uuid=None, ensure_unique_name=False, num_retries=None):
        """Commit pending buffer blocks to Keep, write the manifest to Keep, and create
        a new collection record (if create_collection_record True).

        After creating a new collection record, this Collection object will be
        associated with the new record used by `save()`.

        :name:
          The collection name.

        :create_collection_record:
          When False, only save the manifest to Keep; do not create a
          collection record.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.
        """
        # Flush buffered writes, store the stripped manifest in Keep, and
        # capture the full manifest text for the API record.
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True), num_retries=num_retries)
        text = self.manifest_text(strip=False)

        if create_collection_record:
            # Default name: a timestamped placeholder.
            name = "Collection created %s" % (time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime()))
            body = {"manifest_text": text,
                body["owner_uuid"] = owner_uuid
            self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries)
            text = self._api_response["manifest_text"]
            # Bind this object to the newly created record so save() works.
            self._manifest_locator = self._api_response["uuid"]
        self._manifest_text = text
        self.set_unmodified()
1302 def subscribe(self, callback):
1303 self.callbacks.append(callback)
1306 def unsubscribe(self, callback):
1307 self.callbacks.remove(callback)
1310 def notify(self, event, collection, name, item):
1311 for c in self.callbacks:
1312 c(event, collection, name, item)
    def _import_manifest(self, manifest_text):
        """Import a manifest into a `Collection`.

        :manifest_text:
          The manifest text to import from.

        Raises ArgumentError if the collection is not empty.
        """
            raise ArgumentError("Can only import manifest into an empty collection")
        save_sync = self.sync_mode()
        # Tokenize on whitespace; the parser walks STREAM_NAME / BLOCKS /
        # SEGMENTS states for each manifest stream.
        for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
            if state == STREAM_NAME:
                # starting a new stream
                stream_name = tok.replace('\\040', ' ')
                s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
                    # NOTE(review): long() is Python 2 only -- use int() when
                    # porting to Python 3.
                    blocksize = long(s.group(1))
                    blocks.append(Range(tok, streamoffset, blocksize))
                    streamoffset += blocksize
            if state == SEGMENTS:
                s = re.search(r'^(\d+):(\d+):(\S+)', tok)
                    pos = long(s.group(1))
                    size = long(s.group(2))
                    # File names encode spaces as \040 in manifests.
                    name = s.group(3).replace('\\040', ' ')
                    filepath = os.path.join(stream_name, name)
                    f = self.find_or_create(filepath, FILE)
                    if isinstance(f, ArvadosFile):
                        f.add_segment(blocks, pos, size)
                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                raise errors.SyntaxError("Invalid manifest format")
        self.set_unmodified()
        self._sync = save_sync
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.

    It falls under the umbrella of the root collection.
    """

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        # Share the root collection's lock so the whole tree synchronizes
        # on a single object.
        self.lock = self.root_collection().lock
        self._manifest_text = None

    def root_collection(self):
        return self.parent.root_collection()

    def sync_mode(self):
        return self.root_collection().sync_mode()

        # API, Keep, and block-manager access all delegate to the root.
        return self.root_collection()._my_api()

        return self.root_collection()._my_keep()

    def _my_block_manager(self):
        return self.root_collection()._my_block_manager()

    def notify(self, event, collection, name, item):
        return self.root_collection().notify(event, collection, name, item)

    def stream_name(self):
        # Locate our own entry in the parent to build the stream path.
        for k, v in self.parent.items():
                return os.path.join(self.parent.stream_name(), k)

    def clone(self, new_parent):
        c = Subcollection(new_parent)
class CollectionReader(Collection):
    """A read-only collection object from an api collection record locator,
    a portable data hash of a manifest, or raw manifest text.

    See `Collection` constructor for detailed options.
    """

    def __init__(self, *args, **kwargs):
        if not args and not kwargs.get("manifest_locator_or_text"):
            raise errors.ArgumentError("Must provide manifest locator or text to initialize ReadOnlyCollection")

        super(CollectionReader, self).__init__(*args, **kwargs)

        # Forego any locking since it should never change once initialized.
        self.lock = NoopLock()
        self._sync = SYNC_READONLY

        # Backwards compatability with old CollectionReader
        # all_streams() and all_files()
        self._streams = None
    def _populate_streams(orig_func):
        # Decorator: lazily build self._streams (a parsed copy of the
        # manifest) before running methods that rely on it.
        @functools.wraps(orig_func)
        def populate_streams_wrapper(self, *args, **kwargs):
            # Defer populating self._streams until needed since it creates a copy of the manifest.
            if self._streams is None:
                if self._manifest_text:
                    self._streams = [sline.split()
                                     for sline in self._manifest_text.split("\n")
            return orig_func(self, *args, **kwargs)
        return populate_streams_wrapper
    def normalize(self):
        """Normalize the streams returned by `all_streams`.

        This method is kept for backwards compatability and only affects the
        behavior of `all_streams()` and `all_files()`.
        """
        # Group every file's segments by (stream, file) so each file's
        # locators-and-ranges can be rebuilt per stream.
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        # Rebuild the cached stream list in normalized, sorted order.
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
1484 def all_streams(self):
1485 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
1486 for s in self._streams]
1489 def all_files(self):
1490 for s in self.all_streams():
1491 for f in s.all_files():