8 from collections import deque
11 from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager, _synchronized, _must_be_writable, SYNC_READONLY, SYNC_EXPLICIT, SYNC_LIVE, NoopLock
13 from .stream import StreamReader, normalize_stream, locator_block_size
14 from .ranges import Range, LocatorAndRange
20 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    """Common base for collection readers and writers.

    NOTE(review): this extract appears to be missing interleaved source
    lines (e.g. a `_my_keep` method header between `__exit__` and the
    KeepClient construction below) -- confirm against the full file.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        # Lazily build a KeepClient bound to our API client the first
        # time one is needed, and cache it for later calls.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        for line in raw.split("\n"):
            # Keep the stream name (first field) untouched, then strip
            # every non-digit "+hint" from fields that look like Keep
            # locators, preserving only size hints.
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                 if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class CollectionReader(CollectionBase):
    # NOTE(review): this extract appears to be missing interleaved source
    # lines throughout (else-branches, try:/return statements, method
    # headers); comments below describe only the visible code.

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument: Keep block locator, collection UUID, or
        # raw manifest text. Locators/UUIDs are resolved lazily on first
        # data access.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
            # NOTE(review): an "else:" appears to be elided before this
            # raise in the extract.
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        # NOTE(review): the enclosing "try:" appears elided here.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the enclosing "try:" appears elided here.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:

        # NOTE(review): a "def _populate(self):" header appears elided;
        # the lines below implement the populate strategy: try Keep for
        # signed locators, then the API server, then Keep again for
        # plain locators, and finally raise if nothing worked.
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Retrieval failed everywhere; report both errors.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 self._manifest_locator,
        # Split the manifest into per-stream token lists.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        # Ensures the manifest has been fetched and parsed before the
        # wrapped method runs.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
            return orig_func(self, *args, **kwargs)

    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response

        # NOTE(review): a "normalize" method header appears elided; the
        # lines below regroup every file's data ranges by stream and file
        # name, then rebuild each stream in normalized form.
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        streampath, filename = split(streampath)
        keep_client = self._my_keep()
        # Linear scan for the stream whose name matches the request.
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
        raise ValueError("stream '{}' not found in Collection".
        return stream.files()[filename]
        raise ValueError("file '{}' not found in Collection stream '{}'".
                         format(filename, streampath))

    def all_streams(self):
        # One StreamReader per parsed manifest stream.
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

        # NOTE(review): an "all_files" method header appears elided here.
        for s in self.all_streams():
            for f in s.all_files():

    def manifest_text(self, strip=False, normalize=False):
        # When normalizing, round-trip through a fresh reader so streams
        # are merged and sorted; otherwise return stripped or raw text.
        # NOTE(review): the "if normalize:/elif strip:/else:" scaffolding
        # appears elided in this extract.
        cr = CollectionReader(self.manifest_text())
        return cr.manifest_text(strip=strip, normalize=False)
        return self.stripped_manifest()
        return self._manifest_text
class _WriterFile(ArvadosFileBase):
    """Write-only file-like object returned by CollectionWriter.open();
    forwards all writes to the owning CollectionWriter."""

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        # The CollectionWriter that receives this file's data.
        self.dest = coll_writer

        # NOTE(review): a "close" method header appears elided here;
        # closing finalizes the current file in the owning writer.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):

    @ArvadosFileBase._before_close
        # NOTE(review): a "flush" method header appears elided here.
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered but not yet flushed to Keep, and its total size.
        self._data_buffer = []
        self._data_buffer_len = 0
        # Per-stream bookkeeping: file tokens, byte count, block
        # locators, and the stream name ('.' = top level).
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work below.
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:

    def _work_file(self):
        # Stream the queued file into the collection in Keep-block-sized
        # chunks, then close it if this writer opened it.
        buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
        self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectories become their own queued streams, with
                # one less level of manifest depth remaining.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 flattens the whole subtree into a
        # single stream (full recursive listing).
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        self._queue_dirents(stream_name, d)
        self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # Got a path rather than a file object: open it ourselves
            # and remember to close it when the work is done.
            source = open(source, 'rb')
            self._close_file = True
            self._close_file = False
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        # Sorted for deterministic manifest ordering.
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Accept an iterable of strings as well as a single string.
        if hasattr(newdata, '__iter__'):
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Flush complete Keep blocks as the buffer fills.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one Keep-block-sized chunk of buffered data to Keep,
        # keeping any remainder buffered for the next flush.
        data_buffer = ''.join(self._data_buffer)
        self._current_stream_locators.append(
            self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
        self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
        self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Manifest format is whitespace-delimited, so reject names that
        # would corrupt it.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        # Record the (offset, length, name) token for the file just
        # written; a nameless file is only legal if it holds no bytes.
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        # The top-level stream is spelled "." in manifests.
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        # A stream with no data still needs one locator: the canonical
        # empty-block locator.
        if not self._current_stream_locators:
            self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
        self._finished_streams.append([self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

        # NOTE(review): a method header (presumably "finish") appears
        # elided here.
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        # Portable data hash: md5 of the stripped manifest, plus its
        # length in bytes.
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Stream name, block locators, then pos:len:name file tokens;
            # spaces in names are escaped as \040.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])

    def data_locators(self):
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter that can checkpoint its progress and resume later.

    NOTE(review): this extract appears to be missing interleaved source
    lines (try:/else:/return statements, decorators); comments describe
    only the visible code.
    """

    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Map of source path -> stat tuple, used to detect files that
        # changed between checkpoint and resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    def from_state(cls, state, *init_args, **init_kwargs):
        # NOTE(review): presumably decorated @classmethod in the full file.
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible (e.g. serialized lists back into deques).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            writer._queued_file = open(path, 'rb')
            writer._queued_file.seek(pos)
        except IOError as error:
            raise errors.StaleWriterStateError(
                "failed to reopen active file {}: {}".format(path, error))

    def check_dependencies(self):
        # Verify every recorded dependency is still a regular file with
        # the same mtime and size it had when queued.
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        # Snapshot all resumable state; copy_func lets callers deep-copy
        # mutable values if needed.
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())

    def _queue_file(self, source, filename=None):
        # Only real file paths can be checkpointed, so file objects are
        # rejected (the raise below is presumably guarded by an elided
        # "if not isinstance(source, basestring):"-style check).
        src_path = os.path.realpath(source)
        raise errors.AssertionError("{} not a file path".format(source))
        path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
            self._dependencies[src_path] = tuple(fd_stat)
