21 from collections import deque
# Module-level logger; handlers/levels are configured by the application.
30 _logger = logging.getLogger('arvados.collection')
32 def normalize_stream(s, stream):
# Build the normalized manifest-token list for the stream named `s`.
# `stream` maps (escaped) file names to lists of segments; segments are
# indexed with the arvados.LOCATOR / BLOCKSIZE / OFFSET / SEGMENTSIZE
# constants.  NOTE(review): several interior lines are elided from this
# view (e.g. the initialization of stream_tokens/blocks/streamoffset and
# the enclosing loops); comments below cover only the visible code.
34 sortedfiles = list(stream.keys())
# Register each data block once, recording its byte offset within the
# concatenated stream so file segments can be expressed as stream offsets.
41 if b[arvados.LOCATOR] not in blocks:
42 stream_tokens.append(b[arvados.LOCATOR])
43 blocks[b[arvados.LOCATOR]] = streamoffset
44 streamoffset += b[arvados.BLOCKSIZE]
# A manifest stream must contain at least one locator; fall back to the
# well-known empty-block locator when no data blocks were added.
46 if len(stream_tokens) == 1:
47 stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
# Escape spaces in the file name as \040, per the manifest format.
51 fout = f.replace(' ', '\\040')
52 for segment in stream[f]:
# Translate the segment's block-relative offset into a stream offset.
53 segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
54 if current_span == None:
55 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Coalesce segments that are adjacent in the stream into one span;
# otherwise (elided else branch) flush the previous span as a
# position:size:name token and start a new span.
57 if segmentoffset == current_span[1]:
58 current_span[1] += segment[arvados.SEGMENTSIZE]
60 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
61 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
# Flush the final open span for this file, if any.
63 if current_span != None:
64 stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
# Zero-length files still need an explicit 0:0:name token.
67 stream_tokens.append("0:0:{0}".format(fout))
72 class CollectionBase(object):
# Shared plumbing for CollectionReader/CollectionWriter: lazy KeepClient
# construction and manifest "stripping".  NOTE(review): method def lines
# for the Keep-client accessor are elided from this view.
# Lazily build a KeepClient bound to self._api_client, then reuse it.
80 if self._keep_client is None:
81 self._keep_client = KeepClient(api_client=self._api_client,
82 num_retries=self.num_retries)
83 return self._keep_client

