2800: Delay API client creation in Python CollectionReader.
[arvados.git] / sdk / python / arvados / collection.py
import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

# "arvados" is referenced below (arvados.api(), arvados.LOCATOR, etc.), so
# import it explicitly instead of relying on it arriving via a star import.
import arvados
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

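# normalize_stream() turns one stream's {filename: [segments]} map into the
# token list for a single manifest line: the stream name, then each data
# block locator exactly once, then one "position:length:filename" token per
# contiguous file segment.  A purely illustrative normalized line looks
# roughly like:
#
#   . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
#
# (the locator and file name above are made-up examples, not real data).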
def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

def normalize(collection):
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for r in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

    normalized_streams = []
    sortedstreams = list(streams.keys())
    sortedstreams.sort()
    for s in sortedstreams:
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


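# Hedged usage sketch for CollectionReader -- the locator below is a made-up
# md5+size string, not a real collection:
#
#   cr = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+85')
#   for f in cr.all_files():
#       print f.name()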
class CollectionReader(object):
    def __init__(self, manifest_locator_or_text, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                # As in KeepClient itself, we must wait until the last possible
                # moment to instantiate an API client, in order to avoid
                # tripping up clients that don't have access to an API server.
                # If we do build one, make sure our Keep client uses it.
                # If instantiation fails, we'll fall back to the except clause,
                # just like any other Collection lookup failure.
                if self._api_client is None:
                    self._api_client = arvados.api('v1')
                    self._keep_client = KeepClient(api_client=self._api_client)
                if self._keep_client is None:
                    self._keep_client = KeepClient(api_client=self._api_client)
                c = self._api_client.collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                if self._keep_client is None:
                    self._keep_client = KeepClient(api_client=self._api_client)
                self._manifest_text = self._keep_client.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Now regenerate the manifest text based on the normalized streams.

        #print "normalizing", self._manifest_text
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
        #print "result", self._manifest_text


    def all_streams(self):
        self._populate()
        resp = []
        for s in self._streams:
            resp.append(StreamReader(s))
        return resp

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

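# Hedged usage sketch for CollectionWriter -- the path and stream name below
# are illustrative only:
#
#   cw = CollectionWriter()
#   cw.write_directory_tree('/tmp/exampledir', stream_name='exampledir')
#   locator = cw.finish()   # stores the manifest in Keep, returns its locator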
class CollectionWriter(object):
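    # 2**26 bytes == 64 MiB: buffered data is packed into Keep blocks of at
    # most this size before being written out by flush_data().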
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def _prep_keep_client(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client)

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

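    # flush_data() writes at most one full Keep block per call: everything
    # buffered so far is joined, up to KEEP_BLOCK_SIZE bytes are stored in
    # Keep as one block, and any remainder stays buffered for the next flush.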
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._prep_keep_client()
            self._current_stream_locators.append(
                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        self._prep_keep_client()
        return self._keep_client.put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
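        # A permission hint is a "+A..." suffix on a block locator.  Purely
        # illustrative example (the signature below is made up):
        #   acbd18db4cc2f85cedef654fccc4a4d8+3+A0123abcd@53bed294
        # becomes
        #   acbd18db4cc2f85cedef654fccc4a4d8+3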
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
                             for x in fields[1:-1] ]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
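    """A CollectionWriter whose progress can be saved and restored.

    dump_state() captures the writer's current state (buffers, queued work,
    and the stat() info of every source file seen so far) as a plain dict.
    from_state() rebuilds a writer from such a dict, raising
    StaleWriterStateError if any source file has changed or disappeared in
    the meantime.
    """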
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

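    # Hedged sketch of the save/resume cycle (how the state dict is stored
    # between runs is up to the caller):
    #
    #   state = writer.dump_state()
    #   ...serialize state, process exits, restarts...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()
    #   locator = writer.finish()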
    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)