638 def write(self, data):
639 if self._queued_file is None:
640 raise errors.AssertionError(
641 "resumable writer can't accept unsourced data")
642 return super(ResumableCollectionWriter, self).write(data)
class SynchronizedCollectionBase(CollectionBase):
    """Abstract base for collections whose item dict is guarded by a lock.

    NOTE(review): this extract appears to be missing interleaved source
    lines (decorators such as @_synchronized/@_must_be_writable,
    else-branches, return statements, method headers); comments describe
    only the visible code.
    """

    def __init__(self, parent=None):

        raise NotImplementedError()

        raise NotImplementedError()

    def _my_block_manager(self):
        raise NotImplementedError()

    def _root_lock(self):
        raise NotImplementedError()

        raise NotImplementedError()

        raise NotImplementedError()

    def notify(self, collection, event, name, item):
        # Event hook: (collection, event, name, item) -- call sites pass
        # ADD/DEL as `event`.
        raise NotImplementedError()

    def find(self, path, create=False, create_collection=False):
        """Recursively search the specified file path. May return either a Collection
        or ArvadosFile.

        :create:
          If true, create path components (i.e. Collections) that are
          missing. If "create" is False, return None if a path component is
          not found.

        :create_collection:
          If the path is not found, "create" is True, and
          "create_collection" is False, then create and return a new
          ArvadosFile for the last path component. If "create_collection" is
          True, then create and return a new Collection for the last path
          component.
        """
        if create and self.sync_mode() == SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))
        item = self._items.get(p[0])
        # item must be a file
        if item is None and create:
            # Create the final path component as a file or collection.
            if create_collection:
                item = Subcollection(self)
                item = ArvadosFile(self)
            self._items[p[0]] = item
            self.notify(self, ADD, p[0], item)
        if item is None and create:
            # create new collection
            item = Subcollection(self)
            self._items[p[0]] = item
            self.notify(self, ADD, p[0], item)
        # Recurse into the child for the remaining path components.
        return item.find("/".join(p), create=create)

    def open(self, path, mode):
        """Open a file-like object for access.

        :path:
          path to a file in the collection

        :mode:
          one of "r", "r+", "w", "w+", "a", "a+"

          "r+"
            opens for reading and writing. Reads/writes share a file pointer.
          "w", "w+"
            truncates to 0 and opens for reading and writing. Reads/writes share a file pointer.
          "a", "a+"
            opens for reading and writing. All writes are appended to
            the end of the file. Writing does not affect the file pointer for
            reading.
        """
        # Binary flag is meaningless here; strip it before validating.
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")
        if create and self.sync_mode() == SYNC_READONLY:
            raise IOError((errno.EROFS, "Collection is read only"))
        f = self.find(path, create=create)
        raise IOError((errno.ENOENT, "File not found"))
        if not isinstance(f, ArvadosFile):
            raise IOError((errno.EISDIR, "Path must refer to a file."))
        return ArvadosFileReader(f, path, mode, num_retries=self.num_retries)
        return ArvadosFileWriter(f, path, mode, num_retries=self.num_retries)

        # NOTE(review): a "modified" method header appears elided here.
        """Test if the collection (or any subcollection or file) has been modified
        since it was created."""
        for k,v in self._items.items():

    def set_unmodified(self):
        """Recursively clear modified flag"""
        for k,v in self._items.items():

        """Iterate over names of files and collections contained in this collection."""
        return self._items.keys()

        """Iterate over names of files and collections directly contained in this collection."""
        return self._items.keys()

    def __getitem__(self, k):
        """Get a file or collection that is directly contained by this collection. If
        you want to search a path, use `find()` instead.
        """
        return self._items[k]

    def __contains__(self, k):
        """If there is a file or collection a directly contained by this collection
        with name "k".
        """
        return k in self._items

        """Get the number of items directly contained in this collection"""
        return len(self._items)

    def __delitem__(self, p):
        """Delete an item by name which is directly contained by this collection."""
        self.notify(self, DEL, p, None)

        """Get a list of names of files and collections directly contained in this collection."""
        return self._items.keys()

        """Get a list of files and collection objects directly contained in this collection."""
        return self._items.values()

        """Get a list of (name, object) tuples directly contained in this collection."""
        return self._items.items()

    def exists(self, path):
        """Test if there is a file or collection at "path" """
        return self.find(path) != None

    def remove(self, path, rm_r=False):
        """Remove the file or subcollection (directory) at `path`.

        :rm_r:
          Specify whether to remove non-empty subcollections (True), or raise an error (False).
        """
        # Remove '.' from the front of the path
        item = self._items.get(p[0])
        raise IOError((errno.ENOENT, "File not found"))
        # Refuse to remove a non-empty subcollection unless rm_r is set.
        if isinstance(self._items[p[0]], SynchronizedCollectionBase) and len(self._items[p[0]]) > 0 and not rm_r:
            raise IOError((errno.ENOTEMPTY, "Subcollection not empty"))
        del self._items[p[0]]
        self.notify(self, DEL, p[0], None)
        # Recurse into the child for the remaining path components.
        item.remove("/".join(p))
        raise IOError((errno.ENOENT, "File not found"))
