3706: Catch and return exceptions in _populate_* methods to dry up _populate()
[arvados.git] sdk/python/arvados/collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados  # used below as arvados.api(), arvados.errors, arvados.LOCATOR, etc.
from keep import *
from stream import *
import config
import errors
import util
_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
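    """Return the manifest tokens for one normalized stream.

    s is the stream name; stream maps each file name in the stream to a
    list of segments, indexed by the arvados.LOCATOR, BLOCKSIZE, OFFSET,
    and SEGMENTSIZE constants.  Illustrative example, assuming loc is a
    valid locator for a 3-byte block:

        normalize_stream('.', {'foo': [[loc, 3, 0, 3]]})
        # -> ['.', loc, '0:3:foo']
    """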
    stream_tokens = [s]
    sortedfiles = sorted(stream.keys())

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any hints
        other than size hints) removed from the locators.
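
        For example (illustrative), a signed locator such as
        acbd18db4cc2f85cedef654fccc4a4d8+3+Axxxx@eeee is rewritten to
        acbd18db4cc2f85cedef654fccc4a4d8+3; the +3 size hint survives
        because hints starting with a digit are portable.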
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [(re.sub(r'\+[^\d][^\+]*', '', x)
                             if re.match(util.keep_locator_pattern, x) else x)
                            for x in fields[1:-1]]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        for reading the underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
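
        Illustrative usage (assumes a reachable API server and substitutes a
        made-up collection UUID; name() and size() are assumed from the
        stream module's file reader API):

            reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
            for f in reader.all_files():
                print f.name(), f.size()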
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest text, "
                "portable data hash, or collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
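        # Retrieval strategy: try Keep first when the locator carries a
        # permission signature, then the API server, then fall back to
        # Keep for plain hash locators the API server couldn't resolve.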
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for seg in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

        self._streams = []
        sortedstreams = sorted(streams.keys())
        for s in sortedstreams:
            self._streams.append(normalize_stream(s, streams[s]))

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])

        return self

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        if strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text


class CollectionWriter(CollectionBase):
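    # Data is sent to Keep in chunks of at most KEEP_BLOCK_SIZE bytes;
    # 2**26 bytes (64 MiB) is Keep's maximum block size.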
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
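
        Illustrative usage (the file name is made up; write_file() accepts
        either a path or an open file object):

            cwriter = CollectionWriter()
            cwriter.write_file('data.txt')
            print cwriter.manifest_text()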
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
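        # newdata may be a byte string or an arbitrarily nested iterable of
        # byte strings; iterables are flattened recursively, and a full Keep
        # block is flushed whenever enough data has accumulated.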
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
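        # Send at most one full Keep block: upload the first KEEP_BLOCK_SIZE
        # bytes of buffered data and keep any remainder in the buffer.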
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
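        # The portable data hash is the md5 of the stripped manifest text,
        # joined with its length.  For example (illustrative), an empty
        # manifest yields d41d8cd98f00b204e9800998ecf8427e+0.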
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
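        # Each finished stream becomes one manifest line: stream name, block
        # locators, then file tokens.  For example (illustrative), a
        # collection with one file 'foo.txt' holding the 3 bytes 'foo':
        #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt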
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
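        # Illustrative round trip with dump_state (the copy_func choice is
        # the caller's; copy.deepcopy is one safe option):
        #     state = writer.dump_state(copy.deepcopy)
        #     resumed = ResumableCollectionWriter.from_state(state)
        #     resumed.do_queued_work()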
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} is not a file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            # Leave stat_error bound for the error message below.
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)