5 from collections import deque
8 from .arvfile import ArvadosFileBase
10 from .stream import StreamReader, split
15 _logger = logging.getLogger('arvados.collection')
def normalize_stream(s, stream):
    """Return the manifest tokens for a single normalized stream.

    Arguments:
    * s: the stream name (e.g. '.', './subdir').
    * stream: dict mapping file name -> list of segments, where each
      segment is indexed by the arvados.LOCATOR / arvados.BLOCKSIZE /
      arvados.OFFSET / arvados.SEGMENTSIZE constants.

    Returns a list of manifest tokens: the stream name, each referenced
    block locator exactly once, then one or more "pos:size:name" file
    tokens per file (adjacent segments are coalesced into single spans).
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # First pass: emit each referenced block locator once, and record the
    # byte offset of each block within the concatenated stream data.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream must reference at least one block, even if it holds no data.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    # Second pass: emit file tokens, merging segments that are contiguous
    # in the stream into a single "offset:length:name" span.
    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    # Contiguous with the previous segment: extend the span.
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        # Zero-length files still need a token so they appear in the manifest.
        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
class CollectionBase(object):
    """Common behavior shared by CollectionReader and CollectionWriter.

    Provides context-manager support, lazy Keep client construction, and
    manifest stripping.  Subclasses are expected to set `_api_client`,
    `_keep_client`, and `num_retries`, and to implement `manifest_text()`.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # NOTE(review): base class performs no cleanup on exit; subclasses
        # (e.g. CollectionWriter) override this — confirm against upstream.
        pass

    def _my_keep(self):
        # Build the Keep client lazily so callers that never touch Keep
        # don't need Keep/API configuration.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name (first field) untouched; strip every
                # "+<non-digit>..." hint from fields that look like locators.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Classify the argument: portable data hash, collection UUID, or
        # literal manifest text.
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        # Parsed streams are populated lazily by _populate().
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        # Fetch and parse the manifest (from the API server and/or Keep)
        # exactly once; subsequent calls are no-ops.
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep directly.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
                not error_via_keep and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # Split the manifest into one token list per (non-empty) stream line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Normalize the parsed streams and regenerate the manifest text.

        Files are regrouped by stream name, segments are coalesced, and
        streams are emitted in sorted order.
        """
        self._populate()

        # Rearrange streams: group every file's segments under its
        # normalized (streamname, filename) location.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.

        Raises ValueError if the stream or file is not found.
        """
        self._populate()
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

    def all_streams(self):
        """Return a StreamReader for every stream in the Collection."""
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Iterate over every file in every stream of the Collection."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        """Return the Collection's manifest text.

        * strip: remove non-portable hints from locators (see
          CollectionBase.stripped_manifest).
        * normalize: return the manifest after stream/segment normalization.
        """
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
class _WriterFile(ArvadosFileBase):
    """Write-only file-like object returned by CollectionWriter.open().

    All writes are delegated to the owning CollectionWriter; closing the
    object finishes the current file in the writer.
    """

    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()
class CollectionWriter(CollectionBase):
    # Maximum bytes buffered before a block is flushed to Keep (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []               # pending chunks, not yet in Keep
        self._data_buffer_len = 0
        self._current_stream_files = []      # [pos, size, name] per finished file
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

    def __exit__(self, exc_type, exc_value, traceback):
        # Only commit the Collection on a clean exit.
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Stream the queued file into the Collection one block at a time.
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                # Queue one file, then return to do_queued_work so the file
                # is written before we look at the next dirent.
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth = (None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a readable object) to the Collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree to the Collection as streams."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        # Accept either a chunk of data or an iterable of chunks.
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Upload one Keep block's worth of buffered data, keeping any
        # remainder buffered for the next flush.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file finished: nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # An empty stream is simply dropped.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        # NOTE(review): final normalization pass reconstructed from the
        # surrounding API (CollectionReader.manifest_text(normalize=...)) —
        # confirm against upstream.
        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text(normalize=True)
        else:
            return ""

    def data_locators(self):
        """Return every block locator referenced by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose state can be saved and restored.

    dump_state() captures enough state to rebuild the writer later with
    from_state(); file dependencies are recorded so a resume attempt can
    detect when the underlying files have changed.
    """

    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded dependency changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict of resumable state; copy_func can deep-copy values."""
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            # Defer the failure: the open below may still succeed (or fail
            # with a better error).
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Resumable state only tracks file-backed data, so free-form writes
        # are rejected.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)