import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

import arvados
from .arvfile import ArvadosFileBase
from .keep import KeepClient, KeepLocator
from .stream import StreamReader, split
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # Assign each unique block a stream-relative offset, in order of
    # first reference.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream must name at least one block; use the empty block if needed.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    # Adjacent segments coalesce into one file token.
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
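
# A hedged sketch of normalize_stream() output (hypothetical locator):
# for a stream "." holding one 3-byte file "foo.txt" stored in a single
# block, the token list would be
#     ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']
# and ' '.join(tokens) is one line of manifest text.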

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        # Build the Keep client lazily, so a caller who never touches
        # Keep doesn't need a working Keep configuration.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
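
    # Illustration of the stripping above (hypothetical signed locator):
    #     d41d8cd98f00b204e9800998ecf8427e+0+Aabcdef0123456789@565f7804
    # becomes
    #     d41d8cd98f00b204e9800998ecf8427e+0
    # The size hint starts with a digit, so r'\+[^\d][^\+]*' keeps it
    # while removing the +A... permission signature.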

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None
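
    # Typical usage, sketched (hypothetical collection UUID):
    #     reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #     names = [f.name() for f in reader.all_files()]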

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server. If we do build one, make sure our Keep client uses
        # it. If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                not error_via_keep and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None. Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])
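
    # Sketch of the effect: two manifest lines naming the same stream,
    #     . <locator_a> 0:3:foo.txt
    #     . <locator_b> 0:3:bar.txt
    # normalize into one line with files sorted by name and blocks ordered
    # by first reference (placeholder locators):
    #     . <locator_b> <locator_a> 0:3:bar.txt 3:3:foo.txt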

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))
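
    # Both call styles are equivalent, as a sketch (hypothetical names):
    #     reader.open('./subdir/data.txt')
    #     reader.open('./subdir', 'data.txt')
    # Note the for/else above: the first ValueError fires only when no
    # stream name matched.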

    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text

class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for s in seq:
            self.write(s)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0, replication=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        * replication: The number of copies of each block to store.
          If this argument is 0 or not supplied, replication defaults to 2.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self.replication = (replication if replication > 0 else 2)
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
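
    # A minimal end-to-end sketch (hypothetical content):
    #     cwriter = CollectionWriter()
    #     with cwriter.open('./hello.txt') as outfile:
    #         outfile.write('hello world\n')
    #     manifest = cwriter.manifest_text()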

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains. The _work_THING methods each do a unit of work on
        # THING. _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
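
    # Sketch (hypothetical path): store a local tree, one stream per
    # subdirectory:
    #     cwriter.write_directory_tree('/tmp/mydata')
    # Passing max_manifest_depth=0 instead flattens every subdirectory's
    # contents into the single named stream.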

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done. Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(
                    data_buffer[0:self.KEEP_BLOCK_SIZE],
                    copies=self.replication))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
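
    # Note: each call stores at most one KEEP_BLOCK_SIZE (2**26 bytes,
    # i.e. 64 MiB) block; write() keeps calling flush_data() until less
    # than one full block remains buffered.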

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator. Beware,
        # this is only useful in special cases like storing manifest
        # fragments temporarily in Keep during a Crunch job. In most
        # cases you should make a collection instead, by sending
        # manifest_text() to the API server's "create collection"
        # endpoint.
        return self._my_keep().put(self.manifest_text(), copies=self.replication)

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
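
    # The result has the form '<md5 hexdigest>+<manifest byte count>'.
    # For an empty manifest this works out to the well-known empty
    # collection hash, 'd41d8cd98f00b204e9800998ecf8427e+0'.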

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
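
    # Each emitted line has the shape
    #     <stream name> <block locator...> <pos:size:filename...>
    # e.g. (hypothetical locator):
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:hello.txt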

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, **kwargs):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError. Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
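
    # Checkpoint/resume sketch (assumes the caller persists `state`
    # across the interruption, e.g. as JSON):
    #     state = writer.dump_state()
    #     ...process restarts...
    #     writer = ResumableCollectionWriter.from_state(state)
    #     writer.do_queued_work()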

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)