import functools
import hashlib
import logging
import os
import re
import errno
import threading

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

import arvados
from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, NoopLock
from .keep import KeepLocator, KeepClient
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
from . import arvfile
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
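    # For illustration, a signed locator such as
    #
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0123456789abcdef@53bed294
    #
    # keeps only its size hint after stripping:
    #
    #   acbd18db4cc2f85cedef654fccc4a4d8+3
    #
    # (acbd18db4cc2f85cedef654fccc4a4d8 is the md5 of "foo"; the signature
    # shown is made up for the example.)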
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read the collection's underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None
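    # A minimal usage sketch (the UUID below is a placeholder; assumes a
    # reachable API server or a signed manifest locator):
    #
    #   reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #   print reader.manifest_text()
    #   for f in reader.all_files():
    #       print f.name()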
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
            return e
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper
    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response
    @_populate_first
    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])
    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))
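    # Both calling conventions reach the same file, e.g. (assuming the
    # collection contains './doc/page1.txt'):
    #
    #   reader.open('./doc/page1.txt')
    #   reader.open('./doc', 'page1.txt')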
    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
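    # For example, this uploads a local directory as one Collection, with
    # each subdirectory written as its own stream (the path is illustrative):
    #
    #   cwriter = CollectionWriter()
    #   cwriter.write_directory_tree('/tmp/mydata')
    #   manifest_locator = cwriter.finish()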
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as separate stream name and file name arguments.
        This method returns a file-like object; data written to it is added
        to the Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name
    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name
    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
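    # The portable data hash is the md5 of the stripped manifest text plus
    # its length in bytes, so an empty manifest yields
    #
    #   d41d8cd98f00b204e9800998ecf8427e+0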
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
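    # Each manifest line names a stream, its data block locators, and
    # offset:length:name segments within those blocks, e.g. (the locator
    # here is made up):
    #
    #   ./doc 930625b054ce894ac40596c3f5a0d947+33 0:12:page1.txt 12:21:page2.txt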
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)
    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
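    # A checkpoint/resume sketch (the path and state handling are
    # illustrative):
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/input.dat')
    #   state = writer.dump_state()
    #   # ...the process dies, restarts, and reloads `state`...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
    #   manifest = writer.manifest_text()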
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)
    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
class SynchronizedCollectionBase(CollectionBase):
    SYNC_READONLY = 1
    SYNC_EXPLICIT = 2
    SYNC_LIVE = 3

    def __init__(self, parent=None):
        self.parent = parent
        self._items = None

    def _my_api(self):
        raise NotImplementedError()

    def _my_keep(self):
        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _root_lock(self):
        raise NotImplementedError()

    def _populate(self):
        raise NotImplementedError()

    def sync_mode(self):
        raise NotImplementedError()

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._items is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper
    @arvfile._synchronized
    @_populate_first
    def find(self, path, create=False, create_collection=False):
        """Recursively search the specified file path.  May return either a
        Collection or an ArvadosFile.

        :create:
          If true, create path components (i.e. Collections) that are
          missing.  If "create" is False, return None if a path component is
          not found.

        :create_collection:
          If the path is not found, "create" is True, and
          "create_collection" is False, then create and return a new
          ArvadosFile for the last path component.  If "create_collection" is
          True, then create and return a new Collection for the last path
          component.
        """
        if create and self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")

        p = path.split("/")
        if p[0] == '.':
            del p[0]

        if len(p) > 0:
            item = self._items.get(p[0])
            if len(p) == 1:
                # item must be a file
                if item is None and create:
                    # create new file
                    if create_collection:
                        item = Subcollection(self)
                    else:
                        item = ArvadosFile(self)
                    self._items[p[0]] = item
                return item
            else:
                if item is None and create:
                    # create new collection
                    item = Subcollection(self)
                    self._items[p[0]] = item
                del p[0]
                return item.find("/".join(p), create=create)
        else:
            return self
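    # For example (sketch; assumes a writable collection `c`):
    #
    #   c.find("foo/bar.txt", create=True)                  # new ArvadosFile
    #   c.find("baz", create=True, create_collection=True)  # new Subcollection
    #   c.find("foo/bar.txt")                       # existing item, or None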
    @arvfile._synchronized
    @_populate_first
    def open(self, path, mode):
        """Open a file-like object for access.

        :path:
          path to a file in the collection
        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"
          :"r":
            opens for reading
          :"r+":
            opens for reading and writing.  Reads/writes share a file pointer.
          :"w", "w+":
            truncates to 0 and opens for reading and writing.  Reads/writes
            share a file pointer.
          :"a", "a+":
            opens for reading and writing.  All writes are appended to
            the end of the file.  Writing does not affect the file pointer
            for reading.
        """
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        if create and self.sync_mode() == SynchronizedCollectionBase.SYNC_READONLY:
            raise IOError(errno.EROFS, "Collection is read only")

        f = self.find(path, create=create)
        if f is None:
            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(f, ArvadosFile):
            raise IOError(errno.EISDIR, "Path must refer to a file.")

        if mode[0] == "w":
            f.truncate(0)

        if mode == "r":
            return ArvadosFileReader(f, path, mode)
        else:
            return ArvadosFileWriter(f, path, mode)
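    # Typical use (sketch; assumes `c` is a writable Collection):
    #
    #   with c.open('foo/bar.txt', 'w') as f:
    #       f.write('hello')
    #   with c.open('foo/bar.txt', 'r') as f:
    #       print f.read(5)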
    @arvfile._synchronized
    @_populate_first
    def modified(self):
        """Test if the collection (or any subcollection or file) has been
        modified since it was created."""
        for k, v in self._items.items():
            if v.modified():
                return True
        return False

    @arvfile._synchronized
    @_populate_first
    def set_unmodified(self):
        """Recursively clear modified flag."""
        for k, v in self._items.items():
            v.set_unmodified()

    @arvfile._synchronized
    @_populate_first
    def __iter__(self):
        """Iterate over names of files and collections contained in this
        collection."""
        return iter(self._items.keys())

    @arvfile._synchronized
    @_populate_first
    def iterkeys(self):
        """Iterate over names of files and collections directly contained in
        this collection."""
        return self._items.iterkeys()

    @arvfile._synchronized
    @_populate_first
    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this
        collection.  If you want to search a path, use `find()` instead.
        """
        return self._items[k]

    @arvfile._synchronized
    @_populate_first
    def __contains__(self, k):
        """Test if there is a file or collection directly contained by this
        collection with the name "k"."""
        return k in self._items

    @arvfile._synchronized
    @_populate_first
    def __len__(self):
        """Get the number of items directly contained in this collection."""
        return len(self._items)

    @arvfile._synchronized
    @_populate_first
    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this
        collection."""
        del self._items[p]

    @arvfile._synchronized
    @_populate_first
    def keys(self):
        """Get a list of names of files and collections directly contained in
        this collection."""
        return self._items.keys()

    @arvfile._synchronized
    @_populate_first
    def values(self):
        """Get a list of files and collection objects directly contained in
        this collection."""
        return self._items.values()

    @arvfile._synchronized
    @_populate_first
    def items(self):
        """Get a list of (name, object) tuples directly contained in this
        collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at "path"."""
        return self.find(path) is not None
    @arvfile._synchronized
    @_populate_first
    def remove(self, path, rm_r=False):
        """Remove the file or subcollection (directory) at `path`.

        :rm_r:
          Specify whether to remove non-empty subcollections (True), or
          raise an error (False).
        """
        p = path.split("/")
        if p[0] == '.':
            # Remove '.' from the front of the path
            del p[0]

        if len(p) > 0:
            item = self._items.get(p[0])
            if item is None:
                raise IOError(errno.ENOENT, "File not found")
            if len(p) == 1:
                if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
                    raise IOError(errno.ENOTEMPTY, "Subcollection not empty")
                del self._items[p[0]]
            else:
                del p[0]
                item.remove("/".join(p))
        else:
            raise IOError(errno.ENOENT, "File not found")
    def _cloneinto(self, target):
        for k, v in self._items.items():
            target._items[k] = v.clone(new_parent=target)

    def clone(self):
        raise NotImplementedError()
    @arvfile._synchronized
    @_populate_first
    def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
        """Copy a file or subcollection to a new path in this collection.

        copyto('/foo', '/bar') will overwrite 'foo' if it exists.
        copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'.
        """
        if source_collection is None:
            source_collection = self

        # Find the object to copy
        sp = source_path.split("/")
        source_obj = source_collection.find(source_path)
        if source_obj is None:
            raise IOError(errno.ENOENT, "File not found")

        # Find the parent collection of the target path
        tp = target_path.split("/")
        target_dir = self.find("/".join(tp[0:-1]), create=True, create_collection=True)

        # Determine the name to use.
        target_name = tp[-1] if tp[-1] else sp[-1]

        if target_name in target_dir and not overwrite:
            raise IOError(errno.EEXIST, "File already exists")

        # Actually make the copy.
        target_dir._items[target_name] = source_obj.clone(target_dir)
    @arvfile._synchronized
    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :strip:
          If True, remove signing tokens from block locators if present.
          If False, block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified.  If False and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.
        """
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
class Collection(SynchronizedCollectionBase):
    """Store an Arvados collection, consisting of a set of files and
    sub collections.
    """

    def __init__(self, manifest_locator_or_text=None,
                 parent=None,
                 config=None,
                 api_client=None,
                 keep_client=None,
                 num_retries=None,
                 block_manager=None,
                 sync=SynchronizedCollectionBase.SYNC_READONLY):
        """:manifest_locator_or_text:
          One of an Arvados collection UUID, a block locator of
          a manifest, raw manifest text, or None (to create an empty collection).
        :parent:
          the parent Collection, may be None.
        :config:
          the arvados configuration to get the hostname and api token.
          Prefer this over supplying your own api_client and keep_client
          (except in testing).  Will use default config settings if not
          specified.
        :api_client:
          The API client object to use for requests.  If not specified,
          create one using `config`.
        :keep_client:
          the Keep client to use for requests.  If not specified,
          create one using `config`.
        :num_retries:
          the number of retries for API and Keep requests.
        :block_manager:
          the block manager to use.  If not specified, create one.
        :sync:
          Set the synchronization policy with the API server collection
          record.
          :SYNC_READONLY:
            Collection is read only.  No synchronization.  This mode will
            also forego locking, which gives better performance.
          :SYNC_EXPLICIT:
            Synchronize on explicit request via `merge()` or `save()`.
          :SYNC_LIVE:
            Synchronize with the server in response to background websocket
            events, on block write, or on file close.
        """
        super(Collection, self).__init__(parent)
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager
        self._config = config
        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None
        self._sync = sync
        self.lock = threading.RLock()

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to Collection must be a manifest or a collection UUID")
    def _root_lock(self):
        return self.lock

    def sync_mode(self):
        return self._sync

    @arvfile._synchronized
    def _my_api(self):
        if self._api_client is None:
            # Build a thread-safe API client on first use.  (SafeApi is
            # assumed to be provided by the arvados.safeapi module.)
            from . import safeapi
            self._api_client = safeapi.SafeApi(self._config)
            self._keep_client = self._api_client.keep
        return self._api_client

    @arvfile._synchronized
    def _my_keep(self):
        if self._keep_client is None:
            if self._api_client is None:
                self._my_api()
            else:
                self._keep_client = KeepClient(api_client=self._api_client)
        return self._keep_client

    @arvfile._synchronized
    def _my_block_manager(self):
        if self._block_manager is None:
            self._block_manager = BlockManager(self._my_keep())
        return self._block_manager
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
    def _populate(self):
        self._items = {}
        if self._manifest_locator is None and self._manifest_text is None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        import_manifest(self._manifest_text, self)

        if self._sync == SynchronizedCollectionBase.SYNC_READONLY:
            # Now that we're populated, knowing that this will be readonly,
            # forego any further locking.
            self.lock = NoopLock()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Support scoped auto-commit in a with: block."""
        self.save(allow_no_locator=True)
        if self._block_manager is not None:
            self._block_manager.stop_threads()
    @arvfile._synchronized
    def clone(self, new_parent=None, new_sync=SynchronizedCollectionBase.SYNC_READONLY, new_config=None):
        if new_config is None:
            new_config = self._config
        c = Collection(parent=new_parent, config=new_config, sync=new_sync)
        if new_sync == SynchronizedCollectionBase.SYNC_READONLY:
            c.lock = NoopLock()
        c._items = {}
        self._cloneinto(c)
        return c

    @arvfile._synchronized
    def api_response(self):
        """
        api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response
    @arvfile._synchronized
    @_populate_first
    def save(self, allow_no_locator=False):
        """Commit pending buffer blocks to Keep, write the manifest to Keep,
        and update the collection record on the API server.

        :allow_no_locator:
          If there is no collection uuid associated with this
          Collection and `allow_no_locator` is False, raise an error.  If True,
          do not raise an error.
        """
        if self.modified():
            self._my_block_manager().commit_all()
            self._my_keep().put(self.manifest_text(strip=True))
            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
                self._api_response = self._my_api().collections().update(
                    uuid=self._manifest_locator,
                    body={'manifest_text': self.manifest_text(strip=False)}
                ).execute(
                    num_retries=self.num_retries)
            elif not allow_no_locator:
                raise errors.AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
            self.set_unmodified()
    @arvfile._synchronized
    @_populate_first
    def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
        """Save a new collection record.

        :name:
          The collection name.

        :owner_uuid:
          the user, or project uuid that will own this collection.
          If None, defaults to the current user.

        :ensure_unique_name:
          If True, ask the API server to rename the collection
          if it conflicts with a collection with the same name and owner.  If
          False, a name conflict will result in an error.
        """
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True))
        body = {"manifest_text": self.manifest_text(strip=False),
                "name": name}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)
        self._manifest_locator = self._api_response["uuid"]
        self.set_unmodified()
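    # End-to-end sketch (the name is illustrative; assumes a reachable API
    # server):
    #
    #   c = Collection(sync=SynchronizedCollectionBase.SYNC_EXPLICIT)
    #   with c.open('hello.txt', 'w') as f:
    #       f.write('hello world')
    #   c.save_as('my new collection')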
class Subcollection(SynchronizedCollectionBase):
    """This is a subdirectory within a collection that doesn't have its own API
    server record.  It falls under the umbrella of the root collection."""

    def __init__(self, parent):
        super(Subcollection, self).__init__(parent)
        self.lock = parent._root_lock()

    def _root_lock(self):
        return self.parent._root_lock()

    def sync_mode(self):
        return self.parent.sync_mode()

    def _my_api(self):
        return self.parent._my_api()

    def _my_keep(self):
        return self.parent._my_keep()

    def _my_block_manager(self):
        return self.parent._my_block_manager()

    def _populate(self):
        self.parent._populate()

    @arvfile._synchronized
    @_populate_first
    def clone(self, new_parent):
        c = Subcollection(new_parent)
        c._items = {}
        self._cloneinto(c)
        return c
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
    """Import a manifest into a `Collection`.

    :manifest_text:
      The manifest text to import from.

    :into_collection:
      The `Collection` that will be initialized (must be empty).
      If None, create a new `Collection` object.

    :api_client:
      The API client object that will be used when creating a new `Collection` object.

    :keep:
      The keep client object that will be used when creating a new `Collection` object.

    :num_retries:
      the default number of api client and keep retries on error.
    """
    if into_collection is not None:
        if len(into_collection) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")
        c = into_collection
    else:
        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)

    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        tok = n.group(1)
        sep = n.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            streamoffset = 0L
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
            else:
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
            else:
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            stream_name = None
            state = STREAM_NAME

    c.set_unmodified()
    return c
def export_manifest(item, stream_name=".", portable_locators=False):
    """
    Create a manifest for `item` (must be a `Collection` or `ArvadosFile`).
    If `item` is a `Collection`, this will also export subcollections.

    :stream_name:
      the name of the stream when exporting `item`.

    :portable_locators:
      If True, strip any permission hints on block locators.
      If False, use block locators as-is.
    """
    buf = ""
    if isinstance(item, SynchronizedCollectionBase):
        stream = {}
        sorted_keys = sorted(item.keys())
        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            v = item[k]
            st = []
            for s in v.segments:
                loc = s.locator
                if loc.startswith("bufferblock"):
                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
                if portable_locators:
                    loc = KeepLocator(loc).stripped()
                st.append(LocatorAndRange(loc, locator_block_size(loc),
                                          s.segment_offset, s.range_size))
            stream[k] = st
        if stream:
            buf += ' '.join(normalize_stream(stream_name, stream))
            buf += "\n"
        for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
            buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        st = []
        for s in item.segments:
            loc = s.locator
            if loc.startswith("bufferblock"):
                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
        stream = {}
        stream[stream_name] = st
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf
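# Round-trip sketch: import a manifest, then export it again.  The locator
# below is the well-known empty block (md5 of "" plus size 0):
#
#   text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
#   c = import_manifest(text)
#   print export_manifest(c)   # expect an equivalent one-line manifest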