3706: Prefer join() to many string concatenations.
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from keep import *
25 from stream import *
26 import config
27 import errors
28 import util
29
30 _logger = logging.getLogger('arvados.collection')
31
def normalize_stream(s, stream):
    """Build the normalized manifest tokens for a single stream.

    Arguments:
    * s: The stream name.  It becomes the first token of the result.
    * stream: A mapping of file name to a list of segment records.  Each
      record is indexed with arvados.LOCATOR, arvados.BLOCKSIZE,
      arvados.OFFSET and arvados.SEGMENTSIZE.

    Returns a list of tokens: the stream name, then each distinct block
    locator in order of first use (files are visited in sorted name
    order), then one "position:size:name" token per contiguous span of
    each file.  Spaces in file names are written as \\040.
    """
    tokens = [s]
    filenames = sorted(stream.keys())

    # Pass 1: list every distinct block once and remember the offset at
    # which it starts within the concatenated stream.
    block_starts = {}
    next_start = 0
    for name in filenames:
        for seg in stream[name]:
            locator = seg[arvados.LOCATOR]
            if locator in block_starts:
                continue
            tokens.append(locator)
            block_starts[locator] = next_start
            next_start += seg[arvados.BLOCKSIZE]

    # A stream must name at least one block, even if it holds no data.
    if len(tokens) == 1:
        tokens.append(config.EMPTY_BLOCK_LOCATOR)

    # Pass 2: emit file tokens, merging segments that are adjacent in the
    # stream into a single span.
    for name in filenames:
        escaped = name.replace(' ', '\\040')
        segments = stream[name]
        if not segments:
            tokens.append("0:0:{0}".format(escaped))
            continue

        span_start = None
        span_end = None
        for seg in segments:
            begin = block_starts[seg[arvados.LOCATOR]] + seg[arvados.OFFSET]
            if span_start is not None and begin != span_end:
                # This segment does not continue the open span: close it.
                tokens.append("{0}:{1}:{2}".format(
                    span_start, span_end - span_start, escaped))
                span_start = None
            if span_start is None:
                span_start = begin
                span_end = begin
            span_end += seg[arvados.SEGMENTSIZE]

        tokens.append("{0}:{1}:{2}".format(
            span_start, span_end - span_start, escaped))

    return tokens
