[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados  # needed for arvados.api() and arvados.LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE below
from keep import *
from stream import *
import config
import errors
import util

def normalize_stream(s, stream):
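    """Return the manifest tokens for one normalized stream.

    ``s`` is the stream name; ``stream`` maps each file name to its list of
    segments (sequences indexed by the arvados.LOCATOR, arvados.BLOCKSIZE,
    arvados.OFFSET and arvados.SEGMENTSIZE constants).  The result is a
    token list of the form [stream_name, locator, ..., "pos:size:filename",
    ...], with adjacent segments of the same file coalesced into one span.
    """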
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
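    """Return a list of normalized (tokenized) streams for a collection.

    Files are regrouped by stream (directory) name, their segments are
    resolved to block locators and ranges via locators_and_ranges(), and
    each stream is tokenized with normalize_stream().  Streams, and the
    files within each stream, come back sorted by name.
    """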
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for seg in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
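    """Read an existing collection, given its manifest text or the Keep
    locator of a stored manifest.

    Illustrative usage sketch (locator handling and iteration follow the
    methods defined below):

        reader = CollectionReader(manifest_locator_or_text)
        for f in reader.all_files():
            print f.name()
    """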
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized streams

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        self._populate()
        return self._manifest_text

class CollectionWriter(object):
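    """Build a manifest for a new collection and write its data to Keep.

    Illustrative usage sketch (the directory path is a made-up example):

        writer = CollectionWriter()
        writer.write_directory_tree('/tmp/example')
        manifest_locator = writer.finish()
    """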
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
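        # Illustrative example (hypothetical path): after
        # write_directory_tree('/tmp/example'), _queued_trees holds
        # ('/tmp/example', '.', -1).  _work_trees() then fills
        # _queued_dirents with that directory's entries; directory entries
        # are re-queued as new trees, while each regular file is opened
        # into _queued_file and streamed out by _work_file().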
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        self._queue_dirents(stream_name, make_dirents(path))

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Send the stripped manifest to Keep, to ensure that we use the
        # same UUID regardless of what hints are used on the collection.
        return Keep.put(self.stripped_manifest())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
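        # Illustrative example (made-up locator): a locator such as
        #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0123abcdef@4fe2b7a3
        # is reduced to
        #   acbd18db4cc2f85cedef654fccc4a4d8+3
        # before the stream line is reassembled below.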
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        #print 'writer',manifest
        #print 'after reader',CollectionReader(manifest).manifest_text()

        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
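    """A CollectionWriter whose progress can be checkpointed and resumed.

    dump_state() captures the attributes listed in STATE_PROPS (plus the
    path and position of any file currently being written); from_state()
    rebuilds a writer from that saved state, raising StaleWriterStateError
    if the source files have changed in the meantime.
    """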
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
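        # Illustrative resume flow (sketch only; `state` is assumed to be a
        # dict previously produced by dump_state()):
        #
        #     writer = ResumableCollectionWriter.from_state(state)
        #     writer.do_queued_work()
        #     manifest_locator = writer.finish()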
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)