859 def _cloneinto(self, target):
860 for k,v in self._items:
861 target._items[k] = v.clone(new_parent=target)
864 raise NotImplementedError()
868 def copyto(self, target_path, source_path, source_collection=None, overwrite=False):
870 copyto('/foo', '/bar') will overwrite 'foo' if it exists.
871 copyto('/foo/', '/bar') will place 'bar' in subcollection 'foo'
873 if source_collection is None:
874 source_collection = self
876 # Find the object to copy
877 sp = source_path.split("/")
878 source_obj = source_collection.find(source_path)
879 if source_obj is None:
880 raise IOError((errno.ENOENT, "File not found"))
882 # Find parent collection the target path
883 tp = target_path.split("/")
884 target_dir = self.find(tp[0:-1].join("/"), create=True, create_collection=True)
886 # Determine the name to use.
887 target_name = tp[-1] if tp[-1] else sp[-1]
889 if target_name in target_dir and not overwrite:
890 raise IOError((errno.EEXIST, "File already exists"))
892 # Actually make the copy.
893 dup = source_obj.clone(target_dir)
894 with target_dir.lock:
895 target_dir._items[target_name] = dup
897 self.notify(target_dir, ADD, target_name, dup)
    def manifest_text(self, strip=False, normalize=False):
        """Get the manifest text for this collection, sub collections and files.

        :strip:
          If True, remove signing tokens from block locators if present.
          If False, block locators are left unchanged.

        :normalize:
          If True, always export the manifest text in normalized form
          even if the Collection is not modified. If False and the collection
          is not modified, return the original manifest text even if it is not
          in normalized form.
        """
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
        # NOTE(review): an "else:"/"if strip:" structure appears elided
        # before the two returns below in this extract.
        return self.stripped_manifest()
        return self._manifest_text
