[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import arvados  # referenced below as arvados.api(), arvados.LOCATOR, etc.
import config
import errors
import util

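# normalize_stream() turns one stream's {filename: [segment, ...]} map (as
# built by normalize() below) into the token list for a single manifest
# stream line: the stream name, then each block locator in first-use order,
# then one "offset:length:filename" token per contiguous file span (adjacent
# segments of the same file are coalesced).  Illustrative output only -- the
# locator shown is a placeholder, not a real Keep block:
#
#   ['.', '<md5>+6', '0:6:foo.txt']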
def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
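        # The argument may be either a Keep locator for a manifest (32 hex
        # digits, optionally followed by +size and other hints) or literal
        # manifest text.  Illustrative examples only (not real data):
        #   CollectionReader("d41d8cd98f00b204e9800998ecf8427e+0")
        #   CollectionReader(". <md5>+3 0:3:foo.txt\n")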
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # now regenerate the manifest text based on the normalized streams

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self):
        self._populate()
        return self._manifest_text

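# A minimal usage sketch for CollectionReader, assuming a valid manifest
# locator or manifest text is available (values below are illustrative):
#
#   reader = CollectionReader(manifest_locator_or_text)
#   for f in reader.all_files():
#       print f.name()            # StreamFileReader objects from stream.py
#   normalized = reader.manifest_text()
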
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26     # 64 MiB per Keep block

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        self._queue_dirents(stream_name, make_dirents(path))

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

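    # flush_data() joins the buffer, writes its first KEEP_BLOCK_SIZE bytes
    # (or all of it, if shorter) to Keep as one block, and keeps any
    # remainder buffered for the next block.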
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                       self._current_stream_length - self._current_file_pos,
                                       self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                       self._current_stream_locators,
                                       self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        return Keep.put(self.manifest_text())

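    # manifest_text() emits one line per finished stream:
    #   <stream name> <locator ...> <pos:size:filename ...>\n
    # with spaces in names escaped as \040, then round-trips the result
    # through CollectionReader to return the normalized form.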
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        #print 'writer',manifest
        #print 'after reader',CollectionReader(manifest).manifest_text()

        return CollectionReader(manifest).manifest_text()

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

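# A minimal usage sketch for CollectionWriter, assuming Keep is reachable
# (paths below are illustrative):
#
#   writer = CollectionWriter()
#   writer.write_file('/tmp/example.txt')
#   writer.write_directory_tree('/tmp/exampledir', stream_name='exampledir')
#   locator = writer.finish()          # stores the manifest in Keep
#   manifest = writer.manifest_text()
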

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)
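
# A minimal checkpoint/resume sketch for ResumableCollectionWriter, assuming
# the caller saved a state dict from an earlier run (how and when the state
# is persisted is up to the caller -- names below are illustrative):
#
#   writer = ResumableCollectionWriter()
#   writer.write_directory_tree('/tmp/exampledir')
#   state = writer.dump_state()
#   # ... later, possibly in a new process ...
#   resumed = ResumableCollectionWriter.from_state(state)
#   resumed.do_queued_work()    # per from_state(), the caller restarts queued work
#   locator = resumed.finish()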