import hashlib
import logging
import os
import re

from collections import deque
from stat import *

import arvados
from .arvfile import ArvadosFileBase
from .keep import KeepClient, KeepLocator
from .stream import StreamReader, split
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')


def normalize_stream(s, stream):
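    """Return the manifest tokens for a single normalized stream.

    's' is the stream name; 'stream' maps each file name in the stream to a
    list of (locator, block size, segment offset, segment size) tuples
    (indexed by the arvados.LOCATOR et al. constants) describing its data.

    Illustrative result for a stream holding one 3-byte file (the locator
    shown is the block's MD5 plus a size hint):

        ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']
    """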
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0
    # Add each referenced data block exactly once, in file order.
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # An otherwise empty stream still needs one block token.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            # Collapse contiguous segments into a single file token.
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length files still get a token so they appear in the manifest.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any other
        hints besides size hints) removed from the locators.
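
        For example, a signed locator (signature and expiry hypothetical)

            acbd18db4cc2f85cedef654fccc4a4d8+3+A0f8f...@53bed294

        is stripped to

            acbd18db4cc2f85cedef654fccc4a4d8+3
        """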
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        for reading the collection's underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
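
        Example (illustrative; the UUID shown is hypothetical):

            reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
            manifest = reader.manifest_text()
        """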
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, a portable "
                "data hash, or a collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
        except Exception as e:
            return e
        else:
            return None

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep.  This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e
        else:
            return None

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep without API help.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        self._populate()

        # Rearrange the streams so each file appears exactly once, under
        # its normalized stream name.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
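
        Example (illustrative; assumes the collection's top-level stream
        contains a file named 'data.txt', and readall() yields the file's
        data in chunks):

            reader = CollectionReader(manifest_text)
            datafile = reader.open('./data.txt')
            for chunk in datafile.readall():
                process(chunk)
        """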
        self._populate()
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text


class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()


class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26  # 64 MiB: the maximum size of a single Keep block

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
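
        Example (illustrative):

            cwriter = CollectionWriter()
            with cwriter.open('hello.txt') as outfile:
                outfile.write('hello world')
            manifest = cwriter.manifest_text()
        """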
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
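        """Write the contents of a local directory tree to the Collection.

        With the default max_manifest_depth=-1, every subdirectory becomes
        its own stream; once the depth counter reaches 0, the remaining
        tree is flattened into a single stream.

        Example (illustrative; assumes './logs' exists locally):

            cwriter = CollectionWriter()
            cwriter.write_directory_tree('./logs', stream_name='logs')
        """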
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
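        """Append data to the current file in the current stream.

        Accepts a string, or an iterable of strings, which are written in
        order, e.g. (illustrative):

            cwriter.write('part one, ')
            cwriter.write(['part ', 'two'])
        """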
        if hasattr(newdata, '__iter__'):
            # Flatten iterables (but not strings, which lack __iter__ in
            # Python 2) into a sequence of write() calls.
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object; data written to it is added
        to the Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

            with cwriter.open('./doc/page1.txt') as outfile:
                outfile.write(page1_data)
            with cwriter.open('./doc/page2.txt') as outfile:
                outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one Keep-block-sized chunk of the buffer and record the
        # returned locator for the current stream.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
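        """Return the portable data hash for the data written so far.

        This is the MD5 hex digest of the stripped manifest text, followed
        by '+' and the stripped manifest's length, e.g. (digest shown is
        hypothetical):

            886cd45b4df28f33402d8eab209b1a2f+52
        """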
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
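        """Return the manifest text for everything written so far.

        The result has one line per finished stream, e.g. (locator shown is
        hypothetical, for a single 3-byte file):

            . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
        """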
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
                for sfile in stream[2])
            manifest += "\n"
        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
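        # Typical resume flow (illustrative):
        #
        #     state = old_writer.dump_state()
        #     # ...persist state, restart the process, reload state...
        #     writer = ResumableCollectionWriter.from_state(state)
        #     writer.do_queued_work()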
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)