import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import arvados   # referenced below (arvados.api, arvados.LOCATOR, etc.)
import config
import errors
import util
def normalize_stream(s, stream):
    """Take a stream name `s` and a dict mapping file names to lists of
    segments, and return the stream as a list of manifest tokens: the
    stream name, the locators of all referenced data blocks, and a
    position:size:filename token for each contiguous file segment."""
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    # Collect each unique block locator once, recording its byte offset
    # within the concatenated stream.
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    # A stream with no data blocks still needs one block token.
    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            # Translate the segment's offset within its block into an
            # offset within the whole stream, and merge adjacent segments.
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            # Zero-length file: emit an explicit empty segment.
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens
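
# For illustration (a sketch; the locator is hypothetical): a stream
# "./dir" holding one 3-byte file "f" stored in a single block
# normalizes to the token list
#   ['./dir', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:f']
# which serializes to the manifest line
#   ./dir acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:f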

def normalize(collection):
    """Take a CollectionReader object and return its manifest as a list
    of normalized streams (one token list per stream, sorted by stream
    name, with files sorted and adjacent segments merged)."""
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for seg in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams
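
# The intermediate `streams` mapping built by normalize() looks like
# (a sketch of the shape, using the LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE
# field order expected by normalize_stream above):
#   {'./dir': {'file.txt': [[locator, block size, offset in block, segment size], ...]}}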


class CollectionReader(object):
    def __init__(self, manifest_locator_or_text):
        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                c = arvados.api('v1').collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Fall back to fetching the manifest directly from Keep.
                logging.warning("API lookup failed for collection %s (%s: %s)" %
                                (self._manifest_locator, type(e), str(e)))
                self._manifest_text = Keep.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Now regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s) for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            return ''.join([StreamReader(stream).manifest_text(strip=True)
                            for stream in self._streams])
        else:
            return self._manifest_text
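
# Example (a sketch; the manifest below describes one 3-byte file):
#   cr = CollectionReader(". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt\n")
#   for f in cr.all_files():
#       print f.name()      # prints: foo.txt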

class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26     # 64 MiB: data is sent to Keep in blocks of this size

    def __init__(self):
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()
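
    # Example (a sketch; '/tmp/data' is a hypothetical directory).  With the
    # default max_manifest_depth=-1, every subdirectory becomes its own
    # stream; with max_manifest_depth=0 the tree is flattened into one stream:
    #   cw = CollectionWriter()
    #   cw.write_directory_tree('/tmp/data', max_manifest_depth=0)
    #   manifest = cw.manifest_text()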

    def write(self, newdata):
        # Accept any iterable of strings (note: Python 2 str has no
        # __iter__, so a plain string falls through to the buffer below).
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        # Send one full block (or whatever is buffered, at end of stream)
        # to Keep, keeping any remainder in the buffer.
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tabs or newlines: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tabs or newlines")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Send the stripped manifest to Keep, to ensure that we get the
        # same content hash regardless of what permission hints are
        # attached to the locators in the manifest.
        return Keep.put(self.stripped_manifest())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean
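
    # For example, stripping turns a locator like (hypothetical hint)
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+A1f9e3c2b8a7d6e5f4a3b2c1d0e9f8a7@53f4a2b1
    # into
    #   acbd18db4cc2f85cedef654fccc4a4d8+3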

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            # Round-trip through CollectionReader to normalize the manifest.
            return CollectionReader(manifest).manifest_text()
        else:
            return ""
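
    # A finished single-stream, single-file collection serializes to a
    # manifest line like (the block here holds the 3 bytes "foo"):
    #   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt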

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret
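
# End-to-end sketch (assumes a reachable Keep service is configured):
#   cw = CollectionWriter()
#   cw.start_new_file('greeting.txt')
#   cw.write('hello world\n')
#   locator = cw.finish()           # content hash of the stripped manifest
#   print cw.manifest_text()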


class ResumableCollectionWriter(CollectionWriter):
    """A CollectionWriter whose state can be dumped with dump_state() and
    later restored with from_state(), so an interrupted upload can be
    resumed as long as the source files are unchanged."""

    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__()

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer
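
    # Resume sketch (assumes `state` came from dump_state() on an earlier
    # writer; per the note above, the caller drives the remaining work):
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
    #   manifest = writer.manifest_text()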

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
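
    # Checkpoint sketch: pass copy.deepcopy so mutable members (deques,
    # lists) are snapshotted rather than shared with the live writer:
    #   import copy
    #   state = writer.dump_state(copy.deepcopy)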

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # stat_error is still bound here (Python 2 exception scoping)
            # because this branch only runs when os.stat() raised above.
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)