925 def merge(self, other):
926 for k in other.keys():
928 if isinstance(self[k], Subcollection) and isinstance(other[k], Subcollection):
929 self[k].merge(other[k])
931 if self[k] != other[k]:
932 name = "%s~conflict-%s~" % (k, time.strftime("%Y-%m-%d~%H:%M%:%S",
934 self[name] = other[k].clone(self)
935 self.notify(self, name, ADD, self[name])
937 self[k] = other[k].clone(self)
938 self.notify(self, k, ADD, self[k])
    def portable_data_hash(self):
        """Get the portable data hash for this collection's manifest."""
        # md5 of the stripped (signature-free) manifest, plus its byte
        # length -- the standard Arvados portable data hash format.
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
946 class Collection(SynchronizedCollectionBase):
947 """Store an Arvados collection, consisting of a set of files and
951 def __init__(self, manifest_locator_or_text=None,
959 """:manifest_locator_or_text:
960 One of Arvados collection UUID, block locator of
961 a manifest, raw manifest text, or None (to create an empty collection).
963 the parent Collection, may be None.
965 the arvados configuration to get the hostname and api token.
966 Prefer this over supplying your own api_client and keep_client (except in testing).
967 Will use default config settings if not specified.
969 The API client object to use for requests. If not specified, create one using `config`.
971 the Keep client to use for requests. If not specified, create one using `config`.
973 the number of retries for API and Keep requests.
975 the block manager to use. If not specified, create one.
977 Set synchronization policy with API server collection record.
979 Collection is read only. No synchronization. This mode will
980 also forego locking, which gives better performance.
982 Synchronize on explicit request via `update()` or `save()`
984 Synchronize with server in response to background websocket events,
985 on block write, or on file close.
988 super(Collection, self).__init__(parent)
989 self._api_client = api_client
990 self._keep_client = keep_client
991 self._block_manager = block_manager
992 self._config = config
993 self.num_retries = num_retries
994 self._manifest_locator = None
995 self._manifest_text = None
996 self._api_response = None
998 self.lock = threading.RLock()
1002 if manifest_locator_or_text:
1003 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
1004 self._manifest_locator = manifest_locator_or_text
1005 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
1006 self._manifest_locator = manifest_locator_or_text
1007 elif re.match(util.manifest_pattern, manifest_locator_or_text):
1008 self._manifest_text = manifest_locator_or_text
1010 raise errors.ArgumentError(
1011 "Argument to CollectionReader must be a manifest or a collection UUID")
1015 if self._sync == SYNC_LIVE:
1016 if not self._manifest_locator or not re.match(util.collection_uuid_pattern, self._manifest_locator):
1017 raise errors.ArgumentError("Cannot SYNC_LIVE unless a collection uuid is specified")
1018 self.events = events.subscribe(arvados.api(), [["object_uuid", "=", self._manifest_locator]], self.on_message)
1021 def create(name, owner_uuid=None, sync=SYNC_EXPLICIT):
1022 c = Collection(sync=sync)
1023 c.save_as(name, owner_uuid=owner_uuid, ensure_unique_name=True)
1026 def _root_lock(self):
1029 def sync_mode(self):
1032 def on_message(self):
1037 n = self._my_api().collections().get(uuid=self._manifest_locator, select=["manifest_text"]).execute()
1038 other = import_collection(n["manifest_text"])
1043 if self._api_client is None:
1044 self._api_client = arvados.api.SafeApi(self._config)
1045 self._keep_client = self._api_client.keep
1046 return self._api_client
1050 if self._keep_client is None:
1051 if self._api_client is None:
1054 self._keep_client = KeepClient(api=self._api_client)
1055 return self._keep_client
1058 def _my_block_manager(self):
1059 if self._block_manager is None:
1060 self._block_manager = BlockManager(self._my_keep())
1061 return self._block_manager
1063 def _populate_from_api_server(self):
1064 # As in KeepClient itself, we must wait until the last
1065 # possible moment to instantiate an API client, in order to
1066 # avoid tripping up clients that don't have access to an API
1067 # server. If we do build one, make sure our Keep client uses
1068 # it. If instantiation fails, we'll fall back to the except
1069 # clause, just like any other Collection lookup
1070 # failure. Return an exception, or None if successful.
1072 self._api_response = self._my_api().collections().get(
1073 uuid=self._manifest_locator).execute(
1074 num_retries=self.num_retries)
1075 self._manifest_text = self._api_response['manifest_text']
1077 except Exception as e:
def _populate_from_keep(self):
    """Retrieve the manifest text directly from Keep by locator.

    This has a chance of working if [a] the locator includes a
    permission signature or [b] the Keep services are operating in
    world-readable mode.  Returns None on success, or the exception
    raised on failure.
    """
    try:
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        return None
    except Exception as e:
        return e
# Load the collection: decide whether to read the manifest from Keep, the
# API server, or both, then import the manifest into this object.
1091 def _populate(self):
# A brand-new empty collection: nothing to load.
1092 if self._manifest_locator is None and self._manifest_text is None:
1094 error_via_api = None
1095 error_via_keep = None
# Keep is only worth trying when we have no manifest text yet and the
# locator looks like a content address (md5+size), not a collection uuid.
1096 should_try_keep = ((self._manifest_text is None) and
1097 util.keep_locator_pattern.match(
1098 self._manifest_locator))
# A signed locator can be fetched from Keep directly; try that first.
1099 if ((self._manifest_text is None) and
1100 util.signed_locator_pattern.match(self._manifest_locator)):
1101 error_via_keep = self._populate_from_keep()
1102 if self._manifest_text is None:
1103 error_via_api = self._populate_from_api_server()
# NOTE(review): the body of this branch (presumably `raise
# error_via_api`) is missing from this paste.
1104 if error_via_api is not None and not should_try_keep:
# NOTE(review): the tail of this condition (presumably
# `should_try_keep):`) was elided here.
1106 if ((self._manifest_text is None) and
1107 not error_via_keep and
1109 # Looks like a keep locator, and we didn't already try keep above
1110 error_via_keep = self._populate_from_keep()
# Both retrieval paths failed: report the per-path errors to the caller.
1111 if self._manifest_text is None:
1113 raise arvados.errors.NotFoundError(
1114 ("Failed to retrieve collection '{}' " +
1115 "from either API server ({}) or Keep ({})."
# NOTE(review): the `).format(` line joining the message to these
# arguments, and the error arguments after the locator, were elided.
1117 self._manifest_locator,
# Parse the retrieved manifest text into this collection's file tree.
1121 import_manifest(self._manifest_text, self)
1123 if self._sync == SYNC_READONLY:
1124 # Now that we're populated, knowing that this will be readonly,
1125 # forego any further locking.
1126 self.lock = NoopLock()
def __enter__(self):
    """Enter a with-block; returns the collection itself.

    NOTE(review): the original body was elided in the paste; `return
    self` is the standard protocol counterpart to the auto-commit
    __exit__ below -- confirm against upstream history.
    """
    return self

def __exit__(self, exc_type, exc_value, traceback):
    """Support scoped auto-commit in a with: block"""
    # Writable collections are saved on scope exit; read-only ones are not.
    if self._sync != SYNC_READONLY:
        self.save(allow_no_locator=True)
    # Shut down the block manager's worker threads, if one was created.
    if self._block_manager is not None:
        self._block_manager.stop_threads()
# Create a copy of this collection under a (possibly different) parent,
# sync mode, and config.
1139 def clone(self, new_parent=None, new_sync=SYNC_READONLY, new_config=None):
# Default to sharing this collection's configuration.
1140 if new_config is None:
1141 new_config = self.config
1142 c = Collection(parent=new_parent, config=new_config, sync=new_sync)
# NOTE(review): the body of this branch, and the rest of clone through
# its return, were elided in this paste -- presumably item copying and
# read-only lock adjustment; restore before use.
1143 if new_sync == SYNC_READONLY:
def api_response(self):
    """api_response() -> dict or None

    Returns information about this Collection fetched from the API server.
    If the Collection exists in Keep but not the API server, currently
    returns None.  Future versions may provide a synthetic response.
    """
    return self._api_response
def save(self, allow_no_locator=False):
    """Commit pending buffer blocks to Keep, write the manifest to Keep, and
    update the collection record to Keep.

    If there is no collection uuid associated with this
    Collection and `allow_no_locator` is False, raise an error.  If True,
    do not raise an error.
    """
    # NOTE(review): two lines were elided here in the paste; upstream may
    # have guarded the commit on self.modified() -- confirm.
    self._my_block_manager().commit_all()
    self._my_keep().put(self.manifest_text(strip=True))
    if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
        # Update the existing API-server record with the signed manifest.
        self._api_response = self._my_api().collections().update(
            uuid=self._manifest_locator,
            body={'manifest_text': self.manifest_text(strip=False)}
            ).execute(
                num_retries=self.num_retries)
    elif not allow_no_locator:
        raise AssertionError("Collection manifest_locator must be a collection uuid. Use save_as() for new collections.")
    self.set_unmodified()
def save_as(self, name, owner_uuid=None, ensure_unique_name=False):
    """Save a new collection record.

    :name:
      The collection name.

    :owner_uuid:
      the user, or project uuid that will own this collection.
      If None, defaults to the current user.

    :ensure_unique_name:
      If True, ask the API server to rename the collection
      if it conflicts with a collection with the same name and owner. If
      False, a name conflict will result in an error.
    """
    self._my_block_manager().commit_all()
    self._my_keep().put(self.manifest_text(strip=True))
    body = {"manifest_text": self.manifest_text(strip=False),
            "name": name}
    if owner_uuid:
        body["owner_uuid"] = owner_uuid
    self._api_response = self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=self.num_retries)

    # NOTE(review): the `if self.events:` guards below are reconstructed
    # (the guard lines were elided in the paste) -- confirm upstream.
    if self.events:
        # Stop watching the old record before switching uuids.
        self.events.unsubscribe(filters=[["object_uuid", "=", self._manifest_locator]])

    self._manifest_locator = self._api_response["uuid"]

    if self.events:
        # Re-point the event subscription at the newly created record.
        self.events.subscribe(filters=[["object_uuid", "=", self._manifest_locator]])

    self.set_unmodified()
def subscribe(self, callback):
    """Register `callback` to be invoked on collection change events."""
    self.callbacks.append(callback)
def unsubscribe(self, callback):
    """Remove a previously registered callback.

    Raises ValueError if `callback` was never registered (list.remove
    semantics, unchanged from the original).
    """
    self.callbacks.remove(callback)
def notify(self, collection, event, name, item):
    """Invoke every registered callback with the event details."""
    for listener in self.callbacks:
        listener(collection, event, name, item)
1233 class Subcollection(SynchronizedCollectionBase):
1234 """This is a subdirectory within a collection that doesn't have its own API
1235 server record. It falls under the umbrella of the root collection."""
# Share the root collection's lock so the whole tree synchronizes as one.
1237 def __init__(self, parent):
1238 super(Subcollection, self).__init__(parent)
1239 self.lock = parent._root_lock()
# All state and service accessors delegate to the parent (ultimately the
# root collection).
1241 def _root_lock(self):
1242 return self.parent._root_lock()
1244 def sync_mode(self):
1245 return self.parent.sync_mode()
# NOTE(review): the `def _my_api(self):` and `def _my_keep(self):`
# headers for the next two return statements were lost in this paste;
# the delegation pattern of the surrounding methods makes the intended
# headers clear -- restore them.
1248 return self.parent._my_api()
1251 return self.parent._my_keep()
1253 def _my_block_manager(self):
1254 return self.parent._my_block_manager()
1256 def _populate(self):
1257 self.parent._populate()
# Bubble change notifications up to the root collection's subscribers.
1259 def notify(self, collection, event, name, item):
1260 self.parent.notify(collection, event, name, item)
# NOTE(review): the remainder of clone (copying this subcollection's
# items into `c` and returning it) is missing from this paste.
1263 def clone(self, new_parent):
1264 c = Subcollection(new_parent)
# Parse a manifest with a three-state tokenizer: stream name -> block
# locators -> file segments.  Several lines (parameter list, the
# STREAM_NAME/BLOCKS/SEGMENTS constants, and the state transitions) are
# missing from this paste; see NOTEs below.
1269 def import_manifest(manifest_text,
1270 into_collection=None,
# NOTE(review): the api_client=, keep=, and num_retries= parameters were
# elided between these two lines.
1274 sync=SYNC_READONLY):
1275 """Import a manifest into a `Collection`.
1278 The manifest text to import from.
1281 The `Collection` that will be initialized (must be empty).
1282 If None, create a new `Collection` object.
1285 The API client object that will be used when creating a new `Collection` object.
1288 The keep client object that will be used when creating a new `Collection` object.
1291 the default number of api client and keep retries on error.
1294 Collection sync mode (only if into_collection is None)
1296 if into_collection is not None:
1297 if len(into_collection) > 0:
1298 raise ArgumentError("Can only import manifest into an empty collection")
# NOTE(review): the branch assigning `c = into_collection` and the else:
# introducing this line were elided.
1301 c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries, sync=sync)
# Remember the caller's sync mode; presumably restored after the import.
1303 save_sync = c.sync_mode()
# Tokenize on whitespace; group(1) is the token, group(2) the separator
# (a newline separator ends the current stream).
1313 for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
1317 if state == STREAM_NAME:
1318 # starting a new stream
# '\040' is the manifest escape for a space in stream/file names.
1319 stream_name = tok.replace('\\040', ' ')
# A block token is `md5+size[+hints]`; record its offset in the stream.
1327 s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
# NOTE(review): long() is Python-2 only; this file predates Python 3.
1329 blocksize = long(s.group(1))
1330 blocks.append(Range(tok, streamoffset, blocksize))
1331 streamoffset += blocksize
# A segment token is `pos:size:filename`, relative to the stream blocks.
1335 if state == SEGMENTS:
1336 s = re.search(r'^(\d+):(\d+):(\S+)', tok)
1338 pos = long(s.group(1))
1339 size = long(s.group(2))
1340 name = s.group(3).replace('\\040', ' ')
# Create (or extend) the file entry and map the segment onto the blocks.
1341 f = c.find("%s/%s" % (stream_name, name), create=True)
1342 f.add_segment(blocks, pos, size)
# A token that matches no state means the manifest is malformed.
1345 raise errors.SyntaxError("Invalid manifest format")
# Serialize `item` into manifest text.  The function continues past the
# end of this view (the final `return buf` is not visible), and several
# setup lines (`buf = ""`, per-stream dict/list initializers, `v = item[k]`)
# were elided from the paste.
1355 def export_manifest(item, stream_name=".", portable_locators=False):
1358 Create a manifest for `item` (must be a `Collection` or `ArvadosFile`). If
1359 `item` is a is a `Collection`, this will also export subcollections.
1362 the name of the stream when exporting `item`.
1365 If True, strip any permission hints on block locators.
1366 If False, use block locators as-is.
1369 if isinstance(item, SynchronizedCollectionBase):
1371 sorted_keys = sorted(item.keys())
# Files first: emit every ArvadosFile in this collection as one stream.
1372 for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
1375 for s in v.segments():
# An uncommitted buffer block has a placeholder name; substitute the
# locator known to the block manager.
1377 if loc.startswith("bufferblock"):
1378 loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
1379 if portable_locators:
1380 loc = KeepLocator(loc).stripped()
1381 st.append(LocatorAndRange(loc, locator_block_size(loc),
1382 s.segment_offset, s.range_size))
1385 buf += ' '.join(normalize_stream(stream_name, stream))
# Then recurse into subcollections, one stream per subdirectory.
1387 for k in [s for s in sorted_keys if isinstance(item[s], SynchronizedCollectionBase)]:
1388 buf += export_manifest(item[k], stream_name=os.path.join(stream_name, k), portable_locators=portable_locators)
1389 elif isinstance(item, ArvadosFile):
# NOTE(review): `item.segments` (attribute) here vs `v.segments()`
# (call) in the branch above -- confirm which is correct.
1391 for s in item.segments:
1393 if loc.startswith("bufferblock"):
# NOTE(review): inconsistent with the collection branch, which uses
# v.parent._my_block_manager()._bufferblocks[loc].locator(); here the
# lookup is on item._bufferblocks and calls calculate_locator().
# One of the two is likely a real bug -- reconcile them.
1394 loc = item._bufferblocks[loc].calculate_locator()
1395 if portable_locators:
1396 loc = KeepLocator(loc).stripped()
1397 st.append(LocatorAndRange(loc, locator_block_size(loc),
1398 s.segment_offset, s.range_size))
1399 stream[stream_name] = st
1400 buf += ' '.join(normalize_stream(stream_name, stream))