3706: Merge branch 'master' into 3706-keep-warning
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from keep import *
25 from stream import *
26 import config
27 import errors
28 import util
29
30 _logger = logging.getLogger('arvados.collection')
31
def normalize_stream(s, stream):
    """Build the token list for one normalized manifest stream.

    Arguments:
    * s: The stream name; it becomes the first token.
    * stream: A dict mapping each file name to a list of segments.  Each
      segment is indexed with arvados.LOCATOR, arvados.BLOCKSIZE,
      arvados.OFFSET and arvados.SEGMENTSIZE.

    Returns a list of tokens: the stream name, every distinct block locator
    in order of first use (walking files in sorted name order), then one or
    more "position:size:filename" tokens per file.
    """
    file_names = sorted(stream.keys())
    tokens = [s]

    # Pass 1: emit each block locator once and remember the offset at which
    # that block starts within the concatenated stream.
    block_starts = {}
    next_start = 0
    for name in file_names:
        for seg in stream[name]:
            locator = seg[arvados.LOCATOR]
            if locator in block_starts:
                continue
            tokens.append(locator)
            block_starts[locator] = next_start
            next_start += seg[arvados.BLOCKSIZE]

    # A stream with no data blocks still needs a locator token.
    if len(tokens) == 1:
        tokens.append(config.EMPTY_BLOCK_LOCATOR)

    # Pass 2: emit file tokens, merging segments that are contiguous in the
    # stream into a single span.
    for name in file_names:
        escaped = name.replace(' ', '\\040')
        segments = stream[name]
        span_start = None
        span_end = None
        for seg in segments:
            seg_start = block_starts[seg[arvados.LOCATOR]] + seg[arvados.OFFSET]
            seg_end = seg_start + seg[arvados.SEGMENTSIZE]
            if span_start is not None and seg_start == span_end:
                # Contiguous with the open span: just extend it.
                span_end = seg_end
                continue
            if span_start is not None:
                # Gap: close the open span before starting a new one.
                tokens.append("{0}:{1}:{2}".format(
                    span_start, span_end - span_start, escaped))
            span_start = seg_start
            span_end = seg_end

        if span_start is not None:
            tokens.append("{0}:{1}:{2}".format(
                span_start, span_end - span_start, escaped))

        # Files without any segment are recorded as zero-length.
        if len(segments) == 0:
            tokens.append("0:0:{0}".format(escaped))

    return tokens
70
71
class CollectionBase(object):
    """Behaviour shared by collection readers and writers.

    The methods here read self._api_client, self._keep_client and
    self.num_retries, and call self.manifest_text(); subclasses are
    expected to provide them.
    """

    def __enter__(self):
        # Return the object itself so "with X() as x:" binds something useful.
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        # The with-statement always passes three exception arguments.  They
        # default to None so a direct obj.__exit__() call keeps working.
        # Nothing to clean up here; returning None lets exceptions propagate.
        pass

    def _my_keep(self):
        """Return the Keep client, building one on first use."""
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean_lines = []
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                # Only tokens between the stream name and the final token
                # are rewritten, and only if they look like Keep locators.
                # A hint is "+" followed by a non-digit; size hints start
                # with a digit and are therefore kept.
                locators = [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x) else x)
                    for x in fields[1:-1]]
                clean_lines.append(
                    fields[0] + ' ' + ' '.join(locators) + ' ' +
                    fields[-1] + "\n")
        return ''.join(clean_lines)
