Merge branch '3147-py-collection-retries-wip2'
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from keep import *
25 from stream import *
26 import config
27 import errors
28 import util
29
30 _logger = logging.getLogger('arvados.collection')
31
def normalize_stream(s, stream):
    """Build the token list for one normalized manifest stream.

    Arguments:
    * s: The stream name.  It is emitted unchanged as the first token.
    * stream: A dict mapping each file name to a list of segments.  Every
      segment is indexed with arvados.LOCATOR, arvados.BLOCKSIZE,
      arvados.OFFSET and arvados.SEGMENTSIZE.

    Returns a list of tokens: the stream name, then every distinct block
    locator in order of first use (files are visited in sorted name order),
    then "position:size:filename" tokens for each file.  Spaces in file
    names are written as the escape sequence \\040.
    """
    stream_tokens = [s]
    sortedfiles = sorted(stream)

    # Record where each distinct block starts within the concatenation of
    # all blocks, emitting each locator the first time it is seen.
    blocks = {}
    streamoffset = 0
    for f in sortedfiles:
        for b in stream[f]:
            locator = b[arvados.LOCATOR]
            if locator not in blocks:
                stream_tokens.append(locator)
                blocks[locator] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # Only the stream name so far: no file referenced any block, so emit
    # the empty block locator in the locator position.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            segmentend = segmentoffset + segment[arvados.SEGMENTSIZE]
            if current_span is None:
                current_span = [segmentoffset, segmentend]
            elif segmentoffset == current_span[1]:
                # This segment starts exactly where the previous one ended:
                # grow the current span instead of emitting a new token.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentend]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        # A file without any segment still needs a token so that it
        # appears in the manifest.
        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
70
def normalize(collection):
    """Return normalized stream token lists for every stream in collection.

    Each file's full path (stream name + "/" + file name) is split at its
    last slash into a stream name and a file name.  The file's segments
    are translated through the originating stream's locators_and_ranges()
    and gathered per (stream name, file name).  The gathered streams are
    then passed to normalize_stream() in sorted stream-name order.
    """
    by_stream = {}
    for reader in collection.all_streams():
        for entry in reader.all_files():
            full_path = reader.name() + "/" + entry.name()
            cut = full_path.rindex("/")
            stream_name, file_name = full_path[:cut], full_path[cut+1:]
            # Create the stream and file entries on first sight, even when
            # the file turns out to have no segments.
            ranges = by_stream.setdefault(stream_name, {}).setdefault(file_name, [])
            for seg in entry.segments:
                ranges.extend(reader.locators_and_ranges(seg[0], seg[1]))

    return [normalize_stream(name, by_stream[name])
            for name in sorted(by_stream)]
