6 from collections import deque
9 from .arvfile import ArvadosFileBase
11 from .stream import StreamReader, split
16 _logger = logging.getLogger('arvados.collection')
18 def normalize_stream(s, stream):
20 sortedfiles = list(stream.keys())
27 if b[arvados.LOCATOR] not in blocks:
28 stream_tokens.append(b[arvados.LOCATOR])
29 blocks[b[arvados.LOCATOR]] = streamoffset
30 streamoffset += b[arvados.BLOCKSIZE]
32 if len(stream_tokens) == 1:
33 stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
37 fout = f.replace(' ', '\\040')
38 for segment in stream[f]:
39 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
40 if current_span is None:
41 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
43 if segmentoffset == current_span[1]:
44 current_span[1] += segment[arvados.SEGMENTSIZE]
46 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
47 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
49 if current_span is not None:
50 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
53 stream_tokens.append("0:0:{0}".format(fout))
58 class CollectionBase(object):
62 def __exit__(self, exc_type, exc_value, traceback):
66 if self._keep_client is None:
67 self._keep_client = KeepClient(api_client=self._api_client,
68 num_retries=self.num_retries)
69 return self._keep_client
71 def stripped_manifest(self):
73 Return the manifest for the current collection with all
74 non-portable hints (i.e., permission signatures and other
75 hints other than size hints) removed from the locators.
77 raw = self.manifest_text()
79 for line in raw.split("\n"):
82 clean_fields = fields[:1] + [
83 (re.sub(r'\+[^\d][^\+]*', '', x)
84 if re.match(util.keep_locator_pattern, x)
87 clean += [' '.join(clean_fields), "\n"]
91 class CollectionReader(CollectionBase):
92 def __init__(self, manifest_locator_or_text, api_client=None,
93 keep_client=None, num_retries=0):
94 """Instantiate a CollectionReader.
96 This class parses Collection manifests to provide a simple interface
97 to read its underlying files.
100 * manifest_locator_or_text: One of a Collection UUID, portable data
101 hash, or full manifest text.
102 * api_client: The API client to use to look up Collections. If not
103 provided, CollectionReader will build one from available Arvados
105 * keep_client: The KeepClient to use to download Collection data.
106 If not provided, CollectionReader will build one from available
107 Arvados configuration.
108 * num_retries: The default number of times to retry failed
109 service requests. Default 0. You may change this value
110 after instantiation, but note those changes may not
111 propagate to related objects like the Keep client.
113 self._api_client = api_client
114 self._keep_client = keep_client
115 self.num_retries = num_retries
116 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
117 self._manifest_locator = manifest_locator_or_text
118 self._manifest_text = None
119 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
120 self._manifest_locator = manifest_locator_or_text
121 self._manifest_text = None
122 elif re.match(util.manifest_pattern, manifest_locator_or_text):
123 self._manifest_text = manifest_locator_or_text
124 self._manifest_locator = None
126 raise errors.ArgumentError(
127 "Argument to CollectionReader must be a manifest or a collection UUID")
128 self._api_response = None
131 def _populate_from_api_server(self):
132 # As in KeepClient itself, we must wait until the last
133 # possible moment to instantiate an API client, in order to
134 # avoid tripping up clients that don't have access to an API
135 # server. If we do build one, make sure our Keep client uses
136 # it. If instantiation fails, we'll fall back to the except
137 # clause, just like any other Collection lookup
138 # failure. Return an exception, or None if successful.
140 if self._api_client is None:
141 self._api_client = arvados.api('v1')
142 self._keep_client = None # Make a new one with the new api.
143 self._api_response = self._api_client.collections().get(
144 uuid=self._manifest_locator).execute(
145 num_retries=self.num_retries)
146 self._manifest_text = self._api_response['manifest_text']
148 except Exception as e:
151 def _populate_from_keep(self):
152 # Retrieve a manifest directly from Keep. This has a chance of
153 # working if [a] the locator includes a permission signature
154 # or [b] the Keep services are operating in world-readable
155 # mode. Return an exception, or None if successful.
157 self._manifest_text = self._my_keep().get(
158 self._manifest_locator, num_retries=self.num_retries)
159 except Exception as e:
164 error_via_keep = None
165 should_try_keep = ((self._manifest_text is None) and
166 util.keep_locator_pattern.match(
167 self._manifest_locator))
168 if ((self._manifest_text is None) and
169 util.signed_locator_pattern.match(self._manifest_locator)):
170 error_via_keep = self._populate_from_keep()
171 if self._manifest_text is None:
172 error_via_api = self._populate_from_api_server()
173 if error_via_api is not None and not should_try_keep:
175 if ((self._manifest_text is None) and
176 not error_via_keep and
178 # Looks like a keep locator, and we didn't already try keep above
179 error_via_keep = self._populate_from_keep()
180 if self._manifest_text is None:
182 raise arvados.errors.NotFoundError(
183 ("Failed to retrieve collection '{}' " +
184 "from either API server ({}) or Keep ({})."
186 self._manifest_locator,
189 self._streams = [sline.split()
190 for sline in self._manifest_text.split("\n")
193 def _populate_first(orig_func):
194 # Decorator for methods that read actual Collection data.
195 @functools.wraps(orig_func)
196 def wrapper(self, *args, **kwargs):
197 if self._streams is None:
199 return orig_func(self, *args, **kwargs)
def api_response(self):
    """api_response() -> dict or None

    Return the information about this Collection that was fetched from
    the API server and cached on this object.  If the Collection exists
    in Keep but not the API server, this currently returns None; future
    versions may provide a synthetic response.
    """
    cached_response = self._api_response
    return cached_response
216 for s in self.all_streams():
217 for f in s.all_files():
218 streamname, filename = split(s.name() + "/" + f.name())
219 if streamname not in streams:
220 streams[streamname] = {}
221 if filename not in streams[streamname]:
222 streams[streamname][filename] = []
224 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
226 self._streams = [normalize_stream(s, streams[s])
227 for s in sorted(streams)]
229 # Regenerate the manifest text based on the normalized streams
230 self._manifest_text = ''.join(
231 [StreamReader(stream, keep=self._my_keep()).manifest_text()
232 for stream in self._streams])
235 def open(self, streampath, filename=None):
236 """open(streampath[, filename]) -> file-like object
238 Pass in the path of a file to read from the Collection, either as a
239 single string or as two separate stream name and file name arguments.
240 This method returns a file-like object to read that file.
243 streampath, filename = split(streampath)
244 keep_client = self._my_keep()
245 for stream_s in self._streams:
246 stream = StreamReader(stream_s, keep_client,
247 num_retries=self.num_retries)
248 if stream.name() == streampath:
251 raise ValueError("stream '{}' not found in Collection".
254 return stream.files()[filename]
256 raise ValueError("file '{}' not found in Collection stream '{}'".
257 format(filename, streampath))
def all_streams(self):
    """Return a list with one StreamReader per entry of `self._streams`.

    Each reader is constructed with this object's Keep client (obtained
    through `_my_keep()`) and its `num_retries` setting.
    """
    readers = []
    for stream_tokens in self._streams:
        reader = StreamReader(stream_tokens, self._my_keep(),
                              num_retries=self.num_retries)
        readers.append(reader)
    return readers
265 for s in self.all_streams():
266 for f in s.all_files():
270 def manifest_text(self, strip=False, normalize=False):
272 cr = CollectionReader(self.manifest_text())
274 return cr.manifest_text(strip=strip, normalize=False)
276 return self.stripped_manifest()
278 return self._manifest_text
281 class _WriterFile(ArvadosFileBase):
def __init__(self, coll_writer, name):
    """Create a file object called `name` that writes into `coll_writer`.

    The base class is initialized with mode 'wb'; `coll_writer` is kept
    as `self.dest`, which the other methods of this class delegate to.
    """
    write_mode = 'wb'
    super(_WriterFile, self).__init__(name, write_mode)
    self.dest = coll_writer
287 super(_WriterFile, self).close()
288 self.dest.finish_current_file()
@ArvadosFileBase._before_close
def write(self, data):
    """Hand `data` to the destination writer's write() method."""
    destination = self.dest
    destination.write(data)
294 @ArvadosFileBase._before_close
295 def writelines(self, seq):
299 @ArvadosFileBase._before_close
301 self.dest.flush_data()
304 class CollectionWriter(CollectionBase):
305 KEEP_BLOCK_SIZE = 2**26
def __init__(self, api_client=None, num_retries=0, replication=None):
    """Instantiate a CollectionWriter.

    CollectionWriter lets you build a new Arvados Collection from scratch.
    Write files to it.  The CollectionWriter will upload data to Keep as
    appropriate, and provide you with the Collection manifest text when
    you're finished.

    Arguments:
    * api_client: The API client to use.  It is stored as given (None
      by default) for later use by this object.
    * num_retries: The default number of times to retry failed
      service requests.  Default 0.  You may change this value
      after instantiation, but note those changes may not
      propagate to related objects like the Keep client.
    * replication: The number of copies of each block to store.
      If this argument is None or not supplied, 2 is used.
      NOTE(review): earlier documentation mentioned a server-provided
      default; this constructor itself only falls back to 2.
    """
    # Client configuration.
    self._api_client = api_client
    self._keep_client = None
    self.num_retries = num_retries
    self.replication = replication if replication is not None else 2

    # Data accepted by write() that has not been flushed yet.
    self._data_buffer = []
    self._data_buffer_len = 0

    # Stream currently being assembled.
    self._current_stream_name = '.'
    self._current_stream_files = []
    self._current_stream_length = 0
    self._current_stream_locators = []

    # File currently being assembled within that stream.
    self._current_file_name = None
    self._current_file_pos = 0

    # Streams that have already been finished.
    self._finished_streams = []

    # Work queue used by do_queued_work().
    self._queued_file = None
    self._close_file = None
    self._queued_dirents = deque()
    self._queued_trees = deque()

    # Most recent file object handed out by open().
    self._last_open = None
346 def __exit__(self, exc_type, exc_value, traceback):
350 def do_queued_work(self):
351 # The work queue consists of three pieces:
352 # * _queued_file: The file object we're currently writing to the
354 # * _queued_dirents: Entries under the current directory
355 # (_queued_trees[0]) that we want to write or recurse through.
356 # This may contain files from subdirectories if
357 # max_manifest_depth == 0 for this directory.
358 # * _queued_trees: Directories that should be written as separate
359 # streams to the Collection.
360 # This function handles the smallest piece of work currently queued
361 # (current file, then current directory, then next directory) until
362 # no work remains. The _work_THING methods each do a unit of work on
363 # THING. _queue_THING methods add a THING to the work queue.
365 if self._queued_file:
367 elif self._queued_dirents:
369 elif self._queued_trees:
374 def _work_file(self):
376 buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
380 self.finish_current_file()
382 self._queued_file.close()
383 self._close_file = None
384 self._queued_file = None
386 def _work_dirents(self):
387 path, stream_name, max_manifest_depth = self._queued_trees[0]
388 if stream_name != self.current_stream_name():
389 self.start_new_stream(stream_name)
390 while self._queued_dirents:
391 dirent = self._queued_dirents.popleft()
392 target = os.path.join(path, dirent)
393 if os.path.isdir(target):
394 self._queue_tree(target,
395 os.path.join(stream_name, dirent),
396 max_manifest_depth - 1)
398 self._queue_file(target, dirent)
400 if not self._queued_dirents:
401 self._queued_trees.popleft()
403 def _work_trees(self):
404 path, stream_name, max_manifest_depth = self._queued_trees[0]
405 d = util.listdir_recursive(
406 path, max_depth = (None if max_manifest_depth == 0 else 0))
408 self._queue_dirents(stream_name, d)
410 self._queued_trees.popleft()
412 def _queue_file(self, source, filename=None):
413 assert (self._queued_file is None), "tried to queue more than one file"
414 if not hasattr(source, 'read'):
415 source = open(source, 'rb')
416 self._close_file = True
418 self._close_file = False
420 filename = os.path.basename(source.name)
421 self.start_new_file(filename)
422 self._queued_file = source
def _queue_dirents(self, stream_name, dirents):
    """Replace the directory-entry queue with `dirents` in sorted order.

    `stream_name` is accepted but not used here.  The assertion fails if
    entries from an earlier call are still waiting in the queue.
    """
    assert (not self._queued_dirents), "tried to queue more than one tree"
    ordered_entries = sorted(dirents)
    self._queued_dirents = deque(ordered_entries)
def _queue_tree(self, path, stream_name, max_manifest_depth):
    """Append a (path, stream_name, max_manifest_depth) tuple to the tree queue."""
    tree_entry = (path, stream_name, max_manifest_depth)
    self._queued_trees.append(tree_entry)
def write_file(self, source, filename=None):
    """Queue `source` under the optional name `filename`, then run the work queue.

    Both arguments are passed unchanged to `_queue_file()`; afterwards
    `do_queued_work()` is called.
    """
    self._queue_file(source, filename)
    self.do_queued_work()
def write_directory_tree(self, path, stream_name='.',
                         max_manifest_depth=-1):
    """Queue the directory `path` as a tree, then run the work queue.

    `path`, `stream_name` and `max_manifest_depth` are passed unchanged
    to `_queue_tree()`; afterwards `do_queued_work()` is called.
    """
    self._queue_tree(path, stream_name, max_manifest_depth)
    self.do_queued_work()
440 def write(self, newdata):
441 if hasattr(newdata, '__iter__'):
445 self._data_buffer.append(newdata)
446 self._data_buffer_len += len(newdata)
447 self._current_stream_length += len(newdata)
448 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
451 def open(self, streampath, filename=None):
452 """open(streampath[, filename]) -> file-like object
454 Pass in the path of a file to write to the Collection, either as a
455 single string or as two separate stream name and file name arguments.
456 This method returns a file-like object you can write to add it to the
459 You may only have one file object from the Collection open at a time,
460 so be sure to close the object when you're done. Using the object in
461 a with statement makes that easy::
463 with cwriter.open('./doc/page1.txt') as outfile:
464 outfile.write(page1_data)
465 with cwriter.open('./doc/page2.txt') as outfile:
466 outfile.write(page2_data)
469 streampath, filename = split(streampath)
470 if self._last_open and not self._last_open.closed:
471 raise errors.AssertionError(
472 "can't open '{}' when '{}' is still open".format(
473 filename, self._last_open.name))
474 if streampath != self.current_stream_name():
475 self.start_new_stream(streampath)
476 self.set_current_file_name(filename)
477 self._last_open = _WriterFile(self, filename)
478 return self._last_open
480 def flush_data(self):
481 data_buffer = ''.join(self._data_buffer)
483 self._current_stream_locators.append(
485 data_buffer[0:self.KEEP_BLOCK_SIZE],
486 copies=self.replication))
487 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
488 self._data_buffer_len = len(self._data_buffer[0])
def start_new_file(self, newfilename=None):
    """Finish the current file, then make `newfilename` the current file name.

    Calls `finish_current_file()` first and `set_current_file_name()`
    second, passing `newfilename` through unchanged.
    """
    self.finish_current_file()
    self.set_current_file_name(newfilename)
494 def set_current_file_name(self, newfilename):
495 if re.search(r'[\t\n]', newfilename):
496 raise errors.AssertionError(
497 "Manifest filenames cannot contain whitespace: %s" %
499 elif re.search(r'\x00', newfilename):
500 raise errors.AssertionError(
501 "Manifest filenames cannot contain NUL characters: %s" %
503 self._current_file_name = newfilename
def current_file_name(self):
    """Return the name of the file currently being written (may be None)."""
    name = self._current_file_name
    return name
508 def finish_current_file(self):
509 if self._current_file_name is None:
510 if self._current_file_pos == self._current_stream_length:
512 raise errors.AssertionError(
513 "Cannot finish an unnamed file " +
514 "(%d bytes at offset %d in '%s' stream)" %
515 (self._current_stream_length - self._current_file_pos,
516 self._current_file_pos,
517 self._current_stream_name))
518 self._current_stream_files.append([
519 self._current_file_pos,
520 self._current_stream_length - self._current_file_pos,
521 self._current_file_name])
522 self._current_file_pos = self._current_stream_length
523 self._current_file_name = None
def start_new_stream(self, newstreamname='.'):
    """Finish the current stream, then make `newstreamname` the current stream.

    Calls `finish_current_stream()` first and `set_current_stream_name()`
    second, passing `newstreamname` through unchanged.
    """
    self.finish_current_stream()
    self.set_current_stream_name(newstreamname)
def set_current_stream_name(self, newstreamname):
    """Record `newstreamname` as the name of the stream being written.

    An empty name is stored as '.'.  A name containing a tab or newline
    is rejected with errors.AssertionError.
    """
    if re.search(r'[\t\n]', newstreamname):
        raise errors.AssertionError(
            "Manifest stream names cannot contain whitespace")
    if newstreamname == '':
        self._current_stream_name = '.'
    else:
        self._current_stream_name = newstreamname
def current_stream_name(self):
    """Return the name of the stream currently being written."""
    name = self._current_stream_name
    return name
538 def finish_current_stream(self):
539 self.finish_current_file()
541 if not self._current_stream_files:
543 elif self._current_stream_name is None:
544 raise errors.AssertionError(
545 "Cannot finish an unnamed stream (%d bytes in %d files)" %
546 (self._current_stream_length, len(self._current_stream_files)))
548 if not self._current_stream_locators:
549 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
550 self._finished_streams.append([self._current_stream_name,
551 self._current_stream_locators,
552 self._current_stream_files])
553 self._current_stream_files = []
554 self._current_stream_length = 0
555 self._current_stream_locators = []
556 self._current_stream_name = None
557 self._current_file_pos = 0
558 self._current_file_name = None
561 """Store the manifest in Keep and return its locator.
563 This is useful for storing manifest fragments (task outputs)
564 temporarily in Keep during a Crunch job.
566 In other cases you should make a collection instead, by
567 sending manifest_text() to the API server's "create
568 collection" endpoint.
570 return self._my_keep().put(self.manifest_text(), copies=self.replication)
def portable_data_hash(self):
    """Return the portable data hash of this collection.

    The result is the MD5 hex digest of the stripped manifest (see
    `stripped_manifest()`), followed by '+' and the manifest's size in
    bytes.
    """
    stripped = self.stripped_manifest()
    if not isinstance(stripped, bytes):
        # hashlib only digests bytes, and the size component must count
        # bytes rather than characters, so encode text manifests first.
        # Byte strings (the only kind under Python 2 `str`) pass through
        # untouched, so existing results do not change.
        stripped = stripped.encode('utf-8')
    digest = hashlib.md5(stripped).hexdigest()
    return digest + '+' + str(len(stripped))
576 def manifest_text(self):
577 self.finish_current_stream()
580 for stream in self._finished_streams:
581 if not re.search(r'^\.(/.*)?$', stream[0]):
583 manifest += stream[0].replace(' ', '\\040')
584 manifest += ' ' + ' '.join(stream[1])
585 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
590 def data_locators(self):
592 for name, locators, files in self._finished_streams:
597 class ResumableCollectionWriter(CollectionWriter):
598 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
599 '_current_stream_locators', '_current_stream_name',
600 '_current_file_name', '_current_file_pos', '_close_file',
601 '_data_buffer', '_dependencies', '_finished_streams',
602 '_queued_dirents', '_queued_trees']
def __init__(self, api_client=None, **kwargs):
    """Create a resumable writer with an empty dependency table.

    `self._dependencies` starts out as an empty dict; `api_client` and
    any keyword arguments are then passed on to the parent class
    constructor.
    """
    self._dependencies = dict()
    parent = super(ResumableCollectionWriter, self)
    parent.__init__(api_client, **kwargs)
609 def from_state(cls, state, *init_args, **init_kwargs):
610 # Try to build a new writer from scratch with the given state.
611 # If the state is not suitable to resume (because files have changed,
612 # been deleted, aren't predictable, etc.), raise a
613 # StaleWriterStateError. Otherwise, return the initialized writer.
614 # The caller is responsible for calling writer.do_queued_work()
615 # appropriately after it's returned.
616 writer = cls(*init_args, **init_kwargs)
617 for attr_name in cls.STATE_PROPS:
618 attr_value = state[attr_name]
619 attr_class = getattr(writer, attr_name).__class__
620 # Coerce the value into the same type as the initial value, if
622 if attr_class not in (type(None), attr_value.__class__):
623 attr_value = attr_class(attr_value)
624 setattr(writer, attr_name, attr_value)
625 # Check dependencies before we try to resume anything.
626 if any(KeepLocator(ls).permission_expired()
627 for ls in writer._current_stream_locators):
628 raise errors.StaleWriterStateError(
629 "locators include expired permission hint")
630 writer.check_dependencies()
631 if state['_current_file'] is not None:
632 path, pos = state['_current_file']
634 writer._queued_file = open(path, 'rb')
635 writer._queued_file.seek(pos)
636 except IOError as error:
637 raise errors.StaleWriterStateError(
638 "failed to reopen active file {}: {}".format(path, error))
641 def check_dependencies(self):
642 for path, orig_stat in self._dependencies.items():
643 if not S_ISREG(orig_stat[ST_MODE]):
644 raise errors.StaleWriterStateError("{} not file".format(path))
646 now_stat = tuple(os.stat(path))
647 except OSError as error:
648 raise errors.StaleWriterStateError(
649 "failed to stat {}: {}".format(path, error))
650 if ((not S_ISREG(now_stat[ST_MODE])) or
651 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
652 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
653 raise errors.StaleWriterStateError("{} changed".format(path))
655 def dump_state(self, copy_func=lambda x: x):
656 state = {attr: copy_func(getattr(self, attr))
657 for attr in self.STATE_PROPS}
658 if self._queued_file is None:
659 state['_current_file'] = None
661 state['_current_file'] = (os.path.realpath(self._queued_file.name),
662 self._queued_file.tell())
665 def _queue_file(self, source, filename=None):
667 src_path = os.path.realpath(source)
669 raise errors.AssertionError("{} not a file path".format(source))
671 path_stat = os.stat(src_path)
672 except OSError as stat_error:
674 super(ResumableCollectionWriter, self)._queue_file(source, filename)
675 fd_stat = os.fstat(self._queued_file.fileno())
676 if not S_ISREG(fd_stat.st_mode):
677 # We won't be able to resume from this cache anyway, so don't
678 # worry about further checks.
679 self._dependencies[source] = tuple(fd_stat)
680 elif path_stat is None:
681 raise errors.AssertionError(
682 "could not stat {}: {}".format(source, stat_error))
683 elif path_stat.st_ino != fd_stat.st_ino:
684 raise errors.AssertionError(
685 "{} changed between open and stat calls".format(source))
687 self._dependencies[src_path] = tuple(fd_stat)
def write(self, data):
    """Write `data` via the parent class, but only while a source file is queued.

    Raises errors.AssertionError when `self._queued_file` is None;
    otherwise returns whatever the parent class's write() returns.
    """
    if self._queued_file is not None:
        return super(ResumableCollectionWriter, self).write(data)
    raise errors.AssertionError(
        "resumable writer can't accept unsourced data")