import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

from collections import deque
from stat import *

from keep import *
from stream import *
import config
import errors
import util

_logger = logging.getLogger('arvados.collection')

def normalize_stream(s, stream):
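    """Take a stream name s and a dict mapping file names to lists of
    (locator, block size, segment offset, segment size) tuples (as
    produced by locators_and_ranges()), and return the tokens of the
    corresponding normalized manifest stream.

    A minimal sketch, using md5("foo")+size as a purely illustrative
    locator:

        >>> normalize_stream('.', {'foo.txt': [('acbd18db4cc2f85cedef654fccc4a4d8+3', 3, 0, 3)]})
        ['.', 'acbd18db4cc2f85cedef654fccc4a4d8+3', '0:3:foo.txt']
    """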
    stream_tokens = [s]
    sortedfiles = list(stream.keys())
    sortedfiles.sort()

    blocks = {}
    streamoffset = 0L
    for f in sortedfiles:
        for b in stream[f]:
            if b[arvados.LOCATOR] not in blocks:
                stream_tokens.append(b[arvados.LOCATOR])
                blocks[b[arvados.LOCATOR]] = streamoffset
                streamoffset += b[arvados.BLOCKSIZE]

    if len(stream_tokens) == 1:
        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)

    for f in sortedfiles:
        current_span = None
        fout = f.replace(' ', '\\040')
        for segment in stream[f]:
            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
            if current_span is None:
                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
            else:
                if segmentoffset == current_span[1]:
                    current_span[1] += segment[arvados.SEGMENTSIZE]
                else:
                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]

        if current_span is not None:
            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))

        if not stream[f]:
            stream_tokens.append("0:0:{0}".format(fout))

    return stream_tokens


class CollectionBase(object):
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        pass

    def _my_keep(self):
        if self._keep_client is None:
            self._keep_client = KeepClient(api_client=self._api_client,
                                           num_retries=self.num_retries)
        return self._keep_client

    def stripped_manifest(self):
        """
        Return the manifest for the current collection with all
        non-portable hints (i.e., permission signatures and any other
        hints except size hints) removed from the locators.
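
        For example, a locator carrying a (purely illustrative) permission
        signature:

            acbd18db4cc2f85cedef654fccc4a4d8+3+A0f1c6c20ab5a1e0a4e3a2bdeadbeef0123456789@53bed294

        is stripped to:

            acbd18db4cc2f85cedef654fccc4a4d8+3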
        """
        raw = self.manifest_text()
        clean = []
        for line in raw.split("\n"):
            fields = line.split()
            if fields:
                clean_fields = fields[:1] + [
                    (re.sub(r'\+[^\d][^\+]*', '', x)
                     if re.match(util.keep_locator_pattern, x)
                     else x)
                    for x in fields[1:]]
                clean += [' '.join(clean_fields), "\n"]
        return ''.join(clean)


class CollectionReader(CollectionBase):
    def __init__(self, manifest_locator_or_text, api_client=None,
                 keep_client=None, num_retries=0):
        """Instantiate a CollectionReader.

        This class parses Collection manifests to provide a simple interface
        to read the underlying files.

        Arguments:
        * manifest_locator_or_text: One of a Collection UUID, portable data
          hash, or full manifest text.
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionReader will build one from available Arvados
          configuration.
        * keep_client: The KeepClient to use to download Collection data.
          If not provided, CollectionReader will build one from available
          Arvados configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
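
        A minimal usage sketch (the locator shown is illustrative, not a
        real collection):

            reader = CollectionReader('acbd18db4cc2f85cedef654fccc4a4d8+45')
            for f in reader.all_files():
                print f.name()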
        """
        self._api_client = api_client
        self._keep_client = keep_client
        self.num_retries = num_retries
        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
            self._manifest_locator = manifest_locator_or_text
            self._manifest_text = None
        elif re.match(util.manifest_pattern, manifest_locator_or_text):
            self._manifest_text = manifest_locator_or_text
            self._manifest_locator = None
        else:
            raise errors.ArgumentError(
                "Argument to CollectionReader must be a manifest, a portable "
                "data hash, or a collection UUID")
        self._streams = None

    def _populate_from_api_server(self):
        # As in KeepClient itself, we must wait until the last
        # possible moment to instantiate an API client, in order to
        # avoid tripping up clients that don't have access to an API
        # server.  If we do build one, make sure our Keep client uses
        # it.  If instantiation fails, we'll fall back to the except
        # clause, just like any other Collection lookup
        # failure.  Return an exception, or None if successful.
        try:
            if self._api_client is None:
                self._api_client = arvados.api('v1')
                self._keep_client = None  # Make a new one with the new api.
            c = self._api_client.collections().get(
                uuid=self._manifest_locator).execute(
                num_retries=self.num_retries)
            self._manifest_text = c['manifest_text']
            return None
        except Exception as e:
            return e

    def _populate_from_keep(self):
        # Retrieve a manifest directly from Keep. This has a chance of
        # working if [a] the locator includes a permission signature
        # or [b] the Keep services are operating in world-readable
        # mode.  Return an exception, or None if successful.
        try:
            self._manifest_text = self._my_keep().get(
                self._manifest_locator, num_retries=self.num_retries)
        except Exception as e:
            return e

    def _populate(self):
        if self._streams is not None:
            return
        error_via_api = None
        error_via_keep = None
        should_try_keep = ((self._manifest_text is None) and
                           util.keep_locator_pattern.match(
                               self._manifest_locator))
        if ((self._manifest_text is None) and
            util.signed_locator_pattern.match(self._manifest_locator)):
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            error_via_api = self._populate_from_api_server()
            if error_via_api is not None and not should_try_keep:
                raise error_via_api
        if ((self._manifest_text is None) and
            not error_via_keep and
            should_try_keep):
            # Looks like a Keep locator, and we didn't already try Keep above.
            error_via_keep = self._populate_from_keep()
        if self._manifest_text is None:
            # Nothing worked!
            raise arvados.errors.NotFoundError(
                ("Failed to retrieve collection '{}' " +
                 "from either API server ({}) or Keep ({})."
                 ).format(
                    self._manifest_locator,
                    error_via_api,
                    error_via_keep))
        self._streams = [sline.split()
                         for sline in self._manifest_text.split("\n")
                         if sline]

    def normalize(self):
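        """Regenerate the manifest in normalized form: one line per stream,
        streams and file names sorted, and adjacent segments of the same
        file merged.
        """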
        self._populate()

        # Rearrange streams
        streams = {}
        for s in self.all_streams():
            for f in s.all_files():
                filestream = s.name() + "/" + f.name()
                r = filestream.rindex("/")
                streamname = filestream[:r]
                filename = filestream[r+1:]
                if streamname not in streams:
                    streams[streamname] = {}
                if filename not in streams[streamname]:
                    streams[streamname][filename] = []
                for r in f.segments:
                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))

        self._streams = [normalize_stream(s, streams[s])
                         for s in sorted(streams)]

        # Regenerate the manifest text based on the normalized streams
        self._manifest_text = ''.join(
            [StreamReader(stream, keep=self._my_keep()).manifest_text()
             for stream in self._streams])

    def all_streams(self):
        self._populate()
        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                for s in self._streams]

    def all_files(self):
        for s in self.all_streams():
            for f in s.all_files():
                yield f

    def manifest_text(self, strip=False, normalize=False):
        if normalize:
            cr = CollectionReader(self.manifest_text())
            cr.normalize()
            return cr.manifest_text(strip=strip, normalize=False)
        elif strip:
            return self.stripped_manifest()
        else:
            self._populate()
            return self._manifest_text


class CollectionWriter(CollectionBase):
    KEEP_BLOCK_SIZE = 2**26

    def __init__(self, api_client=None, num_retries=0):
        """Instantiate a CollectionWriter.

        CollectionWriter lets you build a new Arvados Collection from scratch.
        As you write files to it, CollectionWriter uploads the data to Keep
        as appropriate, and provides the Collection manifest text when you're
        finished.

        Arguments:
        * api_client: The API client to use to look up Collections.  If not
          provided, CollectionWriter will build one from available Arvados
          configuration.
        * num_retries: The default number of times to retry failed
          service requests.  Default 0.  You may change this value
          after instantiation, but note those changes may not
          propagate to related objects like the Keep client.
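
        A minimal usage sketch, assuming a readable file at a hypothetical
        local path:

            writer = CollectionWriter()
            writer.write_file('/tmp/example.txt')
            manifest = writer.manifest_text()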
        """
        self._api_client = api_client
        self.num_retries = num_retries
        self._keep_client = None
        self._data_buffer = []
        self._data_buffer_len = 0
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = '.'
        self._current_file_name = None
        self._current_file_pos = 0
        self._finished_streams = []
        self._close_file = None
        self._queued_file = None
        self._queued_dirents = deque()
        self._queued_trees = deque()

    def __exit__(self, exc_type, exc_value, traceback):
        self.finish()

    def do_queued_work(self):
        # The work queue consists of three pieces:
        # * _queued_file: The file object we're currently writing to the
        #   Collection.
        # * _queued_dirents: Entries under the current directory
        #   (_queued_trees[0]) that we want to write or recurse through.
        #   This may contain files from subdirectories if
        #   max_manifest_depth == 0 for this directory.
        # * _queued_trees: Directories that should be written as separate
        #   streams to the Collection.
        # This function handles the smallest piece of work currently queued
        # (current file, then current directory, then next directory) until
        # no work remains.  The _work_THING methods each do a unit of work on
        # THING.  _queue_THING methods add a THING to the work queue.
        while True:
            if self._queued_file:
                self._work_file()
            elif self._queued_dirents:
                self._work_dirents()
            elif self._queued_trees:
                self._work_trees()
            else:
                break

    def _work_file(self):
        while True:
            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
            if not buf:
                break
            self.write(buf)
        self.finish_current_file()
        if self._close_file:
            self._queued_file.close()
        self._close_file = None
        self._queued_file = None

    def _work_dirents(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        if stream_name != self.current_stream_name():
            self.start_new_stream(stream_name)
        while self._queued_dirents:
            dirent = self._queued_dirents.popleft()
            target = os.path.join(path, dirent)
            if os.path.isdir(target):
                self._queue_tree(target,
                                 os.path.join(stream_name, dirent),
                                 max_manifest_depth - 1)
            else:
                self._queue_file(target, dirent)
                break
        if not self._queued_dirents:
            self._queued_trees.popleft()

    def _work_trees(self):
        path, stream_name, max_manifest_depth = self._queued_trees[0]
        d = util.listdir_recursive(
            path, max_depth=(None if max_manifest_depth == 0 else 0))
        if d:
            self._queue_dirents(stream_name, d)
        else:
            self._queued_trees.popleft()

    def _queue_file(self, source, filename=None):
        assert (self._queued_file is None), "tried to queue more than one file"
        if not hasattr(source, 'read'):
            source = open(source, 'rb')
            self._close_file = True
        else:
            self._close_file = False
        if filename is None:
            filename = os.path.basename(source.name)
        self.start_new_file(filename)
        self._queued_file = source

    def _queue_dirents(self, stream_name, dirents):
        assert (not self._queued_dirents), "tried to queue more than one tree"
        self._queued_dirents = deque(sorted(dirents))

    def _queue_tree(self, path, stream_name, max_manifest_depth):
        self._queued_trees.append((path, stream_name, max_manifest_depth))

    def write_file(self, source, filename=None):
        self._queue_file(source, filename)
        self.do_queued_work()

    def write_directory_tree(self,
                             path, stream_name='.', max_manifest_depth=-1):
        self._queue_tree(path, stream_name, max_manifest_depth)
        self.do_queued_work()

    def write(self, newdata):
        if hasattr(newdata, '__iter__'):
            for s in newdata:
                self.write(s)
            return
        self._data_buffer.append(newdata)
        self._data_buffer_len += len(newdata)
        self._current_stream_length += len(newdata)
        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
            self.flush_data()

    def flush_data(self):
        data_buffer = ''.join(self._data_buffer)
        if data_buffer:
            self._current_stream_locators.append(
                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
            self._data_buffer_len = len(self._data_buffer[0])

    def start_new_file(self, newfilename=None):
        self.finish_current_file()
        self.set_current_file_name(newfilename)

    def set_current_file_name(self, newfilename):
        if re.search(r'[\t\n]', newfilename):
            raise errors.AssertionError(
                "Manifest filenames cannot contain tab or newline characters: %s" %
                newfilename)
        self._current_file_name = newfilename

    def current_file_name(self):
        return self._current_file_name

    def finish_current_file(self):
        if self._current_file_name is None:
            if self._current_file_pos == self._current_stream_length:
                return
            raise errors.AssertionError(
                "Cannot finish an unnamed file " +
                "(%d bytes at offset %d in '%s' stream)" %
                (self._current_stream_length - self._current_file_pos,
                 self._current_file_pos,
                 self._current_stream_name))
        self._current_stream_files.append([
                self._current_file_pos,
                self._current_stream_length - self._current_file_pos,
                self._current_file_name])
        self._current_file_pos = self._current_stream_length
        self._current_file_name = None

    def start_new_stream(self, newstreamname='.'):
        self.finish_current_stream()
        self.set_current_stream_name(newstreamname)

    def set_current_stream_name(self, newstreamname):
        if re.search(r'[\t\n]', newstreamname):
            raise errors.AssertionError(
                "Manifest stream names cannot contain tab or newline characters")
        self._current_stream_name = '.' if newstreamname == '' else newstreamname

    def current_stream_name(self):
        return self._current_stream_name

    def finish_current_stream(self):
        self.finish_current_file()
        self.flush_data()
        if not self._current_stream_files:
            pass
        elif self._current_stream_name is None:
            raise errors.AssertionError(
                "Cannot finish an unnamed stream (%d bytes in %d files)" %
                (self._current_stream_length, len(self._current_stream_files)))
        else:
            if not self._current_stream_locators:
                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
            self._finished_streams.append([self._current_stream_name,
                                           self._current_stream_locators,
                                           self._current_stream_files])
        self._current_stream_files = []
        self._current_stream_length = 0
        self._current_stream_locators = []
        self._current_stream_name = None
        self._current_file_pos = 0
        self._current_file_name = None

    def finish(self):
        # Store the manifest in Keep and return its locator.
        return self._my_keep().put(self.manifest_text())

    def portable_data_hash(self):
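        """Return the portable data hash: the md5 hexdigest of the stripped
        manifest, plus '+' and the manifest's length in bytes.

        For example, an empty manifest yields the well-known empty
        collection hash:

            >>> hashlib.md5('').hexdigest() + '+0'
            'd41d8cd98f00b204e9800998ecf8427e+0'
        """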
        stripped = self.stripped_manifest()
        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))

    def manifest_text(self):
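        """Return the manifest text for everything written so far.

        Each finished stream becomes one manifest line, e.g. (the locator
        shown is md5("foo") plus size, purely for illustration):

            . acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:foo.txt
        """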
        self.finish_current_stream()
        manifest = ''

        for stream in self._finished_streams:
            if not re.search(r'^\.(/.*)?$', stream[0]):
                manifest += './'
            manifest += stream[0].replace(' ', '\\040')
            manifest += ' ' + ' '.join(stream[1])
            manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
            manifest += "\n"

        return manifest

    def data_locators(self):
        ret = []
        for name, locators, files in self._finished_streams:
            ret += locators
        return ret


class ResumableCollectionWriter(CollectionWriter):
    STATE_PROPS = ['_current_stream_files', '_current_stream_length',
                   '_current_stream_locators', '_current_stream_name',
                   '_current_file_name', '_current_file_pos', '_close_file',
                   '_data_buffer', '_dependencies', '_finished_streams',
                   '_queued_dirents', '_queued_trees']

    def __init__(self, api_client=None, num_retries=0):
        self._dependencies = {}
        super(ResumableCollectionWriter, self).__init__(
            api_client, num_retries=num_retries)

    @classmethod
    def from_state(cls, state, *init_args, **init_kwargs):
        # Try to build a new writer from scratch with the given state.
        # If the state is not suitable to resume (because files have changed,
        # been deleted, aren't predictable, etc.), raise a
        # StaleWriterStateError.  Otherwise, return the initialized writer.
        # The caller is responsible for calling writer.do_queued_work()
        # appropriately after it's returned.
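        #
        # A sketch of the checkpoint/resume cycle this supports (method
        # names are real; the surrounding control flow is illustrative):
        #
        #   state = old_writer.dump_state()
        #   # ...process exits and restarts...
        #   writer = ResumableCollectionWriter.from_state(state)
        #   writer.do_queued_work()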
        writer = cls(*init_args, **init_kwargs)
        for attr_name in cls.STATE_PROPS:
            attr_value = state[attr_name]
            attr_class = getattr(writer, attr_name).__class__
            # Coerce the value into the same type as the initial value, if
            # needed.
            if attr_class not in (type(None), attr_value.__class__):
                attr_value = attr_class(attr_value)
            setattr(writer, attr_name, attr_value)
        # Check dependencies before we try to resume anything.
        if any(KeepLocator(ls).permission_expired()
               for ls in writer._current_stream_locators):
            raise errors.StaleWriterStateError(
                "locators include expired permission hint")
        writer.check_dependencies()
        if state['_current_file'] is not None:
            path, pos = state['_current_file']
            try:
                writer._queued_file = open(path, 'rb')
                writer._queued_file.seek(pos)
            except IOError as error:
                raise errors.StaleWriterStateError(
                    "failed to reopen active file {}: {}".format(path, error))
        return writer

    def check_dependencies(self):
        for path, orig_stat in self._dependencies.items():
            if not S_ISREG(orig_stat[ST_MODE]):
                raise errors.StaleWriterStateError(
                    "{} is not a regular file".format(path))
            try:
                now_stat = tuple(os.stat(path))
            except OSError as error:
                raise errors.StaleWriterStateError(
                    "failed to stat {}: {}".format(path, error))
            if ((not S_ISREG(now_stat[ST_MODE])) or
                (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
                raise errors.StaleWriterStateError("{} changed".format(path))

    def dump_state(self, copy_func=lambda x: x):
        state = {attr: copy_func(getattr(self, attr))
                 for attr in self.STATE_PROPS}
        if self._queued_file is None:
            state['_current_file'] = None
        else:
            state['_current_file'] = (os.path.realpath(self._queued_file.name),
                                      self._queued_file.tell())
        return state

    def _queue_file(self, source, filename=None):
        try:
            src_path = os.path.realpath(source)
        except Exception:
            raise errors.AssertionError("{} not a file path".format(source))
        try:
            path_stat = os.stat(src_path)
        except OSError as stat_error:
            path_stat = None
        super(ResumableCollectionWriter, self)._queue_file(source, filename)
        fd_stat = os.fstat(self._queued_file.fileno())
        if not S_ISREG(fd_stat.st_mode):
            # We won't be able to resume from this cache anyway, so don't
            # worry about further checks.
            self._dependencies[source] = tuple(fd_stat)
        elif path_stat is None:
            raise errors.AssertionError(
                "could not stat {}: {}".format(source, stat_error))
        elif path_stat.st_ino != fd_stat.st_ino:
            raise errors.AssertionError(
                "{} changed between open and stat calls".format(source))
        else:
            self._dependencies[src_path] = tuple(fd_stat)

    def write(self, data):
        if self._queued_file is None:
            raise errors.AssertionError(
                "resumable writer can't accept unsourced data")
        return super(ResumableCollectionWriter, self).write(data)