import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados          # referenced below as arvados.api(), arvados.LOCATOR, etc.
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    """Return the manifest tokens for a single stream.

    s is the stream name; stream maps each file name to a list of
    segments, each a (locator, blocksize, offset, segmentsize) sequence.
    """
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    # Emit each block locator once, remembering its offset within the stream.
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

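# Rough usage sketch for normalize_stream(), with the
# (locator, blocksize, offset, segmentsize) segment layout used above and a
# single 3-byte block (the locator is illustrative):
#
#   stream = {'foo.txt': [['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3]]}
#   normalize_stream('.', stream)
#   # -> ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']
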
def normalize(collection):
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for segment in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(segment[0], segment[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams

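# Sketch of normalize()'s output shape (placeholder locators): one token list
# per stream, sorted by stream name, each suitable for passing to
# StreamReader, e.g.
#
#   [['.', '<locator>+3', '0:3:foo.txt'],
#    ['./subdir', '<locator>+6', '0:6:bar.txt']]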

class CollectionReader(object):
    def __init__(self, manifest_locator_or_text, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
            if self._keep_client is None:
                self._keep_client = KeepClient(api_client=self._api_client)
            try:
                c = self._api_client.collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                # Fall back to fetching the manifest block directly from Keep.
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                self._manifest_text = self._keep_client.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Regenerate the manifest text from the normalized streams.
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])

    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

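# Minimal usage sketch for CollectionReader (the locator below is
# hypothetical; any manifest text or content locator accepted by __init__
# works).  all_files() yields StreamFileReader objects from the stream
# module:
#
#   reader = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+3')
#   print reader.manifest_text()
#   for f in reader.all_files():
#       print f.name()
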
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26     # 64 MiB

    def __init__(self, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def _prep_keep_client(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client)

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

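    # Illustrative drain order (not additional API): after
    # write_directory_tree('/tmp/data') queues '/tmp/data' as a tree,
    # do_queued_work() lists it via _work_trees(), walks its entries via
    # _work_dirents() (queueing subdirectories as new trees), and streams
    # each regular file through _work_file(), until all three queues are
    # empty.
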
    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._prep_keep_client()
            self._current_stream_locators.append(
                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        self._prep_keep_client()
        return self._keep_client.put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

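    # Example of the stripping (hypothetical hint): a locator such as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3+Abcdef0123@4fe1b123
    # is rewritten as
    #   acbd18db4cc2f85cedef654fccc4a4d8+3
    # Only +A... permission hints are removed; other hints are left intact.
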
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret

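# Minimal usage sketch for CollectionWriter (paths are hypothetical):
#
#   cw = CollectionWriter()
#   cw.write_file('/tmp/example.txt')
#   cw.write_directory_tree('/tmp/data', stream_name='./data')
#   manifest_locator = cw.finish()     # stores the manifest in Keep
#   manifest_text = cw.manifest_text()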

class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

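    # Hedged sketch of a checkpoint/resume cycle; the caller is responsible
    # for persisting the state dict between runs (arv-put, for example, keeps
    # it in a cache file), and in practice dump_state() is called while
    # do_queued_work() is still running:
    #
    #   writer = ResumableCollectionWriter()
    #   writer.write_directory_tree('/tmp/data')   # hypothetical input
    #   state = writer.dump_state()
    #   ...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
    #   locator = writer.finish()
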
    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # stat_error is still bound here because path_stat is only None
            # when the os.stat() call above raised.
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)