import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

import arvados
from .arvfile import ArvadosFileBase
from .keep import *
from .stream import StreamReader, split
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    # Build the token list for one normalized manifest stream: the stream
    # name, then each block locator (in first-use order), then one
    # offset:size:filename token per contiguous file span.
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream with no data blocks still needs one locator token.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    # This segment abuts the previous one; extend the span.
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length file: emit an empty span so it still appears.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
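# A sketch of what normalize_stream produces, assuming the segment-tuple
# layout used by the stream module (LOCATOR=0, BLOCKSIZE=1, OFFSET=2,
# SEGMENTSIZE=3 -- an assumption, check stream.py) and a shortened,
# hypothetical block locator:
#
#     normalize_stream('.', {'f.txt': [('acbd...+3', 3, 0, 3)]})
#     # -> ['.', 'acbd...+3', '0:3:f.txt']
#
# Adjacent segments of the same file collapse into a single
# offset:size:name token, which is what makes the stream "normalized".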

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        # Build the Keep client lazily, so merely instantiating a
        # collection object never requires Arvados configuration.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
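# For example, with a hypothetical signed locator, a manifest line like
#
#     . acbd18db4cc2f85cedef654fccc4a4d8+3+A0secret@53221a33 0:3:foo.txt
#
# is stripped to ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt": the
# "+A..." permission hint is removed, the "+3" size hint is kept, so the
# result is stable across clusters and signing keys.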

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read the collection's underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._api_response = None
        self._streams = None
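    # A usage sketch (the UUID is hypothetical; any of the three accepted
    # argument forms works the same way):
    #
    #     reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #     for f in reader.all_files():
    #         print f.name()
    #
    # No data is fetched until a reading method triggers _populate().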

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        # Fetch the manifest text, trying Keep and/or the API server as
        # appropriate for the kind of locator we were given.
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
                util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                not error_via_keep and
                should_try_keep):
            # Looks like a Keep locator, and we didn't already try Keep above.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        # Group every file segment by stream name and file name, then
        # rebuild each stream in normalized form.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))
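    # Both call forms are equivalent (paths hypothetical):
    #
    #     reader.open('./subdir/notes.txt')
    #     reader.open('./subdir', 'notes.txt')
    #
    # Note the for/else above: the first ValueError fires only if no stream
    # name matched, because a successful match breaks out of the loop.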

    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text
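    # Summary of the manifest_text() options: the default returns the text
    # as retrieved; strip=True removes permission signatures from every
    # locator; normalize=True sorts streams and merges adjacent segments
    # first, so identical content yields byte-identical manifests.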

class _WriterFile(ArvadosFileBase):
    # File-like wrapper returned by CollectionWriter.open().  All writes
    # are forwarded to the owning CollectionWriter.
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None
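    # A short usage sketch (file name and data are hypothetical):
    #
    #     cwriter = CollectionWriter()
    #     with cwriter.open('./hello.txt') as outfile:
    #         outfile.write('Hello, Keep!\n')
    #     manifest = cwriter.manifest_text()
    #
    # Buffered data is uploaded in KEEP_BLOCK_SIZE (64 MiB) blocks as it
    # accumulates; finishing a stream flushes the remainder.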

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
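    # write_file() accepts either a path or an open file object;
    # write_directory_tree() turns each subdirectory into its own stream
    # (up to max_manifest_depth).  A sketch with hypothetical paths:
    #
    #     cwriter.write_file('/tmp/report.csv', 'report.csv')
    #     cwriter.write_directory_tree('/tmp/results', stream_name='./results')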

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            # Accept any iterable of strings by writing each item in turn.
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one full block to Keep (or whatever is buffered, if less
        # than a block remains) and record its locator.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
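    # Worked example: an empty collection has an empty stripped manifest,
    # so its portable data hash is
    #
    #     hashlib.md5('').hexdigest() + '+0'
    #     # -> 'd41d8cd98f00b204e9800998ecf8427e+0'
    #
    # Because the hash covers the *stripped* manifest, it is the same on
    # every cluster regardless of permission signatures.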

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest
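    # Each manifest line reads "<stream> <locator...> <pos:size:name...>".
    # For instance (hypothetical locator):
    #
    #     . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
    #
    # Spaces in stream and file names are escaped as \040 by the replace()
    # calls above.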

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

class ResumableCollectionWriter(CollectionWriter):
    # A CollectionWriter that can dump its state to a dict and later be
    # rebuilt from it, provided the source files it was reading have not
    # changed in the meantime.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
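    # A checkpoint/resume sketch.  How state is copied and persisted is up
    # to the caller; copy.deepcopy is just one option:
    #
    #     state = writer.dump_state(copy.deepcopy)
    #     # ... persist state, restart the process, then:
    #     writer = ResumableCollectionWriter.from_state(state)
    #     writer.do_queued_work()  # caller restarts any queued work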

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)