# [arvados.git] sdk/python/arvados/collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import arvados  # referenced below for the LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE segment field constants
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')
def normalize_stream(s, stream):
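    """Return the manifest tokens for a single normalized stream.

    's' is the stream name; 'stream' maps each file name in the stream to a
    list of segments (indexed by arvados.LOCATOR, BLOCKSIZE, OFFSET and
    SEGMENTSIZE).  The result lists the stream name, each block locator in
    first-use order, and one "position:length:filename" token per contiguous
    file span, with spaces in names escaped as \\040.
    """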
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
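    """Return the collection as a list of normalized streams.

    Walks every file in every stream of 'collection', regroups the segments
    by stream directory and file name, and returns one normalize_stream()
    token list per stream, sorted by stream name.
    """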
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
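    """Read an existing collection, given its manifest text or locator.

    The manifest is looked up via the API server (falling back to Keep) and
    normalized the first time the collection's streams or files are accessed.
    """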
    def __init__(self, manifest_locator_or_text):
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
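        """Return the manifest text for this collection.

        With strip=True, return a copy whose block locators have their
        permission hints removed; otherwise return the normalized manifest
        text as stored.
        """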
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

class CollectionWriter(object):
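    """Build a new collection incrementally.

    Data is buffered and written to Keep in blocks of up to KEEP_BLOCK_SIZE
    bytes; files and streams are named as they are added, and finish()
    stores the resulting manifest in Keep and returns its locator.
    """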
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
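        """Write one file to the current stream.

        'source' may be a path or a readable file-like object; 'filename'
        overrides the name recorded in the manifest (it defaults to the
        basename of the source).
        """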
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
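        """Write a directory tree to the collection.

        Each directory becomes a separate stream under 'stream_name' until
        'max_manifest_depth' reaches zero, after which the remaining files
        are listed recursively in a single stream.
        """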
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return Keep.put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
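    """CollectionWriter that can save and restore its progress.

    dump_state() captures enough state to rebuild the writer later with
    from_state(); resuming is refused (StaleWriterStateError) if the source
    files have changed, disappeared, or carry expired permission hints.
    """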
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
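        """Return a snapshot of the writer's state as a dict.

        'copy_func' is applied to each stored attribute (e.g. to deep-copy
        mutable values); the open file, if any, is recorded under
        '_current_file' as its real path and current offset.
        """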
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
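
# A minimal usage sketch (not part of the SDK itself).  It assumes a
# configured API server and Keep services, and a local file named
# "example.txt"; both names are only illustrative.
#
#   cw = CollectionWriter()
#   cw.write_file('example.txt')
#   manifest_locator = cw.finish()      # stores the manifest in Keep
#
#   cr = CollectionReader(manifest_locator)
#   print cr.manifest_text()
#   for f in cr.all_files():
#       print f.name()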