import hashlib
import logging
import os
import re

from collections import deque
from stat import *

import arvados
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')


def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # This segment is contiguous with the previous one; extend it.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length file: emit an empty segment so it still appears.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
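
# A minimal sketch of normalize_stream's output, assuming the segment tuple
# layout (locator, blocksize, offset, segmentsize) defined in the stream
# module and a made-up block locator:
#
#   normalize_stream('.', {'f.txt': [('acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3)]})
#   # -> ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:f.txt']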


class CollectionBase(object):
    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ (re.sub(r'\+[^\d][^\+]*', '', x) if re.match(util.keep_locator_pattern, x) else x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean
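
    # For example (hypothetical permission signature), stripping turns a
    # signed locator like
    #     acbd18db4cc2f85cedef654fccc4a4d8+3+A0s3cr3t...@54f5bd26
    # into its portable form:
    #     acbd18db4cc2f85cedef654fccc4a4d8+3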


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None
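
    # A minimal usage sketch (hypothetical portable data hash; assumes a
    # reachable API server or world-readable Keep services):
    #
    #   reader = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+3')
    #   for f in reader.all_files():
    #       print f.name()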

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # The locator carries a permission signature, so Keep may be
            # able to serve the manifest directly.
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

        # Normalize the streams: merge entries for the same stream name and
        # regroup each file's data segments by block.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            # Round-trip through a new reader, which normalizes the
            # manifest as it populates its streams.
            cr = CollectionReader(self.manifest_text())
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
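
    # Each manifest line has the form
    #     <stream name> <block locator> [...] <pos:size:filename> [...]
    # For example, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt"
    # describes a single three-byte file foo.txt at the collection root.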


class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
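
    # A minimal usage sketch (hypothetical path; assumes standard Arvados
    # configuration so Keep writes can succeed):
    #
    #   writer = CollectionWriter()
    #   writer.write_file('/tmp/example.txt', 'example.txt')
    #   manifest = writer.manifest_text()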

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                # Queue the file and stop: do_queued_work will write it
                # before we come back for the next dirent.
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
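
    # max_manifest_depth controls how directories map to streams: with the
    # default -1 every subdirectory becomes its own stream, while 0 flattens
    # files from all subdirectories into a single stream.  Sketch
    # (hypothetical path):
    #
    #   writer.write_directory_tree('/tmp/data', stream_name='data',
    #                               max_manifest_depth=0)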

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            # Accept any iterable of strings, writing each chunk in turn.
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file ended; nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
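
    # The portable data hash has the form "<md5 hexdigest>+<manifest size>",
    # so identical content yields the same hash on any cluster, regardless
    # of permission signatures on the block locators.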

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text(normalize=True)
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
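
    # A checkpoint/resume sketch (hypothetical setup; pass a real copy
    # function such as copy.deepcopy if the state must outlive the writer):
    #
    #   state = writer.dump_state(copy.deepcopy)
    #   ... process dies and restarts ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()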

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)