21 from collections import deque
# Module-level logger; a child of the top-level 'arvados' logger, so the
# embedding application controls handlers and verbosity.
_logger = logging.getLogger('arvados.collection')
def normalize_stream(s, stream):
    """Build the normalized manifest token list for one stream.

    :s: the stream name (e.g. '.', './subdir').
    :stream: dict mapping file name -> list of segments; each segment is
      indexable by arvados.LOCATOR / BLOCKSIZE / OFFSET / SEGMENTSIZE.

    Returns [stream name, block locators..., "pos:size:name" file tokens...],
    with adjacent segments of each file merged into single spans.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    # Lay out each distinct block once, remembering its offset within the
    # concatenated stream so file segments can be mapped onto it.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A manifest stream must list at least one block locator.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # Contiguous with the previous segment: extend the span.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                # Discontinuity: emit the finished span, start a new one.
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            # Zero-length files still need a file token.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
def normalize(collection):
    """Normalize a collection: bucket every file under its stream name, then
    emit one normalized token list per stream (sorted by stream name) via
    normalize_stream().

    :collection: object exposing all_streams(); each stream exposes name(),
      all_files(), and locators_and_ranges().
    """
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            # Use a distinct loop variable: the original rebound `r` here,
            # shadowing the rindex() result above.
            for seg in f.segments:
                streams[streamname][filename].extend(
                    s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams
class CollectionReader(object):
    """Read access to an Arvados collection, identified by a content
    locator, a collection UUID, or literal manifest text."""

    def __init__(self, manifest_locator_or_text, api_client=None):
        """Accept a portable data hash, a collection UUID, or raw manifest
        text; anything else raises errors.ArgumentError."""
        self._api_client = api_client
        self._keep_client = None
        looks_like_hash = re.match(
            r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text)
        looks_like_uuid = re.match(
            r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text)
        if looks_like_hash or looks_like_uuid:
            # A locator: defer fetching the manifest until it is needed.
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+',
                      manifest_locator_or_text, re.MULTILINE):
            # Literal manifest text was supplied directly.
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None
119 if self._streams is not None:
121 if not self._manifest_text:
123 # As in KeepClient itself, we must wait until the last possible
124 # moment to instantiate an API client, in order to avoid
125 # tripping up clients that don't have access to an API server.
126 # If we do build one, make sure our Keep client uses it.
127 # If instantiation fails, we'll fall back to the except clause,
128 # just like any other Collection lookup failure.
129 if self._api_client is None:
130 self._api_client = arvados.api('v1')
131 self._keep_client = KeepClient(api_client=self._api_client)
132 if self._keep_client is None:
133 self._keep_client = KeepClient(api_client=self._api_client)
134 c = self._api_client.collections().get(
135 uuid=self._manifest_locator).execute()
136 self._manifest_text = c['manifest_text']
137 except Exception as e:
138 if not util.portable_data_hash_pattern.match(
139 self._manifest_locator):
141 _logger.warning("API lookup failed for collection %s (%s: %s)",
142 self._manifest_locator, type(e), str(e))
143 if self._keep_client is None:
144 self._keep_client = KeepClient(api_client=self._api_client)
145 self._manifest_text = self._keep_client.get(self._manifest_locator)
147 for stream_line in self._manifest_text.split("\n"):
148 if stream_line != '':
149 stream_tokens = stream_line.split()
150 self._streams += [stream_tokens]
151 self._streams = normalize(self)
153 # now regenerate the manifest text based on the normalized stream
155 #print "normalizing", self._manifest_text
156 self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
157 #print "result", self._manifest_text
160 def all_streams(self):
163 for s in self._streams:
164 resp.append(StreamReader(s))
168 for s in self.all_streams():
169 for f in s.all_files():
172 def manifest_text(self, strip=False):
175 m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
178 return self._manifest_text
class CollectionWriter(object):
    """Incrementally build a new collection: buffer written data, flush it
    to Keep in fixed-size blocks, and track per-stream file boundaries."""

    # Data is flushed to Keep in blocks of at most this many bytes (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None):
        """Create a writer; the Keep client is built lazily from api_client."""
        self._api_client = api_client
        self._keep_client = None
        # Data written but not yet flushed to Keep, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being assembled.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        # Work-queue state; see do_queued_work().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
206 def _prep_keep_client(self):
207 if self._keep_client is None:
208 self._keep_client = KeepClient(api_client=self._api_client)
210 def do_queued_work(self):
211 # The work queue consists of three pieces:
212 # * _queued_file: The file object we're currently writing to the
214 # * _queued_dirents: Entries under the current directory
215 # (_queued_trees[0]) that we want to write or recurse through.
216 # This may contain files from subdirectories if
217 # max_manifest_depth == 0 for this directory.
218 # * _queued_trees: Directories that should be written as separate
219 # streams to the Collection.
220 # This function handles the smallest piece of work currently queued
221 # (current file, then current directory, then next directory) until
222 # no work remains. The _work_THING methods each do a unit of work on
223 # THING. _queue_THING methods add a THING to the work queue.
225 if self._queued_file:
227 elif self._queued_dirents:
229 elif self._queued_trees:
234 def _work_file(self):
236 buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
240 self.finish_current_file()
242 self._queued_file.close()
243 self._close_file = None
244 self._queued_file = None
246 def _work_dirents(self):
247 path, stream_name, max_manifest_depth = self._queued_trees[0]
248 if stream_name != self.current_stream_name():
249 self.start_new_stream(stream_name)
250 while self._queued_dirents:
251 dirent = self._queued_dirents.popleft()
252 target = os.path.join(path, dirent)
253 if os.path.isdir(target):
254 self._queue_tree(target,
255 os.path.join(stream_name, dirent),
256 max_manifest_depth - 1)
258 self._queue_file(target, dirent)
260 if not self._queued_dirents:
261 self._queued_trees.popleft()
263 def _work_trees(self):
264 path, stream_name, max_manifest_depth = self._queued_trees[0]
265 make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
267 d = make_dirents(path)
269 self._queue_dirents(stream_name, d)
271 self._queued_trees.popleft()
273 def _queue_file(self, source, filename=None):
274 assert (self._queued_file is None), "tried to queue more than one file"
275 if not hasattr(source, 'read'):
276 source = open(source, 'rb')
277 self._close_file = True
279 self._close_file = False
281 filename = os.path.basename(source.name)
282 self.start_new_file(filename)
283 self._queued_file = source
    def _queue_dirents(self, stream_name, dirents):
        # Sort so entries are written in a deterministic order.
        # (stream_name is accepted for signature parity but unused here.)
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))
    def _queue_tree(self, path, stream_name, max_manifest_depth):
        # Defer the directory walk; do_queued_work() drains _queued_trees.
        self._queued_trees.append((path, stream_name, max_manifest_depth))
    def write_file(self, source, filename=None):
        # Queue a single file (path or file-like object) and write it now.
        self._queue_file(source, filename)
        self.do_queued_work()
    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        # Queue a directory tree and write it now.  max_manifest_depth is
        # decremented per subdirectory; 0 appears to flatten the remaining
        # subtree into one stream (via util.listdir_recursive in
        # _work_trees), while a negative value never reaches 0 and so keeps
        # one stream per directory — TODO confirm against _work_trees.
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
301 def write(self, newdata):
302 if hasattr(newdata, '__iter__'):
306 self._data_buffer += [newdata]
307 self._data_buffer_len += len(newdata)
308 self._current_stream_length += len(newdata)
309 while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
    def flush_data(self):
        # Send one full block (at most KEEP_BLOCK_SIZE bytes) of buffered
        # data to Keep and record its locator; any remainder stays buffered.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._prep_keep_client()
            self._current_stream_locators.append(
                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])
    def start_new_file(self, newfilename=None):
        # Record the extent of the file written so far, then begin a new one.
        self.finish_current_file()
        self.set_current_file_name(newfilename)
325 def set_current_file_name(self, newfilename):
326 if re.search(r'[\t\n]', newfilename):
327 raise errors.AssertionError(
328 "Manifest filenames cannot contain whitespace: %s" %
330 self._current_file_name = newfilename
    def current_file_name(self):
        # Name of the file currently being written (None if unnamed).
        return self._current_file_name
335 def finish_current_file(self):
336 if self._current_file_name == None:
337 if self._current_file_pos == self._current_stream_length:
339 raise errors.AssertionError(
340 "Cannot finish an unnamed file " +
341 "(%d bytes at offset %d in '%s' stream)" %
342 (self._current_stream_length - self._current_file_pos,
343 self._current_file_pos,
344 self._current_stream_name))
345 self._current_stream_files += [[self._current_file_pos,
346 self._current_stream_length - self._current_file_pos,
347 self._current_file_name]]
348 self._current_file_pos = self._current_stream_length
    def start_new_stream(self, newstreamname='.'):
        # Close out the current stream (flushing its data) and begin a new one.
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)
354 def set_current_stream_name(self, newstreamname):
355 if re.search(r'[\t\n]', newstreamname):
356 raise errors.AssertionError(
357 "Manifest stream names cannot contain whitespace")
358 self._current_stream_name = '.' if newstreamname=='' else newstreamname
    def current_stream_name(self):
        # Name of the stream currently being written (None between streams).
        return self._current_stream_name
363 def finish_current_stream(self):
364 self.finish_current_file()
366 if len(self._current_stream_files) == 0:
368 elif self._current_stream_name == None:
369 raise errors.AssertionError(
370 "Cannot finish an unnamed stream (%d bytes in %d files)" %
371 (self._current_stream_length, len(self._current_stream_files)))
373 if len(self._current_stream_locators) == 0:
374 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
375 self._finished_streams += [[self._current_stream_name,
376 self._current_stream_locators,
377 self._current_stream_files]]
378 self._current_stream_files = []
379 self._current_stream_length = 0
380 self._current_stream_locators = []
381 self._current_stream_name = None
382 self._current_file_pos = 0
383 self._current_file_name = None
386 # Store the manifest in Keep and return its locator.
387 self._prep_keep_client()
388 return self._keep_client.put(self.manifest_text())
390 def stripped_manifest(self):
392 Return the manifest for the current collection with all permission
393 hints removed from the locators in the manifest.
395 raw = self.manifest_text()
397 for line in raw.split("\n"):
398 fields = line.split()
400 locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
401 for x in fields[1:-1] ]
402 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
405 def manifest_text(self):
406 self.finish_current_stream()
409 for stream in self._finished_streams:
410 if not re.search(r'^\.(/.*)?$', stream[0]):
412 manifest += stream[0].replace(' ', '\\040')
413 manifest += ' ' + ' '.join(stream[1])
414 manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
417 if len(manifest) > 0:
418 return CollectionReader(manifest).manifest_text()
422 def data_locators(self):
424 for name, locators, files in self._finished_streams:
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be serialized (dump_state) and
    later restored (from_state), provided its source files are unchanged."""

    # Writer attributes captured by dump_state and restored by from_state.
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None):
        # Maps dependency path -> stat tuple; used to detect stale state.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client)
    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # possible — presumably to restore deques that were dumped as
            # plain lists (TODO confirm against dump_state serialization).
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                # Reopen the file that was mid-write and seek back to the
                # recorded position.
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
    def check_dependencies(self):
        """Raise StaleWriterStateError unless every recorded dependency is
        still a regular file with the same mtime and size as when queued."""
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))
487 def dump_state(self, copy_func=lambda x: x):
488 state = {attr: copy_func(getattr(self, attr))
489 for attr in self.STATE_PROPS}
490 if self._queued_file is None:
491 state['_current_file'] = None
493 state['_current_file'] = (os.path.realpath(self._queued_file.name),
494 self._queued_file.tell())
    def _queue_file(self, source, filename=None):
        """Queue a file for writing, recording stat() information so that
        check_dependencies() can later detect whether it changed."""
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            # Stat the path before opening; compared to fstat below to
            # catch files replaced between the two calls.
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)
    def write(self, data):
        # Resumable writers only accept data that arrives through a queued
        # file; arbitrary write() calls could not be replayed after a resume.
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)