85 def stripped_manifest(self):
87 Return the manifest for the current collection with all
88 non-portable hints (i.e., permission signatures and other
89 hints other than size hints) removed from the locators.
91 raw = self.manifest_text()
93 for line in raw.split("\n"):
# Keep the stream-name field untouched; for each locator-looking
# field, drop every "+hint" whose first character is not a digit
# (so "+<size>" survives, "+A<signature>" and friends are removed).
96 clean_fields = fields[:1] + [
97 (re.sub(r'\+[^\d][^\+]*', '', x)
98 if re.match(util.keep_locator_pattern, x)
101 clean += [' '.join(clean_fields), "\n"]
102 return ''.join(clean)
105 class CollectionReader(CollectionBase):
106 def __init__(self, manifest_locator_or_text, api_client=None,
107 keep_client=None, num_retries=0):
108 """Instantiate a CollectionReader.
110 This class parses Collection manifests to provide a simple interface
111 to read its underlying files.
114 * manifest_locator_or_text: One of a Collection UUID, portable data
115 hash, or full manifest text.
116 * api_client: The API client to use to look up Collections. If not
117 provided, CollectionReader will build one from available Arvados
119 * keep_client: The KeepClient to use to download Collection data.
120 If not provided, CollectionReader will build one from available
121 Arvados configuration.
122 * num_retries: The default number of times to retry failed
123 service requests. Default 0. You may change this value
124 after instantiation, but note those changes may not
125 propagate to related objects like the Keep client.
127 self._api_client = api_client
128 self._keep_client = keep_client
129 self.num_retries = num_retries
# Classify the single argument by pattern: portable data hash / Keep
# locator first, then collection UUID, then full manifest text; the
# (elided) else branch rejects anything else.
130 if re.match(util.keep_locator_pattern, manifest_locator_or_text):
131 self._manifest_locator = manifest_locator_or_text
132 self._manifest_text = None
133 elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
134 self._manifest_locator = manifest_locator_or_text
135 self._manifest_text = None
136 elif re.match(util.manifest_pattern, manifest_locator_or_text):
137 self._manifest_text = manifest_locator_or_text
138 self._manifest_locator = None
140 raise errors.ArgumentError(
141 "Argument to CollectionReader must be a manifest or a collection UUID")
144 def _populate_from_api_server(self):
145 # As in KeepClient itself, we must wait until the last
146 # possible moment to instantiate an API client, in order to
147 # avoid tripping up clients that don't have access to an API
148 # server. If we do build one, make sure our Keep client uses
149 # it. If instantiation fails, we'll fall back to the except
150 # clause, just like any other Collection lookup
151 # failure. Return an exception, or None if successful.
153 if self._api_client is None:
154 self._api_client = arvados.api('v1')
155 self._keep_client = None # Make a new one with the new api.
156 c = self._api_client.collections().get(
157 uuid=self._manifest_locator).execute(
158 num_retries=self.num_retries)
159 self._manifest_text = c['manifest_text']
161 except Exception as e:
164 def _populate_from_keep(self):
165 # Retrieve a manifest directly from Keep. This has a chance of
166 # working if [a] the locator includes a permission signature
167 # or [b] the Keep services are operating in world-readable
168 # mode. Return an exception, or None if successful.
170 self._manifest_text = self._my_keep().get(
171 self._manifest_locator, num_retries=self.num_retries)
172 except Exception as e:
# NOTE(review): the lines below belong to a _populate method whose def
# line is elided from this view.  Already populated? Nothing to do.
176 if self._streams is not None:
179 error_via_keep = None
# Keep is worth trying only when the locator looks like a Keep locator.
180 should_try_keep = (not self._manifest_text and
181 util.keep_locator_pattern.match(
182 self._manifest_locator))
# Signed locators can be fetched from Keep without the API server.
183 if (not self._manifest_text and
184 util.signed_locator_pattern.match(self._manifest_locator)):
185 error_via_keep = self._populate_from_keep()
186 if not self._manifest_text:
187 error_via_api = self._populate_from_api_server()
188 if error_via_api != None and not should_try_keep:
# Last resort: unsigned Keep locator that we have not tried yet.
190 if (not self._manifest_text and
191 not error_via_keep and
193 # Looks like a keep locator, and we didn't already try keep above
194 error_via_keep = self._populate_from_keep()
195 if not self._manifest_text:
# Both paths failed: report both errors to the caller.
197 raise arvados.errors.NotFoundError(
198 ("Failed to retrieve collection '{}' " +
199 "from either API server ({}) or Keep ({})."
201 self._manifest_locator,
# One token list per manifest line (stream).
204 self._streams = [sline.split()
205 for sline in self._manifest_text.split("\n")
# NOTE(review): the lines below belong to a normalize method whose def
# line is elided.  Re-group every file under its full stream path.
213 for s in self.all_streams():
214 for f in s.all_files():
215 filestream = s.name() + "/" + f.name()
216 r = filestream.rindex("/")
217 streamname = filestream[:r]
218 filename = filestream[r+1:]
219 if streamname not in streams:
220 streams[streamname] = {}
221 if filename not in streams[streamname]:
222 streams[streamname][filename] = []
# NOTE(review): as visible, `r` is the int from rindex() and is not
# subscriptable; an elided line (orig 223) presumably rebinds `r` to a
# (position, size) pair iterated from the file — confirm in full source.
224 streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
226 self._streams = [normalize_stream(s, streams[s])
227 for s in sorted(streams)]
229 # Regenerate the manifest text based on the normalized streams
230 self._manifest_text = ''.join(
231 [StreamReader(stream, keep=self._my_keep()).manifest_text()
232 for stream in self._streams])
234 def all_streams(self):
# Wrap each parsed token list in a StreamReader bound to our Keep client.
236 return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
237 for s in self._streams]
# NOTE(review): part of an all_files-style generator whose def/yield
# lines are elided from this view.
240 for s in self.all_streams():
241 for f in s.all_files():
244 def manifest_text(self, strip=False, normalize=False):
# normalize=True: round-trip through a fresh reader so the returned
# text is the normalized form; strip is delegated to that reader.
246 cr = CollectionReader(self.manifest_text())
248 return cr.manifest_text(strip=strip, normalize=False)
# strip=True without normalize: remove non-portable locator hints.
250 return self.stripped_manifest()
253 return self._manifest_text
256 class CollectionWriter(CollectionBase):
# Data is buffered and flushed to Keep in blocks of this size (64 MiB).
257 KEEP_BLOCK_SIZE = 2**26
259 def __init__(self, api_client=None, num_retries=0):
260 """Instantiate a CollectionWriter.
262 CollectionWriter lets you build a new Arvados Collection from scratch.
263 Write files to it. The CollectionWriter will upload data to Keep as
264 appropriate, and provide you with the Collection manifest text when
268 * api_client: The API client to use to look up Collections. If not
269 provided, CollectionReader will build one from available Arvados
271 * num_retries: The default number of times to retry failed
272 service requests. Default 0. You may change this value
273 after instantiation, but note those changes may not
274 propagate to related objects like the Keep client.
276 self._api_client = api_client
277 self.num_retries = num_retries
278 self._keep_client = None
# Buffered data not yet flushed to Keep, and its total length.
279 self._data_buffer = []
280 self._data_buffer_len = 0
# State of the stream/file currently being written.
281 self._current_stream_files = []
282 self._current_stream_length = 0
283 self._current_stream_locators = []
284 self._current_stream_name = '.'
285 self._current_file_name = None
286 self._current_file_pos = 0
287 self._finished_streams = []
# Work queue state (see do_queued_work below).
288 self._close_file = None
289 self._queued_file = None
290 self._queued_dirents = deque()
291 self._queued_trees = deque()
296 def do_queued_work(self):
297 # The work queue consists of three pieces:
298 # * _queued_file: The file object we're currently writing to the
300 # * _queued_dirents: Entries under the current directory
301 # (_queued_trees[0]) that we want to write or recurse through.
302 # This may contain files from subdirectories if
303 # max_manifest_depth == 0 for this directory.
304 # * _queued_trees: Directories that should be written as separate
305 # streams to the Collection.
306 # This function handles the smallest piece of work currently queued
307 # (current file, then current directory, then next directory) until
308 # no work remains. The _work_THING methods each do a unit of work on
309 # THING. _queue_THING methods add a THING to the work queue.
311 if self._queued_file:
313 elif self._queued_dirents:
315 elif self._queued_trees:
320 def _work_file(self):
# Copy the queued file into the collection one Keep block at a time;
# the surrounding loop/termination lines are elided from this view.
322 buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
326 self.finish_current_file()
# Only close the file if we opened it ourselves (see _queue_file).
328 self._queued_file.close()
329 self._close_file = None
330 self._queued_file = None
332 def _work_dirents(self):
# Process entries of the directory at the head of _queued_trees:
# subdirectories are queued as new trees, files are queued directly.
333 path, stream_name, max_manifest_depth = self._queued_trees[0]
334 if stream_name != self.current_stream_name():
335 self.start_new_stream(stream_name)
336 while self._queued_dirents:
337 dirent = self._queued_dirents.popleft()
338 target = os.path.join(path, dirent)
339 if os.path.isdir(target):
340 self._queue_tree(target,
341 os.path.join(stream_name, dirent),
342 max_manifest_depth - 1)
344 self._queue_file(target, dirent)
# All dirents handled: this tree is done.
346 if not self._queued_dirents:
347 self._queued_trees.popleft()
349 def _work_trees(self):
# Start processing the next queued directory tree.  At depth 0 the
# whole subtree is listed recursively into one stream.
350 path, stream_name, max_manifest_depth = self._queued_trees[0]
351 make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
353 d = make_dirents(path)
355 self._queue_dirents(stream_name, d)
357 self._queued_trees.popleft()
359 def _queue_file(self, source, filename=None):
360 assert (self._queued_file is None), "tried to queue more than one file"
# Accept either a path or an open file-like object; remember whether
# we opened it, so _work_file knows whether to close it.
361 if not hasattr(source, 'read'):
362 source = open(source, 'rb')
363 self._close_file = True
365 self._close_file = False
367 filename = os.path.basename(source.name)
368 self.start_new_file(filename)
369 self._queued_file = source
371 def _queue_dirents(self, stream_name, dirents):
372 assert (not self._queued_dirents), "tried to queue more than one tree"
# Sort for a deterministic manifest ordering.
373 self._queued_dirents = deque(sorted(dirents))
375 def _queue_tree(self, path, stream_name, max_manifest_depth):
376 self._queued_trees.append((path, stream_name, max_manifest_depth))
378 def write_file(self, source, filename=None):
# Queue one file (path or file object) and drain the queue.
379 self._queue_file(source, filename)
380 self.do_queued_work()
382 def write_directory_tree(self,
383 path, stream_name='.', max_manifest_depth=-1):
# Queue a directory tree and drain the queue.
384 self._queue_tree(path, stream_name, max_manifest_depth)
385 self.do_queued_work()
387 def write(self, newdata):
# Iterables of chunks are handled by the (elided) branch body; plain
# data is buffered and flushed once a full Keep block is available.
388 if hasattr(newdata, '__iter__'):
392 self._data_buffer.append(newdata)
393 self._data_buffer_len += len(newdata)
394 self._current_stream_length += len(newdata)
395 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
398 def flush_data(self):
# Upload one Keep block's worth of buffered data and keep the
# remainder buffered.
399 data_buffer = ''.join(self._data_buffer)
401 self._current_stream_locators.append(
402 self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
403 self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
404 self._data_buffer_len = len(self._data_buffer[0])
406 def start_new_file(self, newfilename=None):
407 self.finish_current_file()
408 self.set_current_file_name(newfilename)
410 def set_current_file_name(self, newfilename):
# Tabs/newlines would corrupt the manifest token format.
411 if re.search(r'[\t\n]', newfilename):
412 raise errors.AssertionError(
413 "Manifest filenames cannot contain whitespace: %s" %
415 self._current_file_name = newfilename
417 def current_file_name(self):
418 return self._current_file_name
420 def finish_current_file(self):
# An unnamed file is only acceptable if no bytes were written to it.
421 if self._current_file_name == None:
422 if self._current_file_pos == self._current_stream_length:
424 raise errors.AssertionError(
425 "Cannot finish an unnamed file " +
426 "(%d bytes at offset %d in '%s' stream)" %
427 (self._current_stream_length - self._current_file_pos,
428 self._current_file_pos,
429 self._current_stream_name))
# Record [position, size, name] for the manifest and reset per-file state.
430 self._current_stream_files.append([
431 self._current_file_pos,
432 self._current_stream_length - self._current_file_pos,
433 self._current_file_name])
434 self._current_file_pos = self._current_stream_length
435 self._current_file_name = None
437 def start_new_stream(self, newstreamname='.'):
438 self.finish_current_stream()
439 self.set_current_stream_name(newstreamname)
441 def set_current_stream_name(self, newstreamname):
442 if re.search(r'[\t\n]', newstreamname):
443 raise errors.AssertionError(
444 "Manifest stream names cannot contain whitespace")
# The manifest spells the root stream as '.'.
445 self._current_stream_name = '.' if newstreamname=='' else newstreamname
447 def current_stream_name(self):
448 return self._current_stream_name
450 def finish_current_stream(self):
451 self.finish_current_file()
# A stream with no files at all is silently dropped (elided branch);
# files without a stream name is an error.
453 if not self._current_stream_files:
455 elif self._current_stream_name is None:
456 raise errors.AssertionError(
457 "Cannot finish an unnamed stream (%d bytes in %d files)" %
458 (self._current_stream_length, len(self._current_stream_files)))
# Every stream needs at least one locator; use the empty block if none.
460 if not self._current_stream_locators:
461 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
462 self._finished_streams.append([self._current_stream_name,
463 self._current_stream_locators,
464 self._current_stream_files])
# Reset all per-stream and per-file state for the next stream.
465 self._current_stream_files = []
466 self._current_stream_length = 0
467 self._current_stream_locators = []
468 self._current_stream_name = None
469 self._current_file_pos = 0
470 self._current_file_name = None
473 # Store the manifest in Keep and return its locator.
474 return self._my_keep().put(self.manifest_text())
476 def portable_data_hash(self):
# The portable data hash is md5(stripped manifest) + "+" + its length.
477 stripped = self.stripped_manifest()
478 return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
480 def manifest_text(self):
# Serialize all finished streams as manifest text: stream name,
# locators, then position:size:name file tokens, space-separated.
481 self.finish_current_stream()
484 for stream in self._finished_streams:
485 if not re.search(r'^\.(/.*)?$', stream[0]):
487 manifest += stream[0].replace(' ', '\\040')
488 manifest += ' ' + ' '.join(stream[1])
489 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
497 def data_locators(self):
# Collect the locators of every finished stream (body partly elided).
499 for name, locators, files in self._finished_streams:
504 class ResumableCollectionWriter(CollectionWriter):
# Attributes serialized by dump_state / restored by from_state.
505 STATE_PROPS = ['_current_stream_files', '_current_stream_length',
506 '_current_stream_locators', '_current_stream_name',
507 '_current_file_name', '_current_file_pos', '_close_file',
508 '_data_buffer', '_dependencies', '_finished_streams',
509 '_queued_dirents', '_queued_trees']
511 def __init__(self, api_client=None, num_retries=0):
# Map of source path -> os.stat tuple, used to detect changed inputs.
512 self._dependencies = {}
513 super(ResumableCollectionWriter, self).__init__(
514 api_client, num_retries=num_retries)
517 def from_state(cls, state, *init_args, **init_kwargs):
518 # Try to build a new writer from scratch with the given state.
519 # If the state is not suitable to resume (because files have changed,
520 # been deleted, aren't predictable, etc.), raise a
521 # StaleWriterStateError. Otherwise, return the initialized writer.
522 # The caller is responsible for calling writer.do_queued_work()
523 # appropriately after it's returned.
524 writer = cls(*init_args, **init_kwargs)
525 for attr_name in cls.STATE_PROPS:
526 attr_value = state[attr_name]
527 attr_class = getattr(writer, attr_name).__class__
528 # Coerce the value into the same type as the initial value, if
530 if attr_class not in (type(None), attr_value.__class__):
531 attr_value = attr_class(attr_value)
532 setattr(writer, attr_name, attr_value)
533 # Check dependencies before we try to resume anything.
# Expired Keep permission signatures make the saved locators unusable.
534 if any(KeepLocator(ls).permission_expired()
535 for ls in writer._current_stream_locators):
536 raise errors.StaleWriterStateError(
537 "locators include expired permission hint")
538 writer.check_dependencies()
# Reopen the file that was mid-write and seek back to where we were.
539 if state['_current_file'] is not None:
540 path, pos = state['_current_file']
542 writer._queued_file = open(path, 'rb')
543 writer._queued_file.seek(pos)
544 except IOError as error:
545 raise errors.StaleWriterStateError(
546 "failed to reopen active file {}: {}".format(path, error))
549 def check_dependencies(self):
# Verify every recorded source file is still a regular file with the
# same mtime and size; otherwise resuming would write stale data.
550 for path, orig_stat in self._dependencies.items():
551 if not S_ISREG(orig_stat[ST_MODE]):
552 raise errors.StaleWriterStateError("{} not file".format(path))
554 now_stat = tuple(os.stat(path))
555 except OSError as error:
556 raise errors.StaleWriterStateError(
557 "failed to stat {}: {}".format(path, error))
558 if ((not S_ISREG(now_stat[ST_MODE])) or
559 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
560 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
561 raise errors.StaleWriterStateError("{} changed".format(path))
563 def dump_state(self, copy_func=lambda x: x):
# Snapshot STATE_PROPS (optionally deep-copied via copy_func) plus the
# position of the file currently being written, if any.
564 state = {attr: copy_func(getattr(self, attr))
565 for attr in self.STATE_PROPS}
566 if self._queued_file is None:
567 state['_current_file'] = None
569 state['_current_file'] = (os.path.realpath(self._queued_file.name),
570 self._queued_file.tell())
573 def _queue_file(self, source, filename=None):
# Resumable writers only accept real file paths (not arbitrary
# file-like objects), so the file can be re-opened on resume.
575 src_path = os.path.realpath(source)
577 raise errors.AssertionError("{} not a file path".format(source))
579 path_stat = os.stat(src_path)
580 except OSError as stat_error:
582 super(ResumableCollectionWriter, self)._queue_file(source, filename)
# Stat the opened descriptor and cross-check against the path stat to
# detect the file changing between open() and stat().
583 fd_stat = os.fstat(self._queued_file.fileno())
584 if not S_ISREG(fd_stat.st_mode):
585 # We won't be able to resume from this cache anyway, so don't
586 # worry about further checks.
587 self._dependencies[source] = tuple(fd_stat)
588 elif path_stat is None:
589 raise errors.AssertionError(
590 "could not stat {}: {}".format(source, stat_error))
591 elif path_stat.st_ino != fd_stat.st_ino:
592 raise errors.AssertionError(
593 "{} changed between open and stat calls".format(source))
595 self._dependencies[src_path] = tuple(fd_stat)
597 def write(self, data):
# All resumable data must come from a queued file so it can be
# replayed after a resume.
598 if self._queued_file is None:
599 raise errors.AssertionError(
600 "resumable writer can't accept unsourced data")
601 return super(ResumableCollectionWriter, self).write(data)