3198: Initial support appending to streams.
sdk/python/arvados/collection.py
import functools
import hashlib
import logging
import os
import re

from collections import deque
from stat import *

from .arvfile import ArvadosFileBase
from .keep import *
from .stream import StreamReader, split
from . import config
from . import errors
from . import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    """Take a stream name and a dict of per-file segments, and return the
    stream in normalized manifest token form.

    's' is the stream name; 'stream' maps each file name to a list of
    (locator, blocksize, offset, segmentsize) tuples.  The result is a list
    of manifest tokens: the stream name, each block locator exactly once,
    then one position:size:filename token per file segment, with adjacent
    segments merged.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            # arvados.LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE are indices into
            # the segment tuples described above.
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # This segment starts exactly where the previous one ended,
                # so extend the current span instead of emitting a new token.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            # Zero-length files still need a token to appear in the manifest.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

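# A minimal usage sketch (the locator below is hypothetical, not real data):
#
#   segs = {'foo.txt': [('d41d8cd98f00b204e9800998ecf8427e+6', 6, 0, 3),
#                       ('d41d8cd98f00b204e9800998ecf8427e+6', 6, 3, 3)]}
#   normalize_stream('.', segs)
#   # -> ['.', 'd41d8cd98f00b204e9800998ecf8427e+6', '0:6:foo.txt']
#
# The two adjacent 3-byte segments are merged into a single 0:6 token.

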
class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        # Build the Keep client lazily, so one is only instantiated once
        # it's actually needed.
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # Keep the stream name as-is; for every locator token, drop
                # all hints except the +<size> hint.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)

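# For example (sketch; the locator and signature below are made up), a
# manifest line like
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3+Aabcdef0123456789abcdef0123456789abcdef01@ffffffff 0:3:foo.txt
#
# is returned as
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt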

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, portable data hash, or collection UUID")
        self._api_response = None
        self._streams = None

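    # A usage sketch (the UUID here is hypothetical):
    #
    #   reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
    #   for f in reader.all_files():
    #       print f.name()
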
    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            self._api_response = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = self._api_response['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            # Signed locator: try Keep first, since the signature lets us
            # fetch the manifest without consulting the API server.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a Keep locator, and we didn't already try Keep above.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def _populate_first(orig_func):
        # Decorator for methods that read actual Collection data.
        @functools.wraps(orig_func)
        def wrapper(self, *args, **kwargs):
            if self._streams is None:
                self._populate()
            return orig_func(self, *args, **kwargs)
        return wrapper

    @_populate_first
    def api_response(self):
        """api_response() -> dict or None

        Returns information about this Collection fetched from the API server.
        If the Collection exists in Keep but not the API server, currently
        returns None.  Future versions may provide a synthetic response.
        """
        return self._api_response

    @_populate_first
    def normalize(self):
        # Rearrange the streams so each file is described by a single,
        # sorted stream entry.
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                streamname, filename = split(s.name() + "/" + f.name())
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

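    # Sketch of the effect (locators below are hypothetical): a manifest
    # with one stream split across two lines, e.g.
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:a.txt
    #   . 37b51d194a7513e45b56f6524f2d51f2+3 0:3:b.txt
    #
    # is rewritten as a single stream line with files in sorted order:
    #
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 37b51d194a7513e45b56f6524f2d51f2+3 0:3:a.txt 3:3:b.txt
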
    @_populate_first
    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to read from the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object to read that file.
        """
        if filename is None:
            streampath, filename = split(streampath)
        keep_client = self._my_keep()
        for stream_s in self._streams:
            stream = StreamReader(stream_s, keep_client,
                                  num_retries=self.num_retries)
            if stream.name() == streampath:
                break
        else:
            raise ValueError("stream '{}' not found in Collection".
                             format(streampath))
        try:
            return stream.files()[filename]
        except KeyError:
            raise ValueError("file '{}' not found in Collection stream '{}'".
                             format(filename, streampath))

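    # Both call styles are equivalent (path is hypothetical):
    #
    #   reader.open('./doc/page1.txt')
    #   reader.open('./doc', 'page1.txt')
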
    @_populate_first
    def all_streams(self):
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    @_populate_first
    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            return self._manifest_text


class _WriterFile(ArvadosFileBase):
    def __init__(self, coll_writer, name):
        super(_WriterFile, self).__init__(name, 'wb')
        self.dest = coll_writer

    def close(self):
        super(_WriterFile, self).close()
        self.dest.finish_current_file()

    @ArvadosFileBase._before_close
    def write(self, data):
        self.dest.write(data)

    @ArvadosFileBase._before_close
    def writelines(self, seq):
        for data in seq:
            self.write(data)

    @ArvadosFileBase._before_close
    def flush(self):
        self.dest.flush_data()

class CollectionWriter(CollectionBase):
    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()
        self._last_open = None

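    # A usage sketch, combining the writer with the context-manager support
    # inherited from CollectionBase (names are hypothetical):
    #
    #   with CollectionWriter() as cwriter:
    #       with cwriter.open('./hello.txt') as outfile:
    #           outfile.write('hello world\n')
    #   # On a clean exit, __exit__ calls finish(), which stores the
    #   # manifest in Keep.
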
    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

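    # For example (paths are hypothetical), this writes a single file and
    # then a whole directory tree, with each subdirectory becoming its own
    # manifest stream:
    #
    #   cwriter.write_file('/tmp/report.txt')
    #   cwriter.write_directory_tree('/tmp/results', stream_name='./results')
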
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
            self.flush_data()

    def open(self, streampath, filename=None):
        """open(streampath[, filename]) -> file-like object

        Pass in the path of a file to write to the Collection, either as a
        single string or as two separate stream name and file name arguments.
        This method returns a file-like object you can write to add it to the
        Collection.

        You may only have one file object from the Collection open at a time,
        so be sure to close the object when you're done.  Using the object in
        a with statement makes that easy::

          with cwriter.open('./doc/page1.txt') as outfile:
              outfile.write(page1_data)
          with cwriter.open('./doc/page2.txt') as outfile:
              outfile.write(page2_data)
        """
        if filename is None:
            streampath, filename = split(streampath)
        if self._last_open and not self._last_open.closed:
            raise errors.AssertionError(
                "can't open '{}' when '{}' is still open".format(
                    filename, self._last_open.name))
        if streampath != self.current_stream_name():
            self.start_new_stream(streampath)
        self.set_current_file_name(filename)
        self._last_open = _WriterFile(self, filename)
        return self._last_open

    def flush_data(self):
        # Write at most one full Keep block's worth of buffered data;
        # anything beyond KEEP_BLOCK_SIZE stays in the buffer.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        elif re.search(r'\x00', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain NUL characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

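    # manifest_text() output looks like this sketch (locator hypothetical):
    #
    #   ./results acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:out.txt
    #
    # and portable_data_hash() is the md5 of the stripped manifest plus its
    # length, e.g. 'd30f1cc8c97eee857a3b451785056f69+47' (value made up).
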
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not a file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

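    # A checkpoint/resume sketch (paths hypothetical).  Pass a copy function
    # such as copy.deepcopy so later writes don't mutate the saved state:
    #
    #   import copy
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/bigfile')
    #   state = writer.dump_state(copy.deepcopy)
    #   # ... later, e.g. after a crash ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
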
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)