import hashlib
import logging
import os
import re

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

# Imports below cover the names this module references; the exact
# package-relative paths are assumptions about the surrounding tree.
import arvados
import arvados.config as config
import arvados.errors as errors
import arvados.util as util
from arvados.keep import KeepClient, KeepLocator
from arvados.stream import StreamReader

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # Add each referenced block exactly once, recording its offset
    # within the stream.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream with no blocks still needs one locator token.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            # Map the segment onto the stream, merging it with the
            # current span when the two are adjacent.
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length files still need a file token.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
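
# Illustrative sketch (not part of the original module): normalize_stream()
# turns a {filename: [block references]} map into manifest tokens.  The
# locator below is the MD5 of the 3-byte block "foo" and is used purely as
# an example; the index constants are arvados.LOCATOR, BLOCKSIZE, OFFSET,
# and SEGMENTSIZE.
#
#   tokens = normalize_stream('.', {
#       'foo.txt': [[
#           'acbd18db4cc2f85cedef654fccc4a4d8+3',  # LOCATOR
#           3,                                     # BLOCKSIZE
#           0,                                     # OFFSET within block
#           3,                                     # SEGMENTSIZE
#       ]],
#   })
#   # -> ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']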

class CollectionBase(object):
    def _my_keep(self):
        # Build the Keep client lazily, so a caller-supplied client (or
        # API client) is picked up first.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Strip every hint except the size hint from each locator
                # token; leave the stream name and file tokens alone.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
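
    # Illustrative sketch (not part of the original class): the regular
    # expression above removes any hint that does not start with a digit,
    # so a signed locator keeps only its size hint.  The signature shown
    # is a made-up placeholder.
    #
    #   signed = 'acbd18db4cc2f85cedef654fccc4a4d8+3+Adeadbeef@53bed294'
    #   re.sub(r'\+[^\d][^\+]*', '', signed)
    #   # -> 'acbd18db4cc2f85cedef654fccc4a4d8+3'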

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator both authorizes the read and suggests the
            # manifest is still available directly from Keep.
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                # Map each file segment back onto the stream's blocks.
                for seg in f.segments:
                    streams[streamname][filename].extend(
                        s.locators_and_ranges(seg[0], seg[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
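
    # Illustrative usage with literal manifest text (the locator is the
    # MD5 of the 3-byte block "foo"; no API server is contacted until the
    # file data itself is read):
    #
    #   text = '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n'
    #   reader = CollectionReader(text)
    #   [f.name() for f in reader.all_files()]   # -> ['foo.txt']
    #   reader.manifest_text(strip=True)          # signatures removed, if any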

class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
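
    # Illustrative usage (assumes a configured Arvados cluster; paths are
    # placeholders):
    #
    #   writer = CollectionWriter(num_retries=3)
    #   writer.write_file('/tmp/example.txt')
    #   writer.write_directory_tree('/tmp/data', stream_name='data')
    #   manifest = writer.manifest_text()
    #   locator = writer.finish()   # stores the manifest in Keep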

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
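
    # Illustrative note (placeholder path): with the default
    # max_manifest_depth=-1, each subdirectory of path becomes its own
    # stream; with max_manifest_depth=0, the whole tree is flattened into
    # a single stream.
    #
    #   writer.write_directory_tree('/tmp/data', stream_name='data',
    #                               max_manifest_depth=0)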

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
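
    # Illustrative usage: write() appends to an in-memory buffer and
    # uploads one full 64 MiB block (KEEP_BLOCK_SIZE) to Keep each time
    # the buffer reaches that size; any remainder is flushed when the
    # current stream is finished.
    #
    #   writer.start_new_file('log.txt')
    #   for chunk in ('line one\n', 'line two\n'):
    #       writer.write(chunk)
    #   writer.finish_current_file()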

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
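
    # Illustrative note: the portable data hash is the MD5 of the stripped
    # manifest text plus that text's length, so it is stable across
    # clusters regardless of permission signatures.  For a one-file
    # manifest (locator is the MD5 of the 3-byte block "foo"):
    #
    #   text = '. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n'
    #   pdh = hashlib.md5(text).hexdigest() + '+' + str(len(text))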

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
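
    # Illustrative sketch of the manifest format produced above: one line
    # per stream, holding the stream name, its block locators, and
    # position:length:filename tokens, with spaces escaped as \040:
    #
    #   ./data acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo\040bar.txt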

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
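
    # Illustrative checkpoint/resume cycle (paths are placeholders; how the
    # state dict is serialized between runs, e.g. as JSON, is up to the
    # caller):
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/data/foo.txt')
    #   state = writer.dump_state()    # checkpoint after each unit of work
    #   # ... later, possibly in a new process:
    #   resumed = ResumableCollectionWriter.from_state(state)
    #   resumed.do_queued_work()
    #   print(resumed.manifest_text())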

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)