92
93
class CollectionBase(object):
    """Context-manager and Keep-client plumbing shared by collection classes.

    _my_keep() reads self._api_client, self._keep_client and
    self.num_retries, so subclasses must set those attributes first.
    """

    def __enter__(self):
        # Return the instance so "with SomeCollection(...) as c:" binds the
        # collection object rather than None.
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # The with statement always calls __exit__ with three exception
        # arguments; they default to None so a direct zero-argument call
        # keeps working.  Returning None lets any exception propagate.
        pass

    def _my_keep(self):
        """Return the Keep client, building one on first use if needed."""
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client
106
107
class CollectionReader(CollectionBase):
    """Parse a Collection manifest and expose its streams and files."""

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Set up a reader for one Collection.

        Nothing is fetched or parsed here; that happens on the first call
        that needs the manifest.

        Arguments:
        * manifest_locator_or_text: A Collection UUID, a portable data
          hash, or complete manifest text.
        * api_client: The API client used to look up the Collection.  When
          omitted, one is created with arvados.api('v1') the first time a
          lookup is needed.
        * keep_client: The KeepClient handed to the stream readers.  When
          omitted, one is built on demand from api_client and num_retries.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  The value may be changed after
          instantiation, but the change may not reach related objects
          such as an already-built Keep client.

        Raises errors.ArgumentError when the first argument matches none
        of the accepted forms.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        given = manifest_locator_or_text
        # Portable data hash first, then Collection UUID.
        looks_like_locator = (
            re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', given) or
            re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', given))
        if looks_like_locator:
            self._manifest_locator, self._manifest_text = given, None
        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', given, re.MULTILINE):
            self._manifest_locator, self._manifest_text = None, given
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def _populate(self):
        """Fetch (if necessary), parse and normalize the manifest, once."""
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                # Building the API client is postponed until this point so
                # that callers without access to an API server are not
                # affected unless a lookup is really required.  A freshly
                # built API client also invalidates the Keep client so the
                # next one is created from it.  A failure to build the
                # client is handled by the except clause like any other
                # failed lookup.
                if self._api_client is None:
                    self._api_client = arvados.api('v1')
                    self._keep_client = None
                request = self._api_client.collections().get(
                    uuid=self._manifest_locator)
                record = request.execute(num_retries=self.num_retries)
                self._manifest_text = record['manifest_text']
            except Exception:
                # Only a portable data hash can be fetched straight from
                # Keep; any other locator propagates the failure.
                if not util.portable_data_hash_pattern.match(
                        self._manifest_locator):
                    raise
                _logger.warning(
                    "API server did not return Collection '%s'. "
                    "Trying to fetch directly from Keep (deprecated).",
                    self._manifest_locator)
                self._manifest_text = self._my_keep().get(
                    self._manifest_locator, num_retries=self.num_retries)
        # The raw token lists must be stored before normalize() runs:
        # normalize() calls all_streams(), which re-enters _populate() and
        # has to find self._streams already set.
        raw_streams = []
        for manifest_line in self._manifest_text.split("\n"):
            if manifest_line:
                raw_streams.append(manifest_line.split())
        self._streams = raw_streams
        self._streams = normalize(self)

        # Rebuild the manifest text from the normalized streams.
        self._manifest_text = ''.join(
            StreamReader(tokens, keep=self._my_keep()).manifest_text()
            for tokens in self._streams)

    def all_streams(self):
        """Return a new list with one StreamReader per normalized stream."""
        self._populate()
        readers = []
        for tokens in self._streams:
            readers.append(StreamReader(tokens, self._my_keep(),
                                        num_retries=self.num_retries))
        return readers

    def all_files(self):
        """Yield every file of every stream, stream by stream."""
        for reader in self.all_streams():
            for entry in reader.all_files():
                yield entry

    def manifest_text(self, strip=False):
        """Return the normalized manifest text.

        With strip=True the text is regenerated by asking every stream
        reader for manifest_text(strip=True); otherwise the text stored
        during normalization is returned.
        """
        self._populate()
        if not strip:
            return self._manifest_text
        return ''.join(
            StreamReader(tokens, keep=self._my_keep()).manifest_text(strip=True)
            for tokens in self._streams)
204
205
class CollectionWriter(CollectionBase):
    """Accumulate files and streams into a new Collection manifest.

    Data passed to write() is buffered and sent to Keep in blocks of at
    most KEEP_BLOCK_SIZE bytes.  Files are grouped into named streams, and
    manifest_text() renders every finished stream as one manifest line.
    """

    # Maximum number of bytes stored in a single Keep block (64 MiB).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client handed to the Keep client and to the
          CollectionReader that normalizes the manifest.  May be None.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data written but not yet stored in Keep, and its total length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # State of the stream currently being written.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # [name, locators, files] triples of completed streams.
        self._finished_streams = []
        # Work queue; see do_queued_work().
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Finish the collection when a with block ends cleanly.

        The with statement passes three exception arguments; they default
        to None so a direct zero-argument call still works.  finish() is
        skipped while an exception is propagating, and the exception is
        never suppressed.
        """
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process the work queue until nothing is left."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Copy the queued file into the collection, then dequeue it."""
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close files this writer opened itself (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Queue the next file among the current directory's entries.

        Subdirectories met on the way are queued as trees with
        max_manifest_depth reduced by one.  The loop stops after queueing
        a single file so that do_queued_work() writes it next.  Once no
        entries remain, the current tree is removed from the queue.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the directory at the head of the tree queue.

        A max_manifest_depth of 0 lists it with util.listdir_recursive;
        any other value uses os.listdir.  An empty listing drops the tree
        from the queue right away.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Make source the file to be written next.

        source is either an object with a read attribute or a path, which
        is opened in binary mode and closed again by _work_file().  When
        filename is None the basename of source.name is used.
        """
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue the sorted directory entries; stream_name is not used."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Append a directory to the queue of trees still to be written."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a readable object) to the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the directory at path to the collection as stream_name.

        max_manifest_depth is decremented for every level of subdirectory;
        a directory reached with depth 0 is listed recursively into a
        single stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        An object with an __iter__ attribute is written item by item.
        Full blocks are flushed to Keep as soon as enough data is
        buffered.
        """
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Store up to KEEP_BLOCK_SIZE buffered bytes in Keep.

        Whatever exceeds one block stays in the buffer.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start one named newfilename."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the current file; tabs and newlines are rejected."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's position, size and name in the stream.

        Does nothing when there is neither a file name nor unrecorded
        data.  Raises errors.AssertionError when data was written without
        a file name.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        # The next file starts where this one ended.
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one named newstreamname."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the current stream; an empty name becomes '.'.

        Raises errors.AssertionError for names containing a tab or newline.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Close the current file and stream and reset the stream state.

        Buffered data is flushed first.  A stream with no files is
        dropped; a stream with files but no name raises
        errors.AssertionError.
        """
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # A stream whose files are all empty has no locator yet, so
            # the empty block locator is used in its place.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return the result of that put."""
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.

        Only tokens that begin like a block locator (32 hex digits followed
        by '+' or the end of the token) are rewritten, so file tokens whose
        names happen to contain "+A..." are left intact.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+A[a-z0-9@_-]+', '', x)
                     if re.match(r'[0-9a-f]{32}(\+|$)', x)
                     else x)
                    for x in fields[1:]]
                clean.append(' '.join(clean_fields) + "\n")
        return ''.join(clean)

    def manifest_text(self):
        """Finish the current stream and return the normalized manifest.

        Stream names that are neither '.' nor start with './' are prefixed
        with './'.  Spaces in stream and file names are written as \\040.
        The text is normalized by passing it through a CollectionReader.
        Returns "" when no stream has been finished.
        """
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if manifest:
            return CollectionReader(manifest, self._api_client).manifest_text()
        else:
            return ""

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
462
463
class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose progress can be dumped and resumed later.

    Every file written is recorded together with its stat result so that
    a resumed writer can detect source files that changed in the meantime.
    """

    # Attributes copied verbatim by dump_state() and restored by
    # from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        """Create an empty writer; the arguments go to CollectionWriter."""
        # Maps a source path to the stat tuple taken when it was queued.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Return a new writer initialized from a dump_state() result.

        Raises errors.StaleWriterStateError when the state cannot be
        resumed: a locator carries an expired permission hint, a
        dependency changed, or the active file cannot be reopened.  The
        caller is responsible for calling do_queued_work() on the
        returned writer.
        """
        writer = cls(*init_args, **init_kwargs)
        for prop in cls.STATE_PROPS:
            saved = state[prop]
            wanted = getattr(writer, prop).__class__
            # Convert the saved value to the type of the freshly
            # initialized attribute, unless that attribute starts out as
            # None or the types already agree.
            if wanted not in (type(None), saved.__class__):
                saved = wanted(saved)
            setattr(writer, prop, saved)
        # Validate everything the state depends on before resuming.
        for locator in writer._current_stream_locators:
            if KeepLocator(locator).permission_expired():
                raise errors.StaleWriterStateError(
                    "locators include expired permission hint")
        writer.check_dependencies()
        active_file = state['_current_file']
        if active_file is not None:
            path, pos = active_file
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Verify that every recorded source file is still unchanged.

        Raises errors.StaleWriterStateError when a dependency was not a
        regular file, can no longer be stat'ed, is no longer a regular
        file, or differs in modification time or size.
        """
        for path, recorded in self._dependencies.items():
            if not S_ISREG(recorded[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                current = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            unchanged = (S_ISREG(current[ST_MODE]) and
                         recorded[ST_MTIME] == current[ST_MTIME] and
                         recorded[ST_SIZE] == current[ST_SIZE])
            if not unchanged:
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict describing the writer's current progress.

        Each STATE_PROPS attribute is passed through copy_func.  The
        '_current_file' entry is None, or a (real path, position) pair for
        the file currently being written.
        """
        state = {}
        for prop in self.STATE_PROPS:
            state[prop] = copy_func(getattr(self, prop))
        active = self._queued_file
        if active is not None:
            state['_current_file'] = (os.path.realpath(active.name),
                                      active.tell())
        else:
            state['_current_file'] = None
        return state

    def _queue_file(self, source, filename=None):
        """Queue source for writing and record it as a dependency.

        Raises errors.AssertionError when source is not usable as a path,
        when a regular file cannot be stat'ed by path, or when the path's
        inode differs from that of the opened file.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        # Remember a failed stat; it only matters if the opened file turns
        # out to be a regular file.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data belonging to the currently queued file.

        Raises errors.AssertionError when no file is queued.
        """
        if self._queued_file is not None:
            return super(ResumableCollectionWriter, self).write(data)
        raise errors.AssertionError(
            "resumable writer can't accept unsourced data")