21 from collections import deque
# Module-level logger; child of the 'arvados' logger hierarchy so callers
# can configure SDK logging in one place.
_logger = logging.getLogger('arvados.collection')
def normalize_stream(s, stream):
    """Return a list of manifest tokens for one normalized stream.

    Arguments:
    * s: the stream name (e.g. '.', './subdir').
    * stream: dict mapping each filename to its list of data segments
      (tuples indexed by arvados.LOCATOR / BLOCKSIZE / OFFSET /
      SEGMENTSIZE, as produced by StreamReader.locators_and_ranges).

    The result is [stream name, block locators..., file segment tokens...],
    with files in sorted order, each block locator listed exactly once, and
    adjacent segments of the same file coalesced into a single span.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # First pass: emit each block locator once (in first-use order) and
    # record the offset of each block within the concatenated stream data.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A valid stream needs at least one locator token, even with no data.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    # Second pass: emit "pos:size:name" tokens for each file, merging
    # segments that are contiguous in the stream.
    for f in sortedfiles:
        current_span = None
        # Manifest format escapes spaces in filenames as \040.
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            # Translate the segment's block-relative offset into a
            # stream-relative offset.
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # Contiguous with the previous segment: extend the span.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length file: represent it with an empty segment.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
class CollectionBase(object):
    """Shared plumbing for CollectionReader and CollectionWriter:
    lazy Keep client construction and manifest stripping.
    """

    def _my_keep(self):
        """Return this collection's KeepClient, building it on first use.

        Instantiation is deferred so that an API client is only created
        when Keep access is actually needed.
        """
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep '+<digits>' size hints; drop every other '+hint'
                # (e.g. '+A<signature>') from tokens that look like locators.
                locators = [ (re.sub(r'\+[^\d][^\+]*', '', x) if re.match(util.keep_locator_pattern, x) else x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean
class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        # Decide how to interpret the argument: locator, UUID, or full
        # manifest text.  Lookup is deferred until _populate().
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last possible
        # moment to instantiate an API client, in order to avoid
        # tripping up clients that don't have access to an API server.
        # If we do build one, make sure our Keep client uses it.
        # If instantiation fails, we'll fall back to the except clause,
        # just like any other Collection lookup failure.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        c = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = c['manifest_text']

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)

    def _populate(self):
        # Fetch and tokenize the manifest, trying the API server and/or
        # Keep as appropriate.  Idempotent: a no-op once streams exist.
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # A signed locator can be fetched from Keep without API help.
            try:
                self._populate_from_keep()
            except Exception as e:
                error_via_keep = e
        if not self._manifest_text:
            try:
                self._populate_from_api_server()
            except Exception as e:
                # If Keep is still an option, remember the error and
                # keep going; otherwise, give up now.
                if not should_try_keep:
                    raise
                error_via_api = e
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            try:
                self._populate_from_keep()
            except Exception as e:
                error_via_keep = e
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # One token list per manifest stream line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Regroup the manifest so each stream appears once, with sorted
        streams, sorted files, and each data block listed exactly once."""
        self._populate()

        # Rearrange streams: group every file's segments under its
        # (stream name, file name) pair.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                # 'seg' (not 'r') so the rindex result above isn't clobbered.
                for seg in f.segments:
                    streams[streamname][filename].extend(
                        s.locators_and_ranges(seg[0], seg[1]))

        self._streams = []
        sortedstreams = list(streams.keys())
        sortedstreams.sort()
        for s in sortedstreams:
            self._streams.append(normalize_stream(s, streams[s]))

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        """Return a StreamReader for each stream in the collection."""
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield a file reader for every file in every stream."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        """Return the manifest text.

        With strip=True, non-portable hints (e.g. permission signatures)
        are removed from all locators first.
        """
        if strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
class CollectionWriter(CollectionBase):
    # Target size of each data block written to Keep (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Buffered data not yet written to Keep, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        # State of the file currently being written within that stream.
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work queue state (see do_queued_work).
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        # Copy the queued file into the collection in KEEP_BLOCK_SIZE
        # chunks, then close it (if we opened it) and clear the queue slot.
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        # Process directory entries until we queue a file (which must be
        # written before we can continue) or run out of entries.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        # Pull work from the top queued directory: either queue its
        # entries, or retire it if it's empty.
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # At depth 0 the whole subtree is flattened into one stream.
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            # 'source' is a path: open it ourselves, and remember to close it.
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a readable object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a directory tree to the collection.

        max_manifest_depth limits how many directory levels become separate
        streams; -1 means unlimited, 0 flattens everything into one stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        Accepts either a string or an iterable of strings (written in
        order).  Data is buffered and flushed to Keep in
        KEEP_BLOCK_SIZE blocks.
        """
        if hasattr(newdata, '__iter__'):
            for chunk in newdata:
                self.write(chunk)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Write up to one full block of buffered data to Keep."""
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            # Keep any overflow beyond one block buffered for next time.
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        # Tabs and newlines are manifest token/line separators, so they
        # can never appear in a file name.
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's segment in the stream, if any."""
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                # No data was written since the last file ended: nothing to do.
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        """Flush the current stream's data and record it, then reset
        per-stream state for the next stream."""
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            # Empty stream: nothing to record.
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A stream must carry at least one locator token.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return the portable data hash (md5 of the stripped manifest,
        plus its length) for the collection built so far."""
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Assemble and return the manifest text for all finished streams."""
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names must start with './' (except '.' itself).
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        # Round-trip through CollectionReader to return normalized text.
        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text()
        else:
            return ""

    def data_locators(self):
        """Return the flat list of Keep locators for all finished streams."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose progress can be checkpointed with
    dump_state() and later resumed with from_state(), as long as the
    source files have not changed in the meantime.
    """

    # Attributes captured by dump_state()/restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        # Map of source path -> stat tuple, used to detect changes on resume.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # it has one.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Raise StaleWriterStateError if any recorded source file has
        changed (or disappeared) since its stat was captured."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a picklable dict of writer state.

        copy_func is applied to each attribute value (e.g. pass
        copy.deepcopy to snapshot mutable state).
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        # Capture the stat error explicitly: the except-clause variable is
        # unbound after the handler in Python 3, and we report it later.
        try:
            path_stat = os.stat(src_path)
            stat_error = None
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        # Resumable state can only describe data that came from a known
        # source file, so refuse free-form writes.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)