import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

# Explicit import so the arvados.LOCATOR/BLOCKSIZE/OFFSET/SEGMENTSIZE segment
# field indexes and arvados.api() referenced below resolve without relying on
# star-import side effects.
import arvados
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    """Return the manifest tokens for a single normalized stream.

    's' is the stream name; 'stream' maps each file name in the stream to
    its list of data segments.
    """
    stream_tokens = [s]
    sortedfiles = sorted(stream.keys())

    blocks = {}
    streamoffset = 0L
    # Output each block only once, in the order it is first referenced.
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            elif segmentoffset == current_span[1]:
                # Adjacent segments coalesce into a single file token.
                current_span[1] += segment[arvados.SEGMENTSIZE]
            else:
                stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if len(stream[f]) == 0:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

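# Illustrative walk-through of normalize_stream(), not part of the library.
# Assuming the segment field indexes LOCATOR == 0, BLOCKSIZE == 1,
# OFFSET == 2 and SEGMENTSIZE == 3 (as referenced via the arvados package
# above), a stream dict with two adjacent segments in one block, e.g.
#
#     normalize_stream('.', {'f.txt': [['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 2],
#                                      ['acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 2, 1]]})
#
# coalesces the segments and yields tokens like
#
#     ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:f.txt']
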
def normalize(collection):
    """Return a sorted list of normalized streams for 'collection'.

    Files are regrouped by stream name, and each stream is passed through
    normalize_stream().
    """
    streams = {}
    for s in collection.all_streams():
        for f in s.all_files():
            filestream = s.name() + "/" + f.name()
            r = filestream.rindex("/")
            streamname = filestream[:r]
            filename = filestream[r+1:]
            if streamname not in streams:
                streams[streamname] = {}
            if filename not in streams[streamname]:
                streams[streamname][filename] = []
            for seg in f.segments:
                streams[streamname][filename].extend(s.locators_and_ranges(seg[0], seg[1]))

    normalized_streams = []
    for s in sorted(streams.keys()):
        normalized_streams.append(normalize_stream(s, streams[s]))
    return normalized_streams


class CollectionReader(object):
    def __init__(self, manifest_locator_or_text, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
            # portable data hash
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
            # collection UUID
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest or a collection UUID")
        self._streams = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

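    # Illustrative sketch (hypothetical values): all three argument forms
    # are accepted by the constructor above, e.g.
    #
    #     CollectionReader('d41d8cd98f00b204e9800998ecf8427e+0')            # data hash
    #     CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')                   # collection UUID
    #     CollectionReader('. d41d8cd98f00b204e9800998ecf8427e+0 0:0:a\n')  # manifest text
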
    def _populate(self):
        if self._streams is not None:
            return
        if not self._manifest_text:
            try:
                # As in KeepClient itself, we must wait until the last
                # possible moment to instantiate an API client, in order to
                # avoid tripping up clients that don't have access to an API
                # server.  If we do build one, make sure our Keep client uses
                # it.  If instantiation fails, we'll fall back to the except
                # clause, just like any other Collection lookup failure.
                if self._api_client is None:
                    self._api_client = arvados.api('v1')
                if self._keep_client is None:
                    self._keep_client = KeepClient(api_client=self._api_client)
                c = self._api_client.collections().get(
                    uuid=self._manifest_locator).execute()
                self._manifest_text = c['manifest_text']
            except Exception as e:
                if not util.portable_data_hash_pattern.match(
                      self._manifest_locator):
                    raise
                _logger.warning("API lookup failed for collection %s (%s: %s)",
                                self._manifest_locator, type(e), str(e))
                if self._keep_client is None:
                    self._keep_client = KeepClient(api_client=self._api_client)
                self._manifest_text = self._keep_client.get(self._manifest_locator)
        self._streams = []
        for stream_line in self._manifest_text.split("\n"):
            if stream_line != '':
                stream_tokens = stream_line.split()
                self._streams += [stream_tokens]
        self._streams = normalize(self)

        # Now regenerate the manifest text based on the normalized streams.
        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])


    def all_streams(self):
        self._populate()
        return [StreamReader(s) for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False):
        self._populate()
        if strip:
            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
            return m
        else:
            return self._manifest_text

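# Illustrative usage sketch (hypothetical collection contents): iterate all
# files in a collection via CollectionReader.  name() and size() assume the
# StreamFileReader interface provided by the stream module.
#
#     reader = CollectionReader('zzzzz-4zz18-zzzzzzzzzzzzzzz')
#     for f in reader.all_files():
#         print f.name(), f.size()
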
class CollectionWriter(object):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None):
        self._api_client = api_client
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Flush buffered data and store the manifest when the 'with' block
        # ends, even though the resulting locator is discarded here.
        self.finish()

    def _prep_keep_client(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client)

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

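    # Illustrative sketch (hypothetical path): queueing a directory tree and
    # draining the queue is what write_directory_tree() below does in a
    # single call:
    #
    #     writer = CollectionWriter()
    #     writer._queue_tree('/tmp/data', '.', max_manifest_depth=-1)
    #     writer.do_queued_work()
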
    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                        else os.listdir)
        d = make_dirents(path)
        if len(d) > 0:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            # Flatten iterables (but not strings, which lack __iter__ in
            # Python 2) into a sequence of write() calls.
            for s in newdata:
                self.write(s)
            return
        self._data_buffer += [newdata]
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

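    # Illustrative sketch: these two calls are equivalent, since write()
    # flattens iterables into the same buffered byte sequence:
    #
    #     writer.write('foobar')
    #     writer.write(['foo', 'bar'])
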
    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer != '':
            self._prep_keep_client()
            # Store at most one Keep block per call; anything beyond
            # KEEP_BLOCK_SIZE stays buffered for the next flush.
            self._current_stream_locators.append(
                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files += [[self._current_file_pos,
                                        self._current_stream_length - self._current_file_pos,
                                        self._current_file_name]]
        self._current_file_pos = self._current_stream_length

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if len(self._current_stream_files) == 0:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if len(self._current_stream_locators) == 0:
                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
            self._finished_streams += [[self._current_stream_name,
                                        self._current_stream_locators,
                                        self._current_stream_files]]
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        self._prep_keep_client()
        return self._keep_client.put(self.manifest_text())

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all permission
        hints removed from the locators in the manifest.
        """
        raw = self.manifest_text()
        clean = ''
        for line in raw.split("\n"):
            fields = line.split()
            if len(fields) > 0:
                locators = [re.sub(r'\+A[a-z0-9@_-]+', '', x)
                            for x in fields[1:-1]]
                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
        return clean

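    # Illustrative sketch (hypothetical hash and hint): a locator such as
    #
    #     d41d8cd98f00b204e9800998ecf8427e+0+A1b2c3@4d5e6f
    #
    # comes back from stripped_manifest() with its +A permission hint
    # removed:
    #
    #     d41d8cd98f00b204e9800998ecf8427e+0
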
    def manifest_text(self):
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        if len(manifest) > 0:
            return CollectionReader(manifest).manifest_text()
        else:
            return ""

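    # Illustrative sketch of the manifest line format produced above, one
    # stream per line (hypothetical values):
    #
    #     ./stream_name block_locator [block_locator ...] pos:size:filename [...]
    #
    # e.g. ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
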
    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(api_client)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(
                    "{} is not a regular file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

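    # Illustrative sketch of the checkpoint/resume round trip (hypothetical
    # file path; in practice the state is dumped while work is still queued,
    # e.g. from a signal handler between units of do_queued_work()):
    #
    #     writer = ResumableCollectionWriter()
    #     writer._queue_file('/tmp/big.dat')
    #     state = writer.dump_state()
    #     # ...later, possibly in a new process...
    #     resumed = ResumableCollectionWriter.from_state(state)
    #     resumed.do_queued_work()
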
    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            # stat_error is still bound from the except clause above
            # whenever path_stat is None (Python 2 scoping).
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)