8 from collections import deque
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
# Module-level logger; child of the top-level 'arvados' logger.
_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Shared plumbing for collection readers and writers.

    Provides context-manager support, lazy Keep client construction, and
    manifest stripping.  NOTE(review): several interior lines of this class
    are elided in this view; comments annotate only what is visible.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        # Context-manager exit; body not visible here.

        # Lazily build a KeepClient bound to our API client the first time
        # Keep access is needed, reusing the configured retry count.
        # NOTE(review): the enclosing `def _my_keep(self):` line appears
        # elided in this view.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        for line in raw.split("\n"):
            # Keep the stream name (fields[:1]); for each locator field,
            # drop any hint that does not begin with a digit (signatures),
            # preserving numeric size hints like "+1234".
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument: a bare locator or UUID is resolved lazily
        # (see _populate*); literal manifest text is stored directly.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # NOTE(review): an `else:` guard appears elided before this raise.
        raise errors.ArgumentError(
            "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` appears elided in this view.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` appears elided in this view.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:

        # NOTE(review): the `def _populate(self):` line appears elided; the
        # following fallback logic tries Keep first for signed locators,
        # then the API server, then Keep again for plain hash locators.
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked: report both failure modes to the caller.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                self._manifest_locator,
        # Parse each manifest line into a token list, one per stream.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            # Lazily populate on first data access.
            if self._streams is None:
            return orig_func(self, *args, **kwargs)

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response

        # NOTE(review): the enclosing normalize() def line appears elided.
        # Group every file's data ranges by (stream, file) so the manifest
        # can be regenerated in normalized form.
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]
        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        streampath, filename = split(streampath)
        keep_client = self._my_keep()
        # Linear scan for the requested stream by name.
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
        raise ValueError("stream '{}' not found in Collection".
        return stream.files()[filename]
        raise ValueError("file '{}' not found in Collection stream '{}'".
                         format(filename, streampath))

    def all_streams(self):
        # One StreamReader per parsed manifest stream.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

        # NOTE(review): the enclosing all_files() def line appears elided.
        for s in self.all_streams():
            for f in s.all_files():

    def manifest_text(self, strip=False, normalize=False):
        # Normalizing goes through a fresh reader so this object's parsed
        # state is untouched; stripping removes permission signatures.
        cr = CollectionReader(self.manifest_text())
        return cr.manifest_text(strip=strip, normalize=False)
        return self.stripped_manifest()
        return self._manifest_text
class _WriterFile(ArvadosFileBase):
    """File-like handle returned by CollectionWriter.open().

    Forwards writes to the owning CollectionWriter and finalizes the
    current file on close.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

        # NOTE(review): the `def close(self):` line appears elided here.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):

    @ArvadosFileBase._before_close
        # NOTE(review): the `def flush(self):` line appears elided here.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when

        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Pending data not yet uploaded to Keep, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # Per-stream bookkeeping: files ([pos, size, name]), byte length,
        # Keep locators, and the stream's name ('.' is the root stream).
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state (see do_queued_work).
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:

    def _work_file(self):
        # Stream the queued file into the collection in Keep-block-sized
        # chunks, then finalize it and release the handle.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            # Directories recurse as new trees; files are queued directly.
            # NOTE(review): an `else:` appears elided before _queue_file.
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 flattens the whole subtree into one stream.
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
            self._queue_dirents(stream_name, d)
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        # Accept either a path (opened here, closed by us) or an open
        # file-like object (caller retains ownership).
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
            self._close_file = False
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        # Sort for deterministic manifest output.
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        # Queue a single file and drain the queue synchronously.
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        # Queue a directory tree and drain the queue synchronously.
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Non-string iterables are written element by element.
        if hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush whole Keep blocks as soon as they are available.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one Keep block's worth of buffered data; the remainder
        # stays buffered for the next flush.
        data_buffer = ''.join(self._data_buffer)
        self._current_stream_locators.append(
            self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Manifest format cannot represent tab/newline/NUL in names.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            # No file open: only legal if no unclaimed bytes were written.
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [offset, length, name] for the manifest.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # Empty name means the root stream.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
            # An empty stream still needs a locator in the manifest.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

        # NOTE(review): the enclosing finish()/save() def line appears elided.
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        # MD5 of the stripped manifest, plus its length, per Arvados PDH.
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        for stream in self._finished_streams:
            # Stream names in manifests are relative to '.'.
            if not re.search(r'^\.(/.*)?$', stream[0]):
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])

    def data_locators(self):
        # Collect locators from every finished stream.
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() serializes the writer; from_state() rebuilds one, after
    verifying that every source file is unchanged.
    """

    # Attributes that fully describe resumable writer state.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # path -> stat tuple for every queued source file.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    # NOTE(review): a @classmethod decorator appears elided above this def.
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            # NOTE(review): the enclosing `try:` appears elided here.
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))

    def check_dependencies(self):
        # Verify every recorded source file still looks identical
        # (regular file, same mtime, same size).
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # NOTE(review): the enclosing `try:` appears elided here.
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # copy_func lets callers deep-copy values (e.g. for pickling).
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())

    def _queue_file(self, source, filename=None):
        # Resumable writers only accept real file paths, so that the file
        # can be re-opened and verified when resuming.
        src_path = os.path.realpath(source)
        raise errors.AssertionError("{} not a file path".format(source))
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Free-form writes are unresumable; only file-sourced data allowed.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
class SynchronizedCollectionBase(CollectionBase):
    """Abstract base for collections that synchronize with an API record.

    Subclasses (Collection, Subcollection) supply the API/Keep clients,
    the lock, and the sync mode.  NOTE(review): several abstract method
    def lines are elided in this view.
    """

    def __init__(self, parent=None):

        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _root_lock(self):
        raise NotImplementedError()

        raise NotImplementedError()

        raise NotImplementedError()

    def notify(self, collection, event, name, item):
        # Propagate a change event (ADD/DEL/...) up to listeners.
        raise NotImplementedError()
    def find(self, path, create=False, create_collection=False):
        """Recursively search the specified file path. May return either a Collection

        :create:
          If true, create path components (i.e. Collections) that are
          missing. If "create" is False, return None if a path component is

        :create_collection:
          If the path is not found, "create" is True, and
          "create_collection" is False, then create and return a new
          ArvadosFile for the last path component. If "create_collection" is
          True, then create and return a new Collection for the last path
        """
        # Creation is illegal on read-only collections.
        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))

        item = self._items.get(p[0])
        # item must be a file
        if item is None and create:
            # Last path component missing: create a file or subcollection.
            if create_collection:
                item = Subcollection(self)
                item = ArvadosFile(self)
            self._items[p[0]] = item
            self.notify(self, ADD, p[0], item)

        # Intermediate path component missing: create a subcollection and
        # recurse into it with the remainder of the path.
        if item is None and create:
            # create new collection
            item = Subcollection(self)
            self._items[p[0]] = item
            self.notify(self, ADD, p[0], item)
        return item.find("/".join(p), create=create)
    def open(self, path, mode):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"

          opens for reading and writing. Reads/writes share a file pointer.

          truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.

          opens for reading and writing. All writes are appended to
          the end of the file. Writing does not affect the file pointer for
        """
        # Binary flag is a no-op for collections; normalize it away.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and self._sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))

        f = self.find(path, create=create)
        raise IOError((errno.ENOENT, "File not found"))
        if not isinstance(f, ArvadosFile):
            raise IOError((errno.EISDIR, "Path must refer to a file."))

        # Reader for "r", writer for any writable mode.
        return ArvadosFileReader(f, path, mode)
        return ArvadosFileWriter(f, path, mode)
        # NOTE(review): the enclosing modified() def line appears elided.
        """Test if the collection (or any subcollection or file) has been modified
        since it was created."""
        for k,v in self._items.items():

    def set_unmodified(self):
        """Recursively clear modified flag"""
        for k,v in self._items.items():

        """Iterate over names of files and collections contained in this collection."""
        return self._items.keys()

        """Iterate over names of files and collections directly contained in this collection."""
        return self._items.keys()

    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection. If
        you want to search a path, use `find()` instead.
        """
        return self._items[k]

    def __contains__(self, k):
        """If there is a file or collection a directly contained by this collection
        """
        return k in self._items

        """Get the number of items directly contained in this collection"""
        return len(self._items)

    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        self.notify(self, DEL, p, None)

        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()
