6 from collections import deque
9 from .arvfile import ArvadosFileBase, split, ArvadosFile
11 from .stream import StreamReader, normalize_stream
12 from .ranges import Range
17 _logger = logging.getLogger('arvados.collection')
class CollectionBase(object):
    # Shared base for CollectionReader/CollectionWriter: context-manager
    # support plus lazy construction of a KeepClient.
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): this method's body is elided in this view; the lines
        # below appear to belong to a lazy Keep-client accessor (its def line
        # is not visible here) -- confirm against the full file.
        if self._keep_client is None:
            # Build the Keep client on first use, reusing our API client and
            # retry policy.
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        for line in raw.split("\n"):
            # NOTE(review): several lines (the field split, the accumulator
            # init, the non-locator arm of the conditional, and the final
            # return) are not visible in this view.
            clean_fields = fields[:1] + [
                (re.sub(r'\+[^\d][^\+]*', '', x)
                if re.match(util.keep_locator_pattern, x)
            clean += [' '.join(clean_fields), "\n"]
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Decide how to interpret the argument: Keep locator, collection
        # UUID, or literal manifest text.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        # NOTE(review): the `else:` line introducing this raise is not
        # visible in this view.
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        # NOTE(review): the enclosing try: line and the return statements of
        # both branches are not visible in this view.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        self._api_response = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = self._api_response['manifest_text']
        except Exception as e:
    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        # NOTE(review): the enclosing try: line and the return statements
        # are not visible in this view.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
        # NOTE(review): this is the body of _populate(); its def line and the
        # initialization of error_via_api are not visible in this view.
        error_via_keep = None
        # Only worth trying Keep directly if we hold a bare Keep locator.
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep without the API.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            # NOTE(review): this branch's body (presumably an early raise or
            # return) is not visible in this view.
            if error_via_api is not None and not should_try_keep:
        if ((self._manifest_text is None) and
                not error_via_keep and
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Every source failed; report both errors to the caller.
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                self._manifest_locator,
        # Split the fetched manifest into per-stream token lists.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data: ensure the
        # manifest has been fetched and parsed before the method runs.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                # NOTE(review): the populate call on this branch and the
                # trailing `return wrapper` are not visible in this view.
            return orig_func(self, *args, **kwargs)
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response
        # NOTE(review): this is the body of normalize(); its def line, the
        # `streams = {}` initialization, and the inner loop binding `r` over
        # each file's segments are not visible in this view.
        for s in self.all_streams():
            for f in s.all_files():
                # Index every file by (stream name, file name).
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                streams[streamname][filename].extend(s.locators_and_ranges(r.locator, r.range_size))
        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        # NOTE(review): the `if filename is None:` guard before this split,
        # the loop break/else, and the try/except around the file lookup are
        # not visible in this view.
        streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
        raise ValueError("stream '{}' not found in Collection".
            return stream.files()[filename]
        raise ValueError("file '{}' not found in Collection stream '{}'".
                         format(filename, streampath))
221 def all_streams(self):
222 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
223 for s in self._streams]
        # NOTE(review): this is the body of all_files(); its def line and the
        # per-file yield are not visible in this view.
        for s in self.all_streams():
            for f in s.all_files():
    def manifest_text(self, strip=False, normalize=False):
        # NOTE(review): the branch keywords (if normalize: / elif strip: /
        # else:) and the cr.normalize() call are not visible in this view.
            # Normalizing mutates state, so do it on a private copy.
            cr = CollectionReader(self.manifest_text())
            return cr.manifest_text(strip=strip, normalize=False)
            return self.stripped_manifest()
            return self._manifest_text
class _WriterFile(ArvadosFileBase):
    # File-like object returned by CollectionWriter.open(); forwards writes
    # to the owning CollectionWriter.
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    # NOTE(review): the def line for close() is not visible in this view.
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
    # NOTE(review): the writelines body and the def line for flush() are not
    # visible in this view.

    @ArvadosFileBase._before_close
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data buffered for the next Keep block, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State for the stream and file currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state (see do_queued_work).
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): the body of __exit__ is not visible in this view.

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        # NOTE(review): the while-loop wrapper, the _work_*() calls in each
        # branch, and the terminating else/break are not visible in this
        # view.
        if self._queued_file:
        elif self._queued_dirents:
        elif self._queued_trees:
    def _work_file(self):
        # Stream the queued file into the collection in Keep-block-sized
        # chunks, then close out its bookkeeping.
        # NOTE(review): the while-loop around the read, the end-of-data
        # break, the write(buf) call, and the _close_file guard are not
        # visible in this view.
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
        self.finish_current_file()
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None
    def _work_dirents(self):
        # Write or recurse into entries of the directory at the head of
        # _queued_trees.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                # Subdirectory: queue it as its own stream.
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            # NOTE(review): the else: for this branch and a loop break are
            # not visible in this view.
                self._queue_file(target, dirent)
        if not self._queued_dirents:
            self._queued_trees.popleft()
    def _work_trees(self):
        # Turn the next queued directory tree into a queue of dirents.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # max_manifest_depth == 0 means flatten the whole tree into one
        # stream; otherwise list only the top level here.
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        # NOTE(review): the if/else wrapping the two lines below is not
        # visible in this view.
            self._queue_dirents(stream_name, d)
            self._queued_trees.popleft()
    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # source is a path: open it ourselves and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
        # NOTE(review): the else: for the branch above and the
        # `if filename is None:` guard are not visible in this view.
            self._close_file = False
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source
379 def _queue_dirents(self, stream_name, dirents):
380 assert (not self._queued_dirents), "tried to queue more than one tree"
381 self._queued_dirents = deque(sorted(dirents))
383 def _queue_tree(self, path, stream_name, max_manifest_depth):
384 self._queued_trees.append((path, stream_name, max_manifest_depth))
386 def write_file(self, source, filename=None):
387 self._queue_file(source, filename)
388 self.do_queued_work()
390 def write_directory_tree(self,
391 path, stream_name='.', max_manifest_depth=-1):
392 self._queue_tree(path, stream_name, max_manifest_depth)
393 self.do_queued_work()
    def write(self, newdata):
        # An iterable of strings is written element by element.
        if hasattr(newdata, '__iter__'):
            # NOTE(review): the per-element recursive write and early return
            # on this branch are not visible in this view.
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        # Ship full Keep blocks as soon as we have them.
        # NOTE(review): the flush_data() call inside this loop is not
        # visible in this view.
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        # NOTE(review): the `if filename is None:` guard before this split is
        # not visible in this view.
        streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open
    def flush_data(self):
        """Send one Keep block's worth of buffered data to Keep."""
        data_buffer = ''.join(self._data_buffer)
        # NOTE(review): the `if data_buffer:` guard around the lines below is
        # not visible in this view.
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            # Keep any tail beyond one block buffered for the next flush.
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
443 def start_new_file(self, newfilename=None):
444 self.finish_current_file()
445 self.set_current_file_name(newfilename)
    def set_current_file_name(self, newfilename):
        # The manifest format cannot represent tabs, newlines, or NULs in
        # file names, so reject them up front.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
        # NOTE(review): the `newfilename)` continuation lines closing both
        # raise calls are not visible in this view.
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
        self._current_file_name = newfilename
458 def current_file_name(self):
459 return self._current_file_name
    def finish_current_file(self):
        """Record the file in progress in the current stream's file list."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file ended; nothing to do.
                # NOTE(review): the `return` on this branch is not visible in
                # this view.
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        # Record [offset, length, name] and advance the file start position.
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None
478 def start_new_stream(self, newstreamname='.'):
479 self.finish_current_stream()
480 self.set_current_stream_name(newstreamname)
482 def set_current_stream_name(self, newstreamname):
483 if re.search(r'[\t\n]', newstreamname):
484 raise errors.AssertionError(
485 "Manifest stream names cannot contain whitespace")
486 self._current_stream_name = '.' if newstreamname=='' else newstreamname
488 def current_stream_name(self):
489 return self._current_stream_name
    def finish_current_stream(self):
        """Close out the current stream and add it to the finished list."""
        self.finish_current_file()
        # NOTE(review): a data-flush call here, the empty-stream branch body,
        # and the final else: are not visible in this view.
        if not self._current_stream_files:
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
            if not self._current_stream_locators:
                # An empty stream still needs a locator in the manifest.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        # Reset per-stream state for the next stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None
        # NOTE(review): the def line for this method (finish) is not visible
        # in this view.
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())
517 def portable_data_hash(self):
518 stripped = self.stripped_manifest()
519 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
    def manifest_text(self):
        """Render everything written so far as manifest text."""
        self.finish_current_stream()
        # NOTE(review): the initialization of `manifest`, the body of the
        # stream-name-prefix branch, and the final return are not visible in
        # this view.
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
            # Escape spaces in names per the manifest format.
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
    def data_locators(self):
        # Collect the Keep locators referenced by every finished stream.
        # NOTE(review): the accumulator setup, loop body, and return are not
        # visible in this view.
        for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be dumped and later resumed."""

    # The attributes that fully describe writer progress for dump/restore.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']
549 def __init__(self, api_client=None, num_retries=0):
550 self._dependencies = {}
551 super(ResumableCollectionWriter, self).__init__(
552 api_client, num_retries=num_retries)
    def from_state(cls, state, *init_args, **init_kwargs):
        # NOTE(review): the @classmethod decorator line, the try: around the
        # file reopen, and the trailing `return writer` are not visible in
        # this view.
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # they differ.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source has changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            # NOTE(review): the try: line around os.stat is not visible in
            # this view.
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            # The file must still be regular, with the same mtime and size.
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
601 def dump_state(self, copy_func=lambda x: x):
602 state = {attr: copy_func(getattr(self, attr))
603 for attr in self.STATE_PROPS}
604 if self._queued_file is None:
605 state['_current_file'] = None
607 state['_current_file'] = (os.path.realpath(self._queued_file.name),
608 self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        """Queue a file, recording its stat info for later resume checks."""
        # NOTE(review): the try/except lines around realpath and os.stat,
        # and the final else:, are not visible in this view.
            src_path = os.path.realpath(source)
            raise errors.AssertionError("{} not a file path".format(source))
            path_stat = os.stat(src_path)
        except OSError as stat_error:
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
            self._dependencies[src_path] = tuple(fd_stat)
635 def write(self, data):
636 if self._queued_file is None:
637 raise errors.AssertionError(
638 "resumable writer can't accept unsourced data")
639 return super(ResumableCollectionWriter, self).write(data)
class Collection(object):
    # NOTE(review): this class is only partially visible in this view; its
    # __init__ and several branches of find_or_create are elided.
    def find_or_create(self, path):
        # Walk `path` one component at a time, creating intermediate
        # collections and leaf files as needed.
        # NOTE(review): the path split binding `p` and the single-component
        # branch structure are not visible in this view.
        item = self.items.get(p[0])
        # item must be a file
        item = ArvadosFile(p[0], 'wb', [], [])
        self.items[p[0]] = item
        # create new collection
        self.items[p[0]] = item
        return item.find_or_create("/".join(p))
def import_manifest(manifest_text):
    """Parse manifest text into a Collection tree.

    NOTE(review): large parts of this function (the state constants, the
    Collection construction, and the state transitions) are not visible in
    this view.
    """
    # Tokenize: each run of non-space characters plus its delimiter.
    for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
        if state == STREAM_NAME:
            # starting a new stream
            stream_name = tok.replace('\\040', ' ')
        # Block locator token: md5+size(+hints).
        s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
        blocksize = long(s.group(1))
        blocks.append(Range(tok, streamoffset, blocksize))
        streamoffset += blocksize
        if state == SEGMENTS:
            # File segment token: position:size:filename.
            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
            pos = long(s.group(1))
            size = long(s.group(2))
            name = s.group(3).replace('\\040', ' ')
            f = c.find_or_create("%s/%s" % (stream_name, name))
            f.add_segment(blocks, pos, size)
        raise errors.SyntaxError("Invalid manifest format")
def export_manifest(item, stream_name="."):
    # Render `item` (Collection or ArvadosFile) back into manifest text.
    # NOTE(review): several lines (buf initialization, else branches, and the
    # return) are not visible in this view.
    if isinstance(item, Collection):
        for k,v in item.items.items():
            # NOTE(review): suspected bug -- this re-tests `item`, which is
            # already known to be a Collection here; it almost certainly
            # should test `v` (the child entry) instead.  Confirm against
            # the full file before changing.
            if isinstance(item, Collection):
                buf += export_manifest(v, stream_name)
    if isinstance(item, ArvadosFile):
        buf += str(item.segments)
        #stream[k] = [[s.locator, s[4], s[], s[]] for s in item.segments]
        buf += str(item.segments)