import errno
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase, split, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, BlockManager
from keep import KeepLocator, KeepClient
from .stream import StreamReader, normalize_stream, locator_block_size
from .ranges import Range, LocatorAndRange
import arvados
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

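# A minimal sketch of the hint-stripping rule above, on a made-up locator:
# the "+3" size hint survives, the "+A..." permission hint does not.
#
#   >>> re.sub(r'\+[^\d][^\+]*', '', 'acbd18db4cc2f85cedef654fccc4a4d8+3+A1f2e3d4@53e8d0f4')
#   'acbd18db4cc2f85cedef654fccc4a4d8+3'
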
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None

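    # Usage sketch (hypothetical identifiers; assumes Arvados configuration
    # is available so API and Keep clients can be built on demand):
    #
    #   cr = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #   for f in cr.all_files():
    #       print f.name()
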
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        streams = {}

        # Rearrange streams
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

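    # Rough illustration of the effect (made-up 32-hex block hashes): two
    # manifest lines for the same stream,
    #
    #   . aaaa...+4 0:4:b.txt
    #   . bbbb...+4 0:4:a.txt
    #
    # normalize into a single stream line with sorted file names, such as
    #
    #   . aaaa...+4 bbbb...+4 4:4:a.txt 0:4:b.txt
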
    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

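    # Example (hypothetical path; cr is a CollectionReader): read the first
    # kilobyte of one file.
    #
    #   f = cr.open('./foo.txt')
    #   data = f.read(1024)
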
    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    @_populate_first
    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text

class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

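    # A minimal build-and-finish sketch (hypothetical content; assumes a
    # writable Keep service):
    #
    #   cwriter = CollectionWriter()
    #   with cwriter.open('./hello.txt') as outfile:
    #       outfile.write('hello\n')
    #   manifest = cwriter.manifest_text()
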
    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

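    # Example (hypothetical path): upload a local directory, one manifest
    # stream per subdirectory.
    #
    #   cwriter.write_directory_tree('/tmp/mydir')
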
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

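    # Worked example of the hash rule above (made-up manifest, using the real
    # empty-block locator): the portable data hash is the md5 of the stripped
    # manifest text, plus its length.
    #
    #   stripped = '. d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n'
    #   pdh = hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
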
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

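    # Checkpoint/resume sketch: dump_state() captures enough to rebuild the
    # writer later, and the caller restarts any queued work after resuming.
    #
    #   writer = ResumableCollectionWriter()
    #   # ... write some data, then checkpoint:
    #   state = writer.dump_state()
    #   # later, possibly in a new process:
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
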
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)

class Collection(CollectionBase):
    def __init__(self, parent=None, manifest_locator_or_text=None, api_client=None,
                 keep_client=None, num_retries=0, block_manager=None):
        self._parent = parent
        self._items = None
        self._api_client = api_client
        self._keep_client = keep_client
        self._block_manager = block_manager

        self.num_retries = num_retries
        self._manifest_locator = None
        self._manifest_text = None
        self._api_response = None

        if manifest_locator_or_text:
            if re.match(util.keep_locator_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
                self._manifest_locator = manifest_locator_or_text
            elif re.match(util.manifest_pattern, manifest_locator_or_text):
                self._manifest_text = manifest_locator_or_text
            else:
                raise errors.ArgumentError(
                    "Argument to Collection must be a manifest or a collection UUID")

    def _my_api(self):
        if self._api_client is None:
            if self._parent is not None:
                return self._parent._my_api()
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        return self._api_client

    def _my_keep(self):
        if self._keep_client is None:
            if self._parent is not None:
                return self._parent._my_keep()
            self._keep_client = KeepClient(api_client=self._my_api(),
                                           num_retries=self.num_retries)
        return self._keep_client

    def _my_block_manager(self):
        if self._block_manager is None:
            if self._parent is not None:
                return self._parent._my_block_manager()
            self._block_manager = BlockManager(self._my_keep())
        return self._block_manager

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            self._api_response = self._my_api().collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        self._items = {}
        if self._manifest_locator is None and self._manifest_text is None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        import_manifest(self._manifest_text, self)

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._items is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.save(no_locator=True)
        if self._block_manager is not None:
            self._block_manager.stop_threads()

    @_populate_first
    def find(self, path, create=False):
        p = path.split("/")
        if p[0] == '.':
            del p[0]

        if len(p) > 0:
            item = self._items.get(p[0])
            if len(p) == 1:
                # item must be a file
                if item is None and create:
                    # create new file
                    item = ArvadosFile(self, keep=self._keep_client)
                    self._items[p[0]] = item
                return item
            else:
                if item is None and create:
                    # create new collection
                    item = Collection(parent=self, num_retries=self.num_retries)
                    self._items[p[0]] = item
                del p[0]
                return item.find("/".join(p), create=create)
        else:
            return self

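    # Example (hypothetical paths): find() consumes one path component per
    # call and recurses into subcollections, so these are equivalent:
    #
    #   c.find('foo/bar/baz.txt')
    #   c.find('foo').find('bar').find('baz.txt')
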
    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def open(self, path, mode):
        mode = mode.replace("b", "")
        if len(mode) == 0 or mode[0] not in ("r", "w", "a"):
            raise errors.ArgumentError("Bad mode '%s'" % mode)
        create = (mode != "r")

        f = self.find(path, create=create)
        if f is None:
            raise IOError(errno.ENOENT, "File not found")
        if not isinstance(f, ArvadosFile):
            raise IOError(errno.EISDIR, "Path must refer to a file.")

        if mode[0] == "w":
            f.truncate(0)

        if mode == "r":
            return ArvadosFileReader(f, path, mode)
        else:
            return ArvadosFileWriter(f, path, mode)

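    # Read/write sketch (hypothetical path; "count.txt" is created on the
    # fly because mode "w" implies create=True):
    #
    #   writer = c.open('foo/count.txt', 'w')
    #   writer.write('0123456789')
    #   writer.close()
    #   reader = c.open('foo/count.txt', 'r')
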
    @_populate_first
    def modified(self):
        for k, v in self._items.items():
            if v.modified():
                return True
        return False

    @_populate_first
    def set_unmodified(self):
        for k, v in self._items.items():
            v.set_unmodified()

    @_populate_first
    def __iter__(self):
        return self._items.iterkeys()

    @_populate_first
    def iterkeys(self):
        return self._items.iterkeys()

    @_populate_first
    def __getitem__(self, k):
        return self._items[k]

    @_populate_first
    def __contains__(self, k):
        return k in self._items

    @_populate_first
    def __len__(self):
        return len(self._items)

    @_populate_first
    def __delitem__(self, p):
        del self._items[p]

    @_populate_first
    def keys(self):
        return self._items.keys()

    @_populate_first
    def values(self):
        return self._items.values()

    @_populate_first
    def items(self):
        return self._items.items()

    @_populate_first
    def exists(self, path):
        return self.find(path) is not None

    @_populate_first
    def remove(self, path):
        p = path.split("/")
        if p[0] == '.':
            del p[0]

        if len(p) > 0:
            item = self._items.get(p[0])
            if item is None:
                raise IOError(errno.ENOENT, "File not found")
            if len(p) == 1:
                del self._items[p[0]]
            else:
                del p[0]
                item.remove("/".join(p))
        else:
            raise IOError(errno.ENOENT, "File not found")

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if self.modified() or self._manifest_text is None or normalize:
            return export_manifest(self, stream_name=".", portable_locators=strip)
        else:
            if strip:
                return self.stripped_manifest()
            else:
                return self._manifest_text

    def portable_data_hash(self):
        stripped = self.manifest_text(strip=True)
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    @_populate_first
    def save(self, no_locator=False):
        if self.modified():
            self._my_block_manager().commit_all()
            self._my_keep().put(self.manifest_text(strip=True))
            if self._manifest_locator is not None and re.match(util.collection_uuid_pattern, self._manifest_locator):
                self._api_response = self._my_api().collections().update(
                    uuid=self._manifest_locator,
                    body={'manifest_text': self.manifest_text(strip=False)}
                    ).execute(
                        num_retries=self.num_retries)
            elif not no_locator:
                raise AssertionError("Collection manifest_locator must be a collection uuid.  Use save_as() for new collections.")
            self.set_unmodified()

    @_populate_first
    def save_as(self, name, owner_uuid=None):
        self._my_block_manager().commit_all()
        self._my_keep().put(self.manifest_text(strip=True))
        body = {"manifest_text": self.manifest_text(strip=False),
                "name": name}
        if owner_uuid:
            body["owner_uuid"] = owner_uuid
        self._api_response = self._my_api().collections().create(body=body).execute(num_retries=self.num_retries)
        self._manifest_locator = self._api_response["uuid"]
        self.set_unmodified()

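    # Lifecycle sketch (hypothetical name): save_as() creates the API record
    # and remembers its UUID, after which plain save() updates it in place.
    #
    #   c.save_as('my collection')
    #   # ...more modifications...
    #   c.save()
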
def import_manifest(manifest_text, into_collection=None, api_client=None, keep=None, num_retries=None):
    if into_collection is not None:
        if len(into_collection) > 0:
            raise errors.ArgumentError("Can only import manifest into an empty collection")
        c = into_collection
    else:
        c = Collection(api_client=api_client, keep_client=keep, num_retries=num_retries)

    STREAM_NAME = 0
    BLOCKS = 1
    SEGMENTS = 2

    stream_name = None
    state = STREAM_NAME

    for n in re.finditer(r'(\S+)(\s+|$)', manifest_text):
        tok = n.group(1)
        sep = n.group(2)

        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
            blocks = []
            streamoffset = 0L
            state = BLOCKS
            continue

        if state == BLOCKS:
            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
            if s:
                blocksize = long(s.group(1))
                blocks.append(Range(tok, streamoffset, blocksize))
                streamoffset += blocksize
            else:
                state = SEGMENTS

        if state == SEGMENTS:
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            if s:
                pos = long(s.group(1))
                size = long(s.group(2))
                name = s.group(3).replace('\\040', ' ')
                f = c.find("%s/%s" % (stream_name, name), create=True)
                f.add_segment(blocks, pos, size)
            else:
                raise errors.SyntaxError("Invalid manifest format")

        if sep == "\n":
            stream_name = None
            state = STREAM_NAME

    c.set_unmodified()
    return c

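# Manifest walkthrough for the state machine above (made-up block hash).
# Each line is one stream: a stream name, then block locators, then
# position:size:filename segments into the concatenated block data:
#
#   . 930625b054ce894ac40596c3f5a0d947+33 0:3:a.txt 3:30:b.txt
#
# import_manifest() would create ArvadosFiles reachable as "./a.txt" and
# "./b.txt" via Collection.find().
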
def export_manifest(item, stream_name=".", portable_locators=False):
    buf = ""
    if isinstance(item, Collection):
        stream = {}
        sorted_keys = sorted(item.keys())
        for k in [s for s in sorted_keys if isinstance(item[s], ArvadosFile)]:
            v = item[k]
            st = []
            for s in v._segments:
                loc = s.locator
                if loc.startswith("bufferblock"):
                    loc = v.parent._my_block_manager()._bufferblocks[loc].locator()
                if portable_locators:
                    # Strip non-portable hints, as in stripped_manifest().
                    loc = re.sub(r'\+[^\d][^\+]*', '', loc)
                st.append(LocatorAndRange(loc, locator_block_size(loc),
                                          s.segment_offset, s.range_size))
            stream[k] = st
        if stream:
            buf += ' '.join(normalize_stream(stream_name, stream))
            buf += "\n"
        for k in [s for s in sorted_keys if isinstance(item[s], Collection)]:
            buf += export_manifest(item[k],
                                   stream_name=os.path.join(stream_name, k),
                                   portable_locators=portable_locators)
    elif isinstance(item, ArvadosFile):
        stream = {}
        st = []
        for s in item._segments:
            loc = s.locator
            if loc.startswith("bufferblock"):
                loc = item.parent._my_block_manager()._bufferblocks[loc].locator()
            if portable_locators:
                # Strip non-portable hints, as in stripped_manifest().
                loc = re.sub(r'\+[^\d][^\+]*', '', loc)
            st.append(LocatorAndRange(loc, locator_block_size(loc),
                                      s.segment_offset, s.range_size))
        stream[stream_name] = st
        buf += ' '.join(normalize_stream(stream_name, stream))
        buf += "\n"
    return buf
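
# Round-trip sketch: exporting an imported manifest yields a normalized
# equivalent of the original text (assuming m is valid manifest text):
#
#   c = import_manifest(m)
#   m2 = export_manifest(c)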