import gflags
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

import arvados  # referenced below as arvados.api(), arvados.LOCATOR, etc.
from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens

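# Illustrative example: if a stream '.' holds one file "f" stored as two
# adjacent segments of the same 3-byte block (offsets 0 and 1, sizes 1
# and 2), the segments coalesce into a single "position:size:name" token
# and the result is ['.', <block locator>, '0:3:f'].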

class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any other
        hints besides size hints) removed from the locators.
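
        For example (illustrative values), a signed locator such as
        "d41d8cd98f00b204e9800998ecf8427e+0+A0badc0ffee@53bed294" is
        reduced to "d41d8cd98f00b204e9800998ecf8427e+0".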
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read its underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
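
        Example (a minimal sketch; assumes a reachable API server and a
        real identifier in place of the hypothetical `uuid_or_pdh`):

            reader = CollectionReader(uuid_or_pdh)
            for f in reader.all_files():
                print f.name(), f.size()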
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, a portable "
                "data hash, or a collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure. Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode. Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
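        # Fetch and parse the manifest.  If the locator carries a
        # permission signature, try Keep first, then the API server;
        # for a plain (unsigned) hash, try the API server first, then
        # fall back to Keep; for a UUID, only the API server can help.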
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a keep locator, and we didn't already try keep above
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for seg in f.segments:
                    streams[streamname][filename].extend(
                        s.locators_and_ranges(seg[0], seg[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text


class CollectionWriter(CollectionBase):
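    # The maximum size of a single Keep data block: 2**26 bytes (64 MiB).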
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        Write files to it.  The CollectionWriter will upload data to Keep as
        appropriate, and provide you with the Collection manifest text when
        you're finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
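
        Example (a minimal sketch; assumes a reachable Keep/API server
        and a local file at the hypothetical path `/tmp/data.txt`):

            cw = CollectionWriter()
            cw.write_file('/tmp/data.txt', 'data.txt')
            manifest = cw.manifest_text()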
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain whitespace: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain whitespace")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
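        # The hash is the MD5 hex digest of the stripped manifest plus its
        # length in bytes, e.g. "d41d8cd98f00b204e9800998ecf8427e+0" for an
        # empty manifest.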
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
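        # Each manifest line is a stream name, one or more block locators,
        # then one or more "position:size:filename" file tokens, e.g.
        # ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt"
        # (an illustrative line describing one empty file in the root stream).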
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError("{} not file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state
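
    # Illustrative checkpoint/resume cycle (a sketch, not library code; in
    # practice the caller snapshots state while work is in progress and
    # persists it between runs):
    #
    #   writer = ResumableCollectionWriter()
    #   writer._queue_file('big_file')
    #   state = writer.dump_state()
    #   ...process dies; on restart...
    #   writer = ResumableCollectionWriter.from_state(state)
    #   writer.do_queued_work()   # resume the queued work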

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)