833 def exists(self, path):
834 """Test if there is a file or collection at "path" """
835 return self.find(path) != None
839 def remove(self, path, rm_r=False):
840 """Remove the file or subcollection (directory) at `path`.
842 Specify whether to remove non-empty subcollections (True), or raise an error (False).
846 # Remove '.' from the front of the path
850 item = self._items.get(p[0])
852 raise IOError((errno.ENOENT, "File not found"))
854 if isinstance(SynchronizedCollection, self._items[p[0]]) and len(self._items[p[0]]) > 0 and not rm_r:
855 raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
856 del self._items[p[0]]
857 self.notify(self, DEL, p[0], None)
860 item.remove("/".join(p))
862 raise IOError((errno.ENOENT, "File not found"))
864 def _cloneinto(self, target):
865 for k,v in self._items:
866 target._items[k] = v.clone(new_parent=target)
        # Abstract hook: concrete subclasses implement clone().
        raise NotImplementedError()
873 def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
875 copyto('/foo', '/bar') will overwrite 'foo' if it exists.
876 copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
878 if source_collection is None:
879 source_collection = self
881 # Find the object to copy
882 sp = source_path.split("/")
883 source_obj = source_collection.find(source_path)
884 if source_obj is None:
885 raise IOError((errno.ENOENT, "File not found"))
887 # Find parent collection the target path
888 tp = target_path.split("/")
889 target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
891 # Determine the name to use.
892 target_name = tp[-1] if tp[-1] else sp[-1]
894 if target_name in target_dir and not overwrite:
895 raise IOError((errno.EEXIST, "File already exists"))
897 # Actually make the copy.
898 dup = source_obj.clone(target_dir)
899 with target_dir.lock:
900 target_dir._items[target_name] = dup
902 self.notify(target_dir, ADD, target_name, dup)
    def manifest_text(self, strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :strip:
          If True, remove signing tokens from block locators if present.
          If False, block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False and the collection
          is not modified, return the original manifest text even if it is not
        """
        # Modified or normalization requested: re-export from live state.
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
        # NOTE(review): an `if strip:` / `else:` guard appears elided around
        # the following two returns.
        return self.stripped_manifest()
        return self._manifest_text
