21 from collections import deque
30 _logger = logging.getLogger('arvados.collection')
def normalize_stream(s, stream):
    """Return the manifest tokens for one normalized stream.

    Arguments:
    * s: the stream name.
    * stream: a dict mapping each filename in the stream to a list of
      segment tuples indexed by the arvados.LOCATOR / BLOCKSIZE /
      OFFSET / SEGMENTSIZE constants.

    The result is a list of manifest tokens: the stream name, each
    block locator once in first-use order, then one
    "position:size:filename" token per merged file segment.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # Assign each distinct block locator its byte offset within the stream.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A manifest stream must list at least one block, even with no data.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        # Spaces in manifest filenames are escaped as \040.
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # Contiguous with the previous segment: extend the span.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length file: emit an explicit empty segment.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
class CollectionBase(object):
    """Common plumbing shared by collection readers and writers.

    Subclasses are expected to provide self._api_client,
    self._keep_client, self.num_retries, and a manifest_text() method.
    """

    def _my_keep(self):
        """Return the KeepClient to use, creating one lazily if needed."""
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep only +<digits> size hints on each block locator;
                # the first field (stream name) and last (file token)
                # are passed through unchanged.
                locators = [(re.sub(r'\+[^\d][^\+]*', '', x)
                             if re.match(util.keep_locator_pattern, x) else x)
                            for x in fields[1:-1]]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        # Parsed stream tokens; filled lazily by _populate().
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        """Fetch and parse the manifest, trying Keep and/or the API server.

        No-op if the streams have already been parsed.  Raises
        arvados.errors.NotFoundError when neither source yields a manifest.
        """
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep directly.
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({}).").format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # One token list per (non-empty) manifest line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Normalize the parsed streams and regenerate the manifest text.

        Files are regrouped by stream name, segments are merged, and
        streams are emitted in sorted order.
        """
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                slash = filestream.rindex("/")
                streamname = filestream[:slash]
                filename = filestream[slash+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                # NOTE(review): assumes f.segments yields (offset, size)
                # pairs -- confirm against StreamFileReader.
                for seg in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

        self._streams = []
        sortedstreams = list(streams.keys())
        sortedstreams.sort()
        for s in sortedstreams:
            self._streams.append(normalize_stream(s, streams[s]))

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])

    def all_streams(self):
        """Return a StreamReader for each stream in the collection."""
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file reader in every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        """Return the collection's manifest text.

        If strip is true, non-portable hints are removed from the
        locators first (see CollectionBase.stripped_manifest).
        """
        if strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
class CollectionWriter(CollectionBase):
    # Maximum data block size sent to Keep in one put (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Write the queued file to the stream in block-sized chunks."""
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Queue one unit of work from the current directory's entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                # Stop so the queued file is written before we continue.
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the next queued directory and queue its entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # At depth 0, flatten everything below into one stream.
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # A path was given; open it ourselves and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or file-like object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a whole directory tree to the collection."""
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data (or each item of an iterable of data) to the stream."""
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Send one full block of buffered data to Keep, keeping the rest."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's segment in the current stream."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data written since the last file: nothing to record.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Close out the current stream and reset per-stream state."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                # A stream must reference at least one block.
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return the md5+size portable data hash of the stripped manifest."""
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                # Stream names are always rooted at '.'.
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        # NOTE(review): the original tail of this method was lost in
        # extraction; returning the assembled text directly -- confirm
        # against upstream history.
        return manifest

    def data_locators(self):
        """Return every block locator referenced by finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose state can be dumped and later resumed.

    Only writers fed from real files on disk can be resumed; the writer
    records stat() information about each source so staleness can be
    detected at resume time.
    """

    # Attributes captured by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed (e.g. deques serialized as lists).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any source file has changed."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict of the writer's state, suitable for from_state().

        copy_func is applied to each attribute value (e.g. copy.deepcopy)
        before it is stored.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            # Rebind outside the handler: in Python 3 the "as" name is
            # deleted when the except block exits, but we report it below.
            stat_error = error
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)