70
71
class CollectionBase(object):
    """Behavior shared by Collection readers and writers.

    The methods here rely on attributes and methods that this class does
    not define itself: _api_client, _keep_client, num_retries and
    manifest_text().
    """

    def __enter__(self):
        """Enter a `with` block and return this object as the `as` target."""
        return self

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave a `with` block; any exception raised inside propagates.

        The arguments are the standard context manager exit arguments.
        They default to None so that a direct call with no arguments
        still works.
        """
        pass

    def _my_keep(self):
        """Return the Keep client, building one on first use.

        A new KeepClient is created from self._api_client and
        self.num_retries only when self._keep_client is None.
        """
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and other
        hints other than size hints) removed from the locators.
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                # fields[0] is the stream name and is kept as-is.  In the
                # remaining tokens, only those that look like Keep
                # locators are edited: every "+hint" whose first
                # character is not a digit is dropped.
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)
103
104
class CollectionReader(CollectionBase):
    """Read an existing Collection given its UUID, locator, or manifest."""

    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.

        Raises errors.ArgumentError if the first argument does not look
        like a Keep locator, a Collection UUID, or manifest text.
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        # Parsed manifest lines; filled in lazily by _populate().
        self._streams = None

    def _populate_from_api_server(self):
        """Fetch the manifest text from the API server.

        Returns None on success, or the exception that was caught.
        """
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        """Fetch the manifest text directly from Keep.

        Returns None on success, or the exception that was caught.
        """
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
            return None
        except Exception as e:
            return e

    def _populate(self):
        """Make sure self._streams holds the parsed manifest.

        Does nothing if the manifest has already been parsed.  Otherwise
        the manifest text is obtained, if it is not already known, by
        trying in order: Keep (only for signed locators), the API server,
        then Keep again (for unsigned Keep locators not yet tried).

        Raises the API error directly when the locator is not a Keep
        locator, or arvados.errors.NotFoundError when every attempt
        failed.
        """
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = (not self._manifest_text and
                           util.keep_locator_pattern.match(
                self._manifest_locator))
        if (not self._manifest_text and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if (not self._manifest_text and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if not self._manifest_text:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        # One token list per non-empty manifest line.
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        """Rewrite this reader's streams and manifest text in normal form.

        Files are regrouped by stream name, each stream is passed through
        normalize_stream() in sorted stream-name order, and the manifest
        text is regenerated from the result.
        """
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                # A file name may itself contain "/"; everything before
                # the last "/" is treated as the stream name.
                filestream = s.name() + "/" + f.name()
                streamname, filename = filestream.rsplit("/", 1)
                ranges = streams.setdefault(streamname, {}).setdefault(
                    filename, [])
                for segment in f.segments:
                    ranges.extend(
                        s.locators_and_ranges(segment[0], segment[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        """Return a list with one StreamReader per manifest stream."""
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        """Yield every file of every stream, in manifest order."""
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        """Return the manifest text of this Collection.

        Arguments:
        * strip: If true, return the manifest with non-portable locator
          hints removed (see stripped_manifest).
        * normalize: If true, return a normalized manifest.  This reader
          is left unchanged; the work is done on a temporary reader.
          `strip` is honored on the normalized result.
        """
        if normalize:
            # Hand our clients and retry setting to the temporary reader
            # so it does not build default ones of its own.
            cr = CollectionReader(self.manifest_text(),
                                  api_client=self._api_client,
                                  keep_client=self._keep_client,
                                  num_retries=self.num_retries)
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text
254
255
class CollectionWriter(CollectionBase):
    """Build a new Collection by writing files and directory trees."""

    # Size of the data chunks passed to the Keep client's put().
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        # Data not yet sent to Keep, as a list of chunks plus their
        # combined length.
        self._data_buffer = []
        self._data_buffer_len = 0
        # [position, size, name] for each finished file of this stream.
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        # [name, locators, files] for each finished stream.
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Leave a `with` block, storing the manifest on a clean exit.

        finish() is skipped when the block raised, and the exception then
        propagates.  The arguments default to None so that a direct call
        with no arguments still calls finish().
        """
        if exc_type is None:
            self.finish()

    def do_queued_work(self):
        """Process queued files, directory entries and trees until done."""
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        """Copy the rest of the queued file into the Collection."""
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        # Only close files this writer opened itself (see _queue_file).
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        """Handle queued directory entries up to the next regular file.

        Subdirectories are queued as new trees.  The first non-directory
        entry is queued as the current file and processing stops there,
        so that do_queued_work() writes it next.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        """List the first queued tree and queue its directory entries.

        The listing is recursive when max_manifest_depth is 0.  A tree
        with no entries is dropped from the queue.
        """
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        """Queue one file to be written by do_queued_work().

        Arguments:
        * source: A path, or an object with a read() method.  A path is
          opened here in binary mode and closed once it has been written.
        * filename: The name to record in the manifest.  Defaults to the
          base name of source.name.
        """
        assert (self._queued_file is None), "tried to queue more than one file"
        opened_here = not hasattr(source, 'read')
        if opened_here:
            source = open(source, 'rb')
        self._close_file = opened_here
        try:
            if filename is None:
                filename = os.path.basename(source.name)
            self.start_new_file(filename)
        except Exception:
            # Nothing was queued, so nobody else will close a file that
            # was opened above.
            if opened_here:
                source.close()
            raise
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        """Queue the entries of one directory, in sorted order."""
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        """Queue a directory to be written as the stream `stream_name`."""
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        """Write one file (a path or a readable object) to the Collection."""
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        """Write the directory at `path` to the Collection.

        Arguments:
        * path: The directory to read.
        * stream_name: The stream name to use for the top directory.
        * max_manifest_depth: Decremented for each level of
          subdirectory.  A directory reached with the value 0 is listed
          recursively into a single stream.
        """
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        """Append data to the current file.

        `newdata` is either a single chunk of data, or an object with an
        __iter__ attribute, in which case each item is written in turn.
        Full blocks are sent to Keep as soon as enough data is buffered.
        """
        # NOTE(review): this test assumes Python 2, where str has no
        # __iter__ attribute.
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        """Send at most one block of buffered data to Keep.

        Whatever is left beyond KEEP_BLOCK_SIZE stays in the buffer.
        """
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        """Finish the current file and start one named `newfilename`."""
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        """Set the name of the file being written.

        None means no name is set.  Raises errors.AssertionError if the
        name contains a tab or a newline.
        """
        # None is start_new_file()'s default, so it must not reach
        # re.search(), which only accepts strings.
        if newfilename is not None and re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        """Return the name of the file being written, or None."""
        return self._current_file_name

    def finish_current_file(self):
        """Record the current file in the current stream.

        Does nothing if no file name is set and no data has been written
        since the last finished file.  Raises errors.AssertionError if
        data has been written but no file name is set.
        """
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        """Finish the current stream and start one named `newstreamname`."""
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        """Set the current stream name; an empty name is stored as '.'.

        Raises errors.AssertionError if the name contains a tab or a
        newline.
        """
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname=='' else newstreamname

    def current_stream_name(self):
        """Return the name of the stream being written, or None."""
        return self._current_stream_name

    def finish_current_stream(self):
        """Finish the current file, flush data, and close the stream.

        A stream with no files is discarded.  Otherwise it is added to
        the finished streams, with the empty block locator if no data
        was stored.  The per-stream state is then reset, leaving the
        stream name unset.  Raises errors.AssertionError if the stream
        has files but no name.
        """
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        """Store the manifest in Keep and return its locator."""
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
        """Return "<md5 hex digest>+<length>" of the stripped manifest."""
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
        """Finish the current stream and return the manifest text.

        Each finished stream becomes one line: the stream name (prefixed
        with "./" unless it is "." or starts with "./"), its block
        locators, and a "position:size:name" token per file.  Spaces in
        names are written as \\040.  Returns "" if there are no streams.
        """
        self.finish_current_stream()
        lines = []
        for stream_name, locators, files in self._finished_streams:
            prefix = '' if re.search(r'^\.(/.*)?$', stream_name) else './'
            file_tokens = ' '.join(
                "%d:%d:%s" % (sfile[0], sfile[1],
                              sfile[2].replace(' ', '\\040'))
                for sfile in files)
            lines.append(' '.join([
                prefix + stream_name.replace(' ', '\\040'),
                ' '.join(locators),
                file_tokens]))
            lines.append("\n")
        return ''.join(lines)

    def data_locators(self):
        """Return the block locators of all finished streams, in order."""
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
502
503
class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose progress can be dumped and resumed.

    Every file written is recorded, with its stat() result, as a
    dependency.  A saved state is only resumed if those files are
    unchanged.
    """

    # Attributes copied by dump_state() and restored by from_state().
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a ResumableCollectionWriter.

        Arguments are the same as for CollectionWriter.
        """
        # Maps a source path to the stat tuple seen when it was queued.
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        """Build a writer from a state made by dump_state().

        Arguments:
        * state: A mapping with every STATE_PROPS key plus
          '_current_file', which is None or a (path, position) pair.
        * init_args, init_kwargs: Passed to the class constructor.

        Raises errors.StaleWriterStateError if the state cannot be
        resumed.
        """
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # The buffered length is not part of the saved state.  Recompute
        # it, otherwise write() would undercount the restored buffer and
        # could let it grow past what a single flush_data() call stores.
        writer._data_buffer_len = sum(
            len(chunk) for chunk in writer._data_buffer)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                # Do not leak the file if it opened but seek() failed.
                if writer._queued_file is not None:
                    writer._queued_file.close()
                    writer._queued_file = None
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        """Check that every recorded source file is unchanged.

        Raises errors.StaleWriterStateError if a dependency was not a
        regular file when recorded, can no longer be stat()ed, is no
        longer a regular file, or has a different mtime or size.
        """
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        """Return a dict describing this writer's current progress.

        Arguments:
        * copy_func: Called on each STATE_PROPS attribute value; its
          result is what gets stored.  The default stores the values
          themselves.

        The '_current_file' entry is None, or the real path and current
        position of the file being written.
        """
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        """Queue a file by path and record it as a dependency.

        Raises errors.AssertionError if `source` is not a path, if the
        path cannot be stat()ed, or if the opened file has a different
        inode than the path.
        """
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        # Keep the stat failure in a name of our own rather than relying
        # on the except clause's target still being bound afterwards.
        stat_error = None
        try:
            path_stat = os.stat(src_path)
        except OSError as error:
            path_stat = None
            stat_error = error
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        """Write data belonging to the currently queued file.

        Raises errors.AssertionError if no file is queued.
        """
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)