import hashlib
import logging
import os
import re

from collections import deque
from stat import S_ISREG, ST_MODE, ST_MTIME, ST_SIZE

import arvados
import config
import errors
import util
from keep import KeepClient, KeepLocator
from stream import StreamReader

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0
    # Assign each distinct block an offset within the concatenated stream.
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    # Adjacent segments coalesce into a single file token.
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length file: represent it with an empty segment.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
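
# Illustrative sketch (not from the original file): assuming segment tuples
# indexed by arvados.LOCATOR, BLOCKSIZE, OFFSET, and SEGMENTSIZE, a one-file
# stream backed by a single 3-byte block normalizes to:
#
#   normalize_stream('.', {'f.txt': [('acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3)]})
#   # => ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:f.txt']
#
# which serializes as the manifest line:
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:f.txt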

class CollectionBase(object):
    def _my_keep(self):
        # Build the Keep client lazily, so a collection object can be
        # created before any Keep/API configuration is available.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
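
    # Illustrative sketch (hint syntax assumed from the regex above):
    # stripping rewrites a signed locator such as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+Asignature@53bed294
    # to its portable form
    #   acbd18db4cc2f85cedef654fccc4a4d8+3
    # keeping only the hash and the numeric size hint.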

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read their underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None
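
    # Illustrative usage sketch (the UUID below is made up):
    #
    #   reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #   for f in reader.all_files():
    #       print f.name()
    #
    # The manifest is fetched lazily: nothing is looked up until a call
    # such as all_streams() or manifest_text() triggers _populate().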

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: the signature lets us fetch the manifest
            # from Keep without consulting the API server first.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for seg in f.segments:
                    streams[streamname][filename].extend(
                        s.locators_and_ranges(seg[0], seg[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            # Normalize via a throwaway reader, then apply stripping (if
            # requested) to the normalized text.
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text

class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
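
    # Illustrative usage sketch (file name and content are made up):
    #
    #   writer = CollectionWriter()
    #   writer.start_new_file('hello.txt')
    #   writer.write('hello world\n')
    #   manifest = writer.manifest_text()
    #
    # Data is buffered locally and sent to Keep in KEEP_BLOCK_SIZE
    # chunks; manifest_text() flushes whatever remains.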

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                # Queue one file, then return so it gets written before
                # we continue with the rest of the directory.
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
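
    # Illustrative sketch (tree made up): given dir/a.txt and
    # dir/sub/b.txt, write_directory_tree('dir') writes stream '.'
    # containing a.txt plus a separate stream './sub' containing b.txt;
    # with max_manifest_depth=0 the whole tree is flattened into the
    # single stream '.'.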

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
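
    # Sizing note (arithmetic, not from the original file): KEEP_BLOCK_SIZE
    # is 2**26 bytes (64 MiB), so writing 100 MiB sends one full 64 MiB
    # block to Keep and carries the remaining 36 MiB in the buffer until
    # the next flush or finish_current_stream().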

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
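
    # Illustrative sketch: the portable data hash is the md5 of the
    # stripped manifest plus its byte length, e.g. an empty manifest
    # yields 'd41d8cd98f00b204e9800998ecf8427e+0' (md5 of '').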

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040'))
                for sfile in stream[2])
            manifest += "\n"

        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text(
                normalize=True)
        else:
            return ""
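
    # Illustrative sketch of one emitted line (locator made up):
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:hello\040world.txt
    # i.e. stream name, block locators, then offset:length:filename
    # tokens, with spaces in names escaped as \040.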

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
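
    # Illustrative checkpoint/resume sketch (file name made up; how the
    # state dict is persisted is up to the caller):
    #
    #   writer = ResumableCollectionWriter()
    #   writer._queue_file('input.dat')
    #   state = writer.dump_state()
    #   # ...interrupted; later, in a fresh process...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()   # resumes from the saved file offset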

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(
                    "{} is not a regular file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)