import logging
import os
import re

from collections import deque
from stat import *

import arvados
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')


def normalize_stream(s, stream):
    # Build a normalized manifest stream for stream name s: the name itself,
    # then each unique block locator, then pos:size:filename tokens.
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens


def normalize(collection):
    # Group every file's segments by (stream name, file name), then emit one
    # normalized stream per directory.
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionBase(object):
    def _my_keep(self):
        # Build (and cache) a KeepClient on first use.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
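        # A minimal usage sketch (illustrative only; the UUID below is
        # hypothetical, and a reachable API server or a literal manifest
        # text is assumed):
        #
        #     reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
        #     for f in reader.all_files():
        #         print f.name(), f.size()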
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                # As in KeepClient itself, we must wait until the last possible
                # moment to instantiate an API client, in order to avoid
                # tripping up clients that don't have access to an API server.
                # If we do build one, make sure our Keep client uses it.
                # If instantiation fails, we'll fall back to the except clause,
                # just like any other Collection lookup failure.
                if self._api_client is None:
                    self._api_client = arvados.api('v1')
                    self._keep_client = None  # Make a new one with the new api.
                c = self._api_client.collections().get(
                    uuid=self._manifest_locator).execute(
                    num_retries=self.num_retries)
                self._manifest_text = c['manifest_text']
            except Exception as e:
                if not util.portable_data_hash_pattern.match(
                        self._manifest_locator):
                    raise
                _logger.warning(
                    "API server did not return Collection '%s'. " +
                    "Trying to fetch directly from Keep (deprecated).",
                    self._manifest_locator)
                self._manifest_text = self._my_keep().get(
                    self._manifest_locator, num_retries=self.num_retries)
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]
        self._streams = normalize(self)

        # Now regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            return ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
        else:
            return self._manifest_text


class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it. The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections. If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests. Default 0. You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
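        # A minimal usage sketch (illustrative only; assumes Arvados API and
        # Keep access are configured in the environment):
        #
        #     cw = CollectionWriter()
        #     cw.start_new_file('hello.txt')
        #     cw.write('hello world\n')
        #     manifest = cw.manifest_text()   # flushes buffered data to Keep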
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection stream.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        # (An illustrative sketch of the queue flow follows this method.)
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break
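
    # Illustrative sketch of the work queue in action (the path below is
    # hypothetical):
    #
    #     cw.write_directory_tree('/tmp/photos', stream_name='photos')
    #     # _queue_tree() appends ('/tmp/photos', 'photos', -1) to
    #     # _queued_trees; do_queued_work() then alternates between
    #     # _work_trees(), _work_dirents() and _work_file() until every
    #     # queue is empty.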

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
            self._current_file_pos,
            self._current_stream_length - self._current_file_pos,
            self._current_file_name])
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
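        # For example, a signed locator of the form
        # <hash>+<size>+A<signature>@<expiry> is rewritten as <hash>+<size>;
        # the "+A..." permission hint is what gets stripped.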
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                locators = [re.sub(r'\+A[a-z0-9@_-]+', '', x)
                            for x in fields[1:-1]]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''
        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
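        # Typical round trip (illustrative; assumes `writer` state was saved
        # earlier with dump_state(), e.g. using copy.deepcopy as copy_func):
        #
        #     state = writer.dump_state(copy.deepcopy)
        #     # ...later, possibly in a new process...
        #     writer = ResumableCollectionWriter.from_state(state)
        #     writer.do_queued_work()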
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)