100
class CollectionReader(CollectionBase):
    """Parse a Collection manifest and expose its streams and files."""

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        Raises errors.ArgumentError if the argument matches none of the
        recognized patterns.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        # Parsed manifest lines; filled in lazily by _populate().
        self._streams = None

    def _populate_from_api_server(self):
        """Fetch the manifest text from the API server's collection record."""
        # As in KeepClient itself, we must wait until the last possible
        # moment to instantiate an API client, in order to avoid
        # tripping up clients that don't have access to an API server.
        # If we do build one, make sure our Keep client uses it.
        # If instantiation fails, we'll fall back to the except clause,
        # just like any other Collection lookup failure.
        if self._api_client is None:
            self._api_client = arvados.api('v1')
            self._keep_client = None  # Make a new one with the new api.
        c = self._api_client.collections().get(
            uuid=self._manifest_locator).execute(
            num_retries=self.num_retries)
        self._manifest_text = c['manifest_text']

    def _populate_from_keep(self):
        """Fetch the manifest text directly from Keep."""
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.
        self._manifest_text = self._my_keep().get(
            self._manifest_locator, num_retries=self.num_retries)

    def _populate(self):
        """Load the manifest if needed and split it into stream token lists.

        Lookup order: Keep first when the locator is signed, then the API
        server, then Keep as a last resort for any Keep-style locator that
        was not already tried.  Raises arvados.errors.NotFoundError when
        every attempt leaves the manifest text empty.
        """
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if (not self._manifest_text and
                util.signed_locator_pattern.match(self._manifest_locator)):
            try:
                self._populate_from_keep()
            except Exception as e:
                # Remember the failure and fall through to the API server.
                error_via_keep = e
        if not self._manifest_text:
            try:
                self._populate_from_api_server()
            except Exception as e:
                # With no Keep fallback available, the API error is final.
                if not should_try_keep:
                    raise
                error_via_api = e
        if (not self._manifest_text and
                not error_via_keep and
                should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            try:
                self._populate_from_keep()
            except Exception as e:
                error_via_keep = e
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Rewrite the streams and manifest text in normalized form.

        Files are regrouped by stream name, streams are ordered by name,
        and each one is rebuilt with normalize_stream().  Returns self.
        """
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                # A file name may itself contain "/"; everything before the
                # last slash is treated as the stream name.
                filestream = s.name() + "/" + f.name()
                streamname, _, filename = filestream.rpartition("/")
                file_segments = streams.setdefault(
                    streamname, {}).setdefault(filename, [])
                for segment in f.segments:
                    file_segments.extend(
                        s.locators_and_ranges(segment[0], segment[1]))

        self._streams = []
        for streamname in sorted(streams.keys()):
            self._streams.append(
                normalize_stream(streamname, streams[streamname]))

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])

        return self

    def all_streams(self):
        """Return a list with one StreamReader per manifest stream."""
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in stream order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        """Return the manifest text.

        With strip=True the result comes from stripped_manifest() instead
        of the raw manifest.
        """
        if strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
250
251
class CollectionWriter(CollectionBase):
    """Build a new Collection manifest while uploading its data to Keep."""

    # Largest number of bytes handed to Keep in a single put() (2**26).
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Store the manifest when the with-block ends without an error.

        The arguments default to None so a direct writer.__exit__() call
        behaves as before.  When an exception is propagating, nothing is
        stored: the manifest would describe a partially written collection.
        """
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process queued files, directory entries and trees until done."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Copy the queued file into the collection, then dequeue it."""
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close files this writer opened itself (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Queue work for entries of the tree at the head of the queue.

        Subdirectories are queued as new trees.  The loop stops after
        queuing the first regular entry as a file, because only one file
        may be queued at a time.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the tree at the head of the queue and queue its entries."""
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        # At depth 0 the whole subtree is flattened into this stream.
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            # Empty directory: nothing to write, drop it.
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Queue one file for writing.

        source is either an object with a read() method or a path that is
        opened here in binary mode.  filename defaults to the basename of
        source.name.
        """
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue the sorted directory entries; stream_name is not used."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Append a directory to the queue of trees still to be written."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (path or file-like object) into the collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write a directory tree into the collection.

        max_manifest_depth is decremented for each level of subdirectory;
        a directory reached with depth 0 is listed recursively into a
        single stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        Objects exposing __iter__ are written element by element.
        NOTE(review): this relies on Python 2 strings not having __iter__.
        """
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Send at most one KEEP_BLOCK_SIZE block of buffered data to Keep.

        Whatever exceeds the block size stays in the buffer.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start one named newfilename."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Name the current file; tabs and newlines are rejected."""
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file currently being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file's position and size in the stream.

        A missing name is only acceptable when no data has been written
        since the previous file ended; otherwise errors.AssertionError is
        raised.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one named newstreamname."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Name the current stream; '' means '.', tabs/newlines rejected."""
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream currently being written."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Close the current stream and reset all per-stream state.

        A stream without files is silently discarded.  A stream with files
        but no name raises errors.AssertionError.
        """
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            # Only empty files were written: still emit a locator token.
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator."""
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return "<md5 of stripped manifest>+<its length>"."""
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the full manifest text.

        Returns '' when no stream has been finished.
        """
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            # Stream names must be "." or start with "./".
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
498
499
class ResumableCollectionWriter(CollectionWriter):
    """CollectionWriter whose progress can be dumped and restored later.

    Every file written is recorded in _dependencies together with its stat
    result, so a restored writer can detect source files that changed.
    """

    # Attributes copied verbatim by dump_state() and from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        """Create an empty writer; arguments are as for CollectionWriter."""
        # Maps a source path to the stat tuple taken when it was queued.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a state dict produced by dump_state().

        Extra arguments are passed to the constructor.  Raises
        errors.StaleWriterStateError when the state cannot be resumed.
        """
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # _data_buffer_len is not part of the saved state.  Recompute it so
        # write() sees how much data the restored buffer already holds.
        writer._data_buffer_len = sum(len(s) for s in writer._data_buffer)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Verify that no recorded source file changed since it was queued.

        Raises errors.StaleWriterStateError if a dependency was not a
        regular file, can no longer be stat'ed, is no longer a regular
        file, or has a different mtime or size.
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                    (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                    (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict describing the writer's current progress.

        copy_func is applied to every STATE_PROPS value.  The extra
        '_current_file' entry is None, or a (real path, offset) pair for
        the file being written.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file given by path and record it as a dependency.

        Raises errors.AssertionError if source is not usable as a path,
        cannot be stat'ed, or was replaced between the stat and open calls.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        # Keep the stat failure in its own variable: the name bound by
        # "except ... as" is not reliably available after the handler.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            # Different inode: the path was swapped after we stat'ed it.
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data that belongs to the currently queued file.

        Raises errors.AssertionError when no file is queued, because data
        without a source file could not be reproduced on resume.
        """
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)