3706: Prefer "if x" to "if len(x) > 0".
[arvados.git] / sdk / python / arvados / collection.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from collections import deque
22 from stat import *
23
24 from keep import *
25 from stream import *
26 import config
27 import errors
28 import util
29
30 _logger = logging.getLogger('arvados.collection')
31
32 def normalize_stream(s, stream):
33     stream_tokens = [s]
34     sortedfiles = list(stream.keys())
35     sortedfiles.sort()
36
37     blocks = {}
38     streamoffset = 0L
39     for f in sortedfiles:
40         for b in stream[f]:
41             if b[arvados.LOCATOR] not in blocks:
42                 stream_tokens.append(b[arvados.LOCATOR])
43                 blocks[b[arvados.LOCATOR]] = streamoffset
44                 streamoffset += b[arvados.BLOCKSIZE]
45
46     if len(stream_tokens) == 1:
47         stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
48
49     for f in sortedfiles:
50         current_span = None
51         fout = f.replace(' ', '\\040')
52         for segment in stream[f]:
53             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
54             if current_span == None:
55                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
56             else:
57                 if segmentoffset == current_span[1]:
58                     current_span[1] += segment[arvados.SEGMENTSIZE]
59                 else:
60                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
61                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
62
63         if current_span != None:
64             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
65
66         if not stream[f]:
67             stream_tokens.append("0:0:{0}".format(fout))
68
69     return stream_tokens
70
71
72 class CollectionBase(object):
73     def __enter__(self):
74         pass
75
76     def __exit__(self):
77         pass
78
79     def _my_keep(self):
80         if self._keep_client is None:
81             self._keep_client = KeepClient(api_client=self._api_client,
82                                            num_retries=self.num_retries)
83         return self._keep_client
84
85     def stripped_manifest(self):
86         """
87         Return the manifest for the current collection with all
88         non-portable hints (i.e., permission signatures and other
89         hints other than size hints) removed from the locators.
90         """
91         raw = self.manifest_text()
92         clean = ''
93         for line in raw.split("\n"):
94             fields = line.split()
95             if fields:
96                 locators = [ (re.sub(r'\+[^\d][^\+]*', '', x) if re.match(util.keep_locator_pattern, x) else x)
97                              for x in fields[1:-1] ]
98                 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
99         return clean
100
101
102 class CollectionReader(CollectionBase):
103     def __init__(self, manifest_locator_or_text, api_client=None,
104                  keep_client=None, num_retries=0):
105         """Instantiate a CollectionReader.
106
107         This class parses Collection manifests to provide a simple interface
108         to read its underlying files.
109
110         Arguments:
111         * manifest_locator_or_text: One of a Collection UUID, portable data
112           hash, or full manifest text.
113         * api_client: The API client to use to look up Collections.  If not
114           provided, CollectionReader will build one from available Arvados
115           configuration.
116         * keep_client: The KeepClient to use to download Collection data.
117           If not provided, CollectionReader will build one from available
118           Arvados configuration.
119         * num_retries: The default number of times to retry failed
120           service requests.  Default 0.  You may change this value
121           after instantiation, but note those changes may not
122           propagate to related objects like the Keep client.
123         """
124         self._api_client = api_client
125         self._keep_client = keep_client
126         self.num_retries = num_retries
127         if re.match(util.keep_locator_pattern, manifest_locator_or_text):
128             self._manifest_locator = manifest_locator_or_text
129             self._manifest_text = None
130         elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
131             self._manifest_locator = manifest_locator_or_text
132             self._manifest_text = None
133         elif re.match(util.manifest_pattern, manifest_locator_or_text):
134             self._manifest_text = manifest_locator_or_text
135             self._manifest_locator = None
136         else:
137             raise errors.ArgumentError(
138                 "Argument to CollectionReader must be a manifest or a collection UUID")
139         self._streams = None
140
141     def _populate_from_api_server(self):
142         # As in KeepClient itself, we must wait until the last
143         # possible moment to instantiate an API client, in order to
144         # avoid tripping up clients that don't have access to an API
145         # server.  If we do build one, make sure our Keep client uses
146         # it.  If instantiation fails, we'll fall back to the except
147         # clause, just like any other Collection lookup
148         # failure. Return an exception, or None if successful.
149         try:
150             if self._api_client is None:
151                 self._api_client = arvados.api('v1')
152                 self._keep_client = None  # Make a new one with the new api.
153             c = self._api_client.collections().get(
154                 uuid=self._manifest_locator).execute(
155                 num_retries=self.num_retries)
156             self._manifest_text = c['manifest_text']
157             return None
158         except Exception as e:
159             return e
160
161     def _populate_from_keep(self):
162         # Retrieve a manifest directly from Keep. This has a chance of
163         # working if [a] the locator includes a permission signature
164         # or [b] the Keep services are operating in world-readable
165         # mode. Return an exception, or None if successful.
166         try:
167             self._manifest_text = self._my_keep().get(
168                 self._manifest_locator, num_retries=self.num_retries)
169         except Exception as e:
170             return e
171
172     def _populate(self):
173         if self._streams is not None:
174             return
175         error_via_api = None
176         error_via_keep = None
177         should_try_keep = (not self._manifest_text and
178                            util.keep_locator_pattern.match(
179                 self._manifest_locator))
180         if (not self._manifest_text and
181             util.signed_locator_pattern.match(self._manifest_locator)):
182             error_via_keep = self._populate_from_keep()
183         if not self._manifest_text:
184             error_via_api = self._populate_from_api_server()
185             if error_via_api != None and not should_try_keep:
186                 raise error_via_api
187         if (not self._manifest_text and
188             not error_via_keep and
189             should_try_keep):
190             # Looks like a keep locator, and we didn't already try keep above
191             error_via_keep = self._populate_from_keep()
192         if not self._manifest_text:
193             # Nothing worked!
194             raise arvados.errors.NotFoundError(
195                 ("Failed to retrieve collection '{}' " +
196                  "from either API server ({}) or Keep ({})."
197                  ).format(
198                     self._manifest_locator,
199                     error_via_api,
200                     error_via_keep))
201         self._streams = [sline.split()
202                          for sline in self._manifest_text.split("\n")
203                          if sline]
204
205     def normalize(self):
206         self._populate()
207
208         # Rearrange streams
209         streams = {}
210         for s in self.all_streams():
211             for f in s.all_files():
212                 filestream = s.name() + "/" + f.name()
213                 r = filestream.rindex("/")
214                 streamname = filestream[:r]
215                 filename = filestream[r+1:]
216                 if streamname not in streams:
217                     streams[streamname] = {}
218                 if filename not in streams[streamname]:
219                     streams[streamname][filename] = []
220                 for r in f.segments:
221                     streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
222
223         self._streams = [normalize_stream(s, streams[s])
224                          for s in sorted(streams)]
225
226         # Regenerate the manifest text based on the normalized streams
227         self._manifest_text = ''.join(
228             [StreamReader(stream, keep=self._my_keep()).manifest_text()
229              for stream in self._streams])
230
231     def all_streams(self):
232         self._populate()
233         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
234                 for s in self._streams]
235
236     def all_files(self):
237         for s in self.all_streams():
238             for f in s.all_files():
239                 yield f
240
241     def manifest_text(self, strip=False, normalize=False):
242         if normalize:
243             cr = CollectionReader(self.manifest_text())
244             cr.normalize()
245             return cr.manifest_text(strip=strip, normalize=False)
246         elif strip:
247             return self.stripped_manifest()
248         else:
249             self._populate()
250             return self._manifest_text
251
252
253 class CollectionWriter(CollectionBase):
254     KEEP_BLOCK_SIZE = 2**26
255
256     def __init__(self, api_client=None, num_retries=0):
257         """Instantiate a CollectionWriter.
258
259         CollectionWriter lets you build a new Arvados Collection from scratch.
260         Write files to it.  The CollectionWriter will upload data to Keep as
261         appropriate, and provide you with the Collection manifest text when
262         you're finished.
263
264         Arguments:
265         * api_client: The API client to use to look up Collections.  If not
266           provided, CollectionReader will build one from available Arvados
267           configuration.
268         * num_retries: The default number of times to retry failed
269           service requests.  Default 0.  You may change this value
270           after instantiation, but note those changes may not
271           propagate to related objects like the Keep client.
272         """
273         self._api_client = api_client
274         self.num_retries = num_retries
275         self._keep_client = None
276         self._data_buffer = []
277         self._data_buffer_len = 0
278         self._current_stream_files = []
279         self._current_stream_length = 0
280         self._current_stream_locators = []
281         self._current_stream_name = '.'
282         self._current_file_name = None
283         self._current_file_pos = 0
284         self._finished_streams = []
285         self._close_file = None
286         self._queued_file = None
287         self._queued_dirents = deque()
288         self._queued_trees = deque()
289
290     def __exit__(self):
291         self.finish()
292
293     def do_queued_work(self):
294         # The work queue consists of three pieces:
295         # * _queued_file: The file object we're currently writing to the
296         #   Collection.
297         # * _queued_dirents: Entries under the current directory
298         #   (_queued_trees[0]) that we want to write or recurse through.
299         #   This may contain files from subdirectories if
300         #   max_manifest_depth == 0 for this directory.
301         # * _queued_trees: Directories that should be written as separate
302         #   streams to the Collection.
303         # This function handles the smallest piece of work currently queued
304         # (current file, then current directory, then next directory) until
305         # no work remains.  The _work_THING methods each do a unit of work on
306         # THING.  _queue_THING methods add a THING to the work queue.
307         while True:
308             if self._queued_file:
309                 self._work_file()
310             elif self._queued_dirents:
311                 self._work_dirents()
312             elif self._queued_trees:
313                 self._work_trees()
314             else:
315                 break
316
317     def _work_file(self):
318         while True:
319             buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
320             if not buf:
321                 break
322             self.write(buf)
323         self.finish_current_file()
324         if self._close_file:
325             self._queued_file.close()
326         self._close_file = None
327         self._queued_file = None
328
329     def _work_dirents(self):
330         path, stream_name, max_manifest_depth = self._queued_trees[0]
331         if stream_name != self.current_stream_name():
332             self.start_new_stream(stream_name)
333         while self._queued_dirents:
334             dirent = self._queued_dirents.popleft()
335             target = os.path.join(path, dirent)
336             if os.path.isdir(target):
337                 self._queue_tree(target,
338                                  os.path.join(stream_name, dirent),
339                                  max_manifest_depth - 1)
340             else:
341                 self._queue_file(target, dirent)
342                 break
343         if not self._queued_dirents:
344             self._queued_trees.popleft()
345
346     def _work_trees(self):
347         path, stream_name, max_manifest_depth = self._queued_trees[0]
348         make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
349                         else os.listdir)
350         d = make_dirents(path)
351         if d:
352             self._queue_dirents(stream_name, d)
353         else:
354             self._queued_trees.popleft()
355
356     def _queue_file(self, source, filename=None):
357         assert (self._queued_file is None), "tried to queue more than one file"
358         if not hasattr(source, 'read'):
359             source = open(source, 'rb')
360             self._close_file = True
361         else:
362             self._close_file = False
363         if filename is None:
364             filename = os.path.basename(source.name)
365         self.start_new_file(filename)
366         self._queued_file = source
367
368     def _queue_dirents(self, stream_name, dirents):
369         assert (not self._queued_dirents), "tried to queue more than one tree"
370         self._queued_dirents = deque(sorted(dirents))
371
372     def _queue_tree(self, path, stream_name, max_manifest_depth):
373         self._queued_trees.append((path, stream_name, max_manifest_depth))
374
375     def write_file(self, source, filename=None):
376         self._queue_file(source, filename)
377         self.do_queued_work()
378
379     def write_directory_tree(self,
380                              path, stream_name='.', max_manifest_depth=-1):
381         self._queue_tree(path, stream_name, max_manifest_depth)
382         self.do_queued_work()
383
384     def write(self, newdata):
385         if hasattr(newdata, '__iter__'):
386             for s in newdata:
387                 self.write(s)
388             return
389         self._data_buffer.append(newdata)
390         self._data_buffer_len += len(newdata)
391         self._current_stream_length += len(newdata)
392         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
393             self.flush_data()
394
395     def flush_data(self):
396         data_buffer = ''.join(self._data_buffer)
397         if data_buffer:
398             self._current_stream_locators.append(
399                 self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
400             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
401             self._data_buffer_len = len(self._data_buffer[0])
402
403     def start_new_file(self, newfilename=None):
404         self.finish_current_file()
405         self.set_current_file_name(newfilename)
406
407     def set_current_file_name(self, newfilename):
408         if re.search(r'[\t\n]', newfilename):
409             raise errors.AssertionError(
410                 "Manifest filenames cannot contain whitespace: %s" %
411                 newfilename)
412         self._current_file_name = newfilename
413
414     def current_file_name(self):
415         return self._current_file_name
416
417     def finish_current_file(self):
418         if self._current_file_name == None:
419             if self._current_file_pos == self._current_stream_length:
420                 return
421             raise errors.AssertionError(
422                 "Cannot finish an unnamed file " +
423                 "(%d bytes at offset %d in '%s' stream)" %
424                 (self._current_stream_length - self._current_file_pos,
425                  self._current_file_pos,
426                  self._current_stream_name))
427         self._current_stream_files.append([
428                 self._current_file_pos,
429                 self._current_stream_length - self._current_file_pos,
430                 self._current_file_name])
431         self._current_file_pos = self._current_stream_length
432         self._current_file_name = None
433
434     def start_new_stream(self, newstreamname='.'):
435         self.finish_current_stream()
436         self.set_current_stream_name(newstreamname)
437
438     def set_current_stream_name(self, newstreamname):
439         if re.search(r'[\t\n]', newstreamname):
440             raise errors.AssertionError(
441                 "Manifest stream names cannot contain whitespace")
442         self._current_stream_name = '.' if newstreamname=='' else newstreamname
443
444     def current_stream_name(self):
445         return self._current_stream_name
446
447     def finish_current_stream(self):
448         self.finish_current_file()
449         self.flush_data()
450         if not self._current_stream_files:
451             pass
452         elif self._current_stream_name is None:
453             raise errors.AssertionError(
454                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
455                 (self._current_stream_length, len(self._current_stream_files)))
456         else:
457             if not self._current_stream_locators:
458                 self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
459             self._finished_streams.append([self._current_stream_name,
460                                            self._current_stream_locators,
461                                            self._current_stream_files])
462         self._current_stream_files = []
463         self._current_stream_length = 0
464         self._current_stream_locators = []
465         self._current_stream_name = None
466         self._current_file_pos = 0
467         self._current_file_name = None
468
469     def finish(self):
470         # Store the manifest in Keep and return its locator.
471         return self._my_keep().put(self.manifest_text())
472
473     def portable_data_hash(self):
474         stripped = self.stripped_manifest()
475         return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
476
477     def manifest_text(self):
478         self.finish_current_stream()
479         manifest = ''
480
481         for stream in self._finished_streams:
482             if not re.search(r'^\.(/.*)?$', stream[0]):
483                 manifest += './'
484             manifest += stream[0].replace(' ', '\\040')
485             manifest += ' ' + ' '.join(stream[1])
486             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
487             manifest += "\n"
488
489         if manifest:
490             return manifest
491         else:
492             return ""
493
494     def data_locators(self):
495         ret = []
496         for name, locators, files in self._finished_streams:
497             ret += locators
498         return ret
499
500
501 class ResumableCollectionWriter(CollectionWriter):
502     STATE_PROPS = ['_current_stream_files', '_current_stream_length',
503                    '_current_stream_locators', '_current_stream_name',
504                    '_current_file_name', '_current_file_pos', '_close_file',
505                    '_data_buffer', '_dependencies', '_finished_streams',
506                    '_queued_dirents', '_queued_trees']
507
508     def __init__(self, api_client=None, num_retries=0):
509         self._dependencies = {}
510         super(ResumableCollectionWriter, self).__init__(
511             api_client, num_retries=num_retries)
512
513     @classmethod
514     def from_state(cls, state, *init_args, **init_kwargs):
515         # Try to build a new writer from scratch with the given state.
516         # If the state is not suitable to resume (because files have changed,
517         # been deleted, aren't predictable, etc.), raise a
518         # StaleWriterStateError.  Otherwise, return the initialized writer.
519         # The caller is responsible for calling writer.do_queued_work()
520         # appropriately after it's returned.
521         writer = cls(*init_args, **init_kwargs)
522         for attr_name in cls.STATE_PROPS:
523             attr_value = state[attr_name]
524             attr_class = getattr(writer, attr_name).__class__
525             # Coerce the value into the same type as the initial value, if
526             # needed.
527             if attr_class not in (type(None), attr_value.__class__):
528                 attr_value = attr_class(attr_value)
529             setattr(writer, attr_name, attr_value)
530         # Check dependencies before we try to resume anything.
531         if any(KeepLocator(ls).permission_expired()
532                for ls in writer._current_stream_locators):
533             raise errors.StaleWriterStateError(
534                 "locators include expired permission hint")
535         writer.check_dependencies()
536         if state['_current_file'] is not None:
537             path, pos = state['_current_file']
538             try:
539                 writer._queued_file = open(path, 'rb')
540                 writer._queued_file.seek(pos)
541             except IOError as error:
542                 raise errors.StaleWriterStateError(
543                     "failed to reopen active file {}: {}".format(path, error))
544         return writer
545
546     def check_dependencies(self):
547         for path, orig_stat in self._dependencies.items():
548             if not S_ISREG(orig_stat[ST_MODE]):
549                 raise errors.StaleWriterStateError("{} not file".format(path))
550             try:
551                 now_stat = tuple(os.stat(path))
552             except OSError as error:
553                 raise errors.StaleWriterStateError(
554                     "failed to stat {}: {}".format(path, error))
555             if ((not S_ISREG(now_stat[ST_MODE])) or
556                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
557                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
558                 raise errors.StaleWriterStateError("{} changed".format(path))
559
560     def dump_state(self, copy_func=lambda x: x):
561         state = {attr: copy_func(getattr(self, attr))
562                  for attr in self.STATE_PROPS}
563         if self._queued_file is None:
564             state['_current_file'] = None
565         else:
566             state['_current_file'] = (os.path.realpath(self._queued_file.name),
567                                       self._queued_file.tell())
568         return state
569
570     def _queue_file(self, source, filename=None):
571         try:
572             src_path = os.path.realpath(source)
573         except Exception:
574             raise errors.AssertionError("{} not a file path".format(source))
575         try:
576             path_stat = os.stat(src_path)
577         except OSError as stat_error:
578             path_stat = None
579         super(ResumableCollectionWriter, self)._queue_file(source, filename)
580         fd_stat = os.fstat(self._queued_file.fileno())
581         if not S_ISREG(fd_stat.st_mode):
582             # We won't be able to resume from this cache anyway, so don't
583             # worry about further checks.
584             self._dependencies[source] = tuple(fd_stat)
585         elif path_stat is None:
586             raise errors.AssertionError(
587                 "could not stat {}: {}".format(source, stat_error))
588         elif path_stat.st_ino != fd_stat.st_ino:
589             raise errors.AssertionError(
590                 "{} changed between open and stat calls".format(source))
591         else:
592             self._dependencies[src_path] = tuple(fd_stat)
593
594     def write(self, data):
595         if self._queued_file is None:
596             raise errors.AssertionError(
597                 "resumable writer can't accept unsourced data")
598         return super(ResumableCollectionWriter, self).write(data)