Merge branch 'master' into 3505-virtual-work-dir
[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import arvados  # needed below for arvados.api() and the stream field constants
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

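# Example usage (illustrative sketch only, not part of this module; assumes a
# configured API token, a reachable API server and Keep service, and a
# hypothetical local directory /tmp/example):
#
#   cw = CollectionWriter()
#   cw.write_directory_tree('/tmp/example')
#   locator = cw.finish()            # stores the stripped manifest in Keep
#
#   cr = CollectionReader(locator)
#   for f in cr.all_files():
#       print f.name()
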
def normalize_stream(s, stream):
    """Return the normalized manifest tokens for one stream.

    s is the stream name; stream is a dict mapping each file name in the
    stream to its list of data segments.  The result is a list of manifest
    tokens: the stream name, the data block locators, and the
    position:size:filename tokens describing each file.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

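# Worked example for normalize_stream (hand-written values, not real Keep
# data; assumes arvados.LOCATOR, BLOCKSIZE, OFFSET and SEGMENTSIZE are the
# segment tuple indices 0-3): for a single 3-byte file stored in one block,
#
#   normalize_stream('.', {'foo.txt': [['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3]]})
#
# returns ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt'], i.e. the
# tokens of the manifest line ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt".
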
def normalize(collection):
    """Return the collection's streams as a list of normalized stream token
    lists, one per stream, sorted by stream name.
    """
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized stream

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

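    # Illustrative sketch: a reader can also be built straight from manifest
    # text (hand-written one-line manifest below, not real Keep data):
    #
    #   manifest = ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n"
    #   cr = CollectionReader(manifest)
    #   cr.manifest_text()   # returns the normalized form of the same manifest
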
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26  # 64 MiB

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
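        #
        # Illustrative sketch (hypothetical paths, not real data): after
        # write_directory_tree('/tmp/data') the queue typically starts as
        #   _queued_trees   = deque([('/tmp/data', '.', -1)])
        # _work_trees() then lists that directory and fills, e.g.,
        #   _queued_dirents = deque(['a.txt', 'b.txt', 'subdir'])
        # and _work_dirents() drains it, queueing each file (and each
        # subdirectory as another tree) until no work remains.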
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

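    # Illustrative sketch of the lower-level stream/file interface
    # (hypothetical names; manifest_text() flushes buffered data to Keep, so a
    # reachable Keep service is assumed):
    #
    #   cw = CollectionWriter()
    #   cw.start_new_stream('logs')
    #   cw.start_new_file('run.log')
    #   cw.write('hello\n')
    #   print cw.manifest_text()
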
    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Send the stripped manifest to Keep, to ensure that we use the
        # same UUID regardless of what hints are used on the collection.
        return Keep.put(self.stripped_manifest())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

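    # Illustrative example (shortened, made-up permission hint): a signed
    # locator such as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0123abcd@53bed294
    # appears in stripped_manifest() output as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3
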
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # path_stat is None only when os.stat raised above, so stat_error
            # is still bound here (Python 2 keeps the except-clause variable).
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
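
    # Illustrative checkpoint/resume sketch (hypothetical path and variable
    # names; assumes `import copy` and that the file is unchanged on resume):
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_file('/tmp/big.dat')
    #   state = writer.dump_state(copy.deepcopy)   # snapshot, e.g. to a cache file
    #   ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()                    # caller restarts any queued work
    #   locator = writer.finish()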