import arvados
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

from .arvfile import ArvadosFileBase, split
from .keep import KeepClient, KeepLocator
from .stream import StreamReader, normalize_stream
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any other
        hints besides size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

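    # Illustration (not from the original file): stripping reduces a signed
    # locator such as
    #   d41d8cd98f00b204e9800998ecf8427e+0+A<hex signature>@<expiry>
    # to its portable form
    #   d41d8cd98f00b204e9800998ecf8427e+0
    # i.e. the +A permission hint is dropped while the +0 size hint is kept.

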
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read the underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, a portable "
                "data hash, or a collection UUID")
        self._api_response = None
        self._streams = None

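    # Hedged usage sketch (identifiers below are made up) showing the three
    # accepted argument forms:
    #
    #   CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')         # collection UUID
    #   CollectionReader('d41d8cd98f00b204e9800998ecf8427e+0')  # portable data hash
    #   CollectionReader('. d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo.txt\n')
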
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep.  This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a Keep locator, and we didn't already try Keep above.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(
                        s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

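    # Illustration (hypothetical manifest): two lines naming the same stream,
    #
    #   . <locator A> 0:5:b.txt
    #   . <locator B> 0:5:a.txt
    #
    # normalize into a single "." stream line with the files sorted and
    # adjacent segments of the same file merged.
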
    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

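    # Hedged usage sketch (the path is hypothetical; the returned object is
    # a StreamFileReader from .stream):
    #
    #   reader = CollectionReader(manifest_text)
    #   infile = reader.open('./data/input.txt')
    #   first_kb = infile.read(1024)
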
    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text


class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

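    # Hedged usage sketch (names are hypothetical): build a Collection,
    # then publish its manifest.
    #
    #   cwriter = CollectionWriter()
    #   with cwriter.open('./doc/page1.txt') as outfile:
    #       outfile.write(page1_data)
    #   manifest = cwriter.manifest_text()  # or cwriter.finish() to store it
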
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

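    # Note on max_manifest_depth (hedged sketch; 'input_dir' is a
    # hypothetical path): with the default of -1, every subdirectory becomes
    # its own stream; with 0, the whole tree is flattened into one stream.
    #
    #   cwriter.write_directory_tree('input_dir', max_manifest_depth=0)
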
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to; the data
        you write is added to the Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

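    # For reference, the result has the form '<md5 hex>+<manifest length>';
    # an empty manifest hashes to 'd41d8cd98f00b204e9800998ecf8427e+0'.
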
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1],
                              sfile[2].replace(' ', '\\040'))
                for sfile in stream[2])
            manifest += "\n"

        return manifest

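    # Manifest format reminder: one stream per line,
    #   <stream name> <block locator> ... <pos>:<size>:<filename> ...
    # e.g. a stream holding a single empty file (sketch):
    #   . d41d8cd98f00b204e9800998ecf8427e+0 0:0:foo.txt
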
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(
                    "{} is not a file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

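    # Hedged sketch of a checkpoint/resume cycle (paths are hypothetical;
    # callers typically checkpoint while work is still queued):
    #
    #   writer = ResumableCollectionWriter()
    #   writer._queue_tree('input_dir', '.', -1)
    #   state = writer.dump_state(copy.deepcopy)
    #   # ...later, possibly in a new process...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
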
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError(
                "{} is not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)