930 def merge(self, other):
931 for k in other.keys():
933 if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
934 self[k].merge(other[k])
936 if self[k] != other[k]:
937 name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
939 self[name] = other[k].clone(self)
940 self.notify(self, name, ADD, self[name])
942 self[k] = other[k].clone(self)
943 self.notify(self, k, ADD, self[k])
945 def portable_data_hash(self):
946 """Get the portable data hash for this collection's manifest."""
947 stripped = self.manifest_text(strip=True)
948 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
class Collection(SynchronizedCollectionBase):
    """Store an Arvados collection, consisting of a set of files and

    # NOTE(review): `Collection.SYNC_READONLY` as a default here references
    # the class name while the class body is still being defined, which
    # raises NameError at import time; the bare attribute (or a sentinel)
    # should be used instead.
    def __init__(self, manifest_locator_or_text=None,
                 sync=Collection.SYNC_READONLY):
        """:manifest_locator_or_text:
          One of Arvados collection UUID, block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
          the parent Collection, may be None.
          the arvados configuration to get the hostname and api token.
          Prefer this over supplying your own api_client and keep_client (except in testing).
          Will use default config settings if not specified.
          The API client object to use for requests. If not specified, create one using `config`.
          the Keep client to use for requests. If not specified, create one using `config`.
          the number of retries for API and Keep requests.
          the block manager to use. If not specified, create one.
          Set synchronization policy with API server collection record.
          Collection is read only. No synchronization. This mode will
          also forego locking, which gives better performance.
          Synchronize on explicit request via `merge()` or `save()`
          Synchronize with server in response to background websocket events,
          on block write, or on file close.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager
        self._config = config
        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        self.lock = threading.RLock()

        # Same argument classification as CollectionReader.__init__.
        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
                raise errors.ArgumentError(
                    "Argument to CollectionReader must be a manifest or a collection UUID")

        # Live sync requires a collection UUID to subscribe to.
        if self._sync == SYNC_LIVE:
            if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
                raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
            # NOTE(review): a positional argument (self.on_message) follows
            # the keyword argument `filters=`, which is a SyntaxError; the
            # callback must be passed positionally before `filters` or by
            # keyword.
            self.events = events.subscribe(arvados.api(), filters=[["object_uuid", "=", self._manifest_locator]], self.on_message)

    # NOTE(review): a @staticmethod/@classmethod decorator appears elided.
    def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
        # Create, save, and return a new named collection record.
        c = Collection(sync=SYNC_EXPLICIT)
        c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
    def _root_lock(self):
        # Root collection owns the lock.

    def sync_mode(self):

        # NOTE(review): the enclosing update() def line appears elided, and
        # the brackets in `select=[["manifest_text"])` are mismatched — this
        # is a SyntaxError; it should read select=["manifest_text"].
        n = self._my_api().collections().get(uuid=self._manifest_locator, select=[["manifest_text"])).execute()
        other = import_collection(n["manifest_text"])

        # NOTE(review): the enclosing _my_api() def line appears elided.
        # Lazily build a SafeApi client from config; its keep client is
        # reused so API and Keep share credentials.
        if self._api_client is None:
            self._api_client = arvados.api.SafeApi(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

        # NOTE(review): the enclosing _my_keep() def line appears elided.
        if self._keep_client is None:
            if self._api_client is None:
            self._keep_client = KeepClient(api=self._api_client)
        return self._keep_client

    def _my_block_manager(self):
        # Lazily create the shared block manager used for writes.
        if self._block_manager is None:
            self._block_manager = BlockManager(self._my_keep())
        return self._block_manager
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` appears elided in this view.
        self._api_response = self._my_api().collections().get(
            uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the enclosing `try:` appears elided in this view.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
1094 def _populate(self):
1096 if self._manifest_locator is None and self._manifest_text is None:
1098 error_via_api = None
1099 error_via_keep = None
1100 should_try_keep = ((self._manifest_text is None) and
1101 util.keep_locator_pattern.match(
1102 self._manifest_locator))
1103 if ((self._manifest_text is None) and
1104 util.signed_locator_pattern.match(self._manifest_locator)):
1105 error_via_keep = self._populate_from_keep()
1106 if self._manifest_text is None:
1107 error_via_api = self._populate_from_api_server()
1108 if error_via_api is not None and not should_try_keep:
1110 if ((self._manifest_text is None) and
1111 not error_via_keep and
1113 # Looks like a keep locator, and we didn't already try keep above
1114 error_via_keep = self._populate_from_keep()
1115 if self._manifest_text is None:
1117 raise arvados.errors.NotFoundError(
1118 ("Failed to retrieve collection '{}' " +
1119 "from either API server ({}) or Keep ({})."
1121 self._manifest_locator,
1125 import_manifest(self._manifest_text, self)
1127 if self._sync == SYNC_READONLY:
1128 # Now that we're populated, knowing that this will be readonly,
1129 # forego any further locking.
1130 self.lock = NoopLock()
1132 def __enter__(self):
1135 def __exit__(self, exc_type, exc_value, traceback):
1136 """Support scoped auto-commit in a with: block"""
1137 self.save(allow_no_locator=True)
1138 if self._block_manager is not None:
1139 self._block_manager.stop_threads()
    # NOTE(review): default parameter values are evaluated once, when the
    # enclosing class body runs — at that point `self` is undefined, so
    # `new_config=self.config` raises NameError. Use a None sentinel and fall
    # back to `self.config` inside the body. If this method lives inside the
    # Collection class itself, `Collection.SYNC_READONLY` in the default list
    # fails for the same reason (the class name is not bound yet) — confirm
    # placement.
    def clone(self, new_parent=None, new_sync=Collection.SYNC_READONLY, new_config=self.config):
        c = Collection(parent=new_parent, config=new_config, sync=new_sync)
        if new_sync == Collection.SYNC_READONLY:
1151 def api_response(self):
1153 api_response() -> dict or None
1155 Returns information about this Collection fetched from the API server.
1156 If the Collection exists in Keep but not the API server, currently
1157 returns None. Future versions may provide a synthetic response.
1159 return self._api_response
1163 def save(self, allow_no_locator=False):
1164 """Commit pending buffer blocks to Keep, write the manifest to Keep, and
1165 update the collection record to Keep.
1168 If there is no collection uuid associated with this
1169 Collection and `allow_no_locator` is False, raise an error. If True,
1170 do not raise an error.
1173 self._my_block_manager().commit_all()
1174 self._my_keep().put(self.manifest_text(strip=True))
1175 if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
1176 self._api_response = self._my_api().collections().update(
1177 uuid=self._manifest_locator,
1178 body={'manifest_text': self.manifest_text(strip=False)}
1180 num_retries=self.num_retries)
1181 elif not allow_no_locator:
1182 raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
1183 self.set_unmodified()
1187 def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
1188 """Save a new collection record.
1191 The collection name.
1194 the user, or project uuid that will own this collection.
1195 If None, defaults to the current user.
1197 :ensure_unique_name:
1198 If True, ask the API server to rename the collection
1199 if it conflicts with a collection with the same name and owner. If
1200 False, a name conflict will result in an error.
1203 self._my_block_manager().commit_all()
1204 self._my_keep().put(self.manifest_text(strip=True))
1205 body = {"manifest_text": self.manifest_text(strip=False),
1208 body["owner_uuid"] = owner_uuid
1209 self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
1212 self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1214 self._manifest_locator = self._api_response["uuid"]
1217 self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])
1219 self.set_unmodified()
1222 def subscribe(self, callback):
1223 self.callbacks.append(callback)
1226 def unsubscribe(self, callback):
1227 self.callbacks.remove(callback)
1230 def notify(self, event):
1231 for c in self.callbacks:
1234 class Subcollection(SynchronizedCollectionBase):
1235 """This is a subdirectory within a collection that doesn't have its own API
1236 server record. It falls under the umbrella of the root collection."""
    def __init__(self, parent):
        # A Subcollection has no API record of its own; it shares the root
        # collection's lock so every nested directory synchronizes on a single
        # object.
        super(Subcollection, self).__init__(parent)
        self.lock = parent._root_lock()
1243 return self.parent._root_lock()
1245 def sync_mode(self):
1246 return self.parent.sync_mode()
1249 return self.parent._my_api()
1252 return self.parent._my_keep()
1254 def _my_block_manager(self):
1255 return self.parent._my_block_manager()
1257 def _populate(self):
1258 self.parent._populate()
1260 def notify(self, event):
1261 self.parent.notify(event)
1264 def clone(self, new_parent):
1265 c = Subcollection(new_parent)
1270 def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
1271 """Import a manifest into a `Collection`.
1274 The manifest text to import from.
1277 The `Collection` that will be initialized (must be empty).
1278 If None, create a new `Collection` object.
1281 The API client object that will be used when creating a new `Collection` object.
1284 The keep client object that will be used when creating a new `Collection` object.
1287 the default number of api client and keep retries on error.
1289 if into_collection is not None:
1290 if len(into_collection) > 0:
1291 raise ArgumentError("Can only import manifest into an empty collection")
1294 c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)
1303 for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1307 if state == STREAM_NAME:
1308 # starting a new stream
1309 stream_name = tok.replace('\\040', ' ')
1317 s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
1319 blocksize = long(s.group(1))
1320 blocks.append(Range(tok, streamoffset, blocksize))
1321 streamoffset += blocksize
        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            # NOTE(review): long() exists only in Python 2 — use int() for
            # Python 3 compatibility (Python 3's int is arbitrary precision).
            pos = long(s.group(1))
            size = long(s.group(2))
            # Manifest tokens encode spaces in file names as the escape \040.
            name = s.group(3).replace('\\040', ' ')
            f = c.find("%s/%s" % (stream_name, name), create=True)
            f.add_segment(blocks, pos, size)
        # NOTE(review): qualified as `errors.SyntaxError` here but as
        # `arvados.errors.NotFoundError` earlier in this file — confirm
        # `errors` is imported at module scope.
        raise errors.SyntaxError("Invalid manifest format")
1344 def export_manifest(item, stream_name=".", portable_locators=False):
1347 Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
1348 `item` is a is a `Collection`, this will also export subcollections.
1351 the name of the stream when exporting `item`.
1354 If True, strip any permission hints on block locators.
1355 If False, use block locators as-is.
1358 if isinstance(item, SynchronizedCollectionBase):
1360 sorted_keys = sorted(item.keys())
1361 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1364 for s in v.segments:
1366 if loc.startswith("bufferblock"):
1367 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1368 if portable_locators:
1369 loc = KeepLocator(loc).stripped()
1370 st.append(LocatorAndRange(loc, locator_block_size(loc),
1371 s.segment_offset, s.range_size))
1374 buf += ' '.join(normalize_stream(stream_name, stream))
1376 for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1377 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        for s in item.segments:
            if loc.startswith("bufferblock"):
                # NOTE(review): the collection branch above resolves buffer
                # blocks via v.parent._my_block_manager()._bufferblocks[loc]
                # .locator(); this branch uses item._bufferblocks[...]
                # .calculate_locator(). Confirm both the lookup container and
                # the method name — the two branches look inconsistent.
                loc = item._bufferblocks[loc].calculate_locator()
            if portable_locators:
                # Strip permission hints so the manifest is portable.
                loc = KeepLocator(loc).stripped()
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                      s.segment_offset, s.range_size))
        stream[stream_name] = st
        buf += ' '.join(normalize_stream(stream_name, stream))