Merge branch '3410-replication-attrs' closes #3410 refs #5011
[arvados.git] / sdk / python / arvados / collection.py
index 814bd75a1ed0d0ccaad6c362385eac406660773e..7bfdf782f8d06b03d6ac482fa64872d1eb8ff9be 100644 (file)
@@ -1,32 +1,20 @@
-import gflags
-import httplib
-import httplib2
+import functools
 import logging
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
 
 from collections import deque
 from stat import *
 
+from .arvfile import ArvadosFileBase
 from keep import *
-from stream import *
+from .stream import StreamReader, split
 import config
 import errors
 import util
 
+_logger = logging.getLogger('arvados.collection')
+
 def normalize_stream(s, stream):
     stream_tokens = [s]
     sortedfiles = list(stream.keys())
@@ -41,12 +29,15 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
         for segment in stream[f]:
             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span == None:
+            if current_span is None:
                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
             else:
                 if segmentoffset == current_span[1]:
@@ -55,102 +46,288 @@ def normalize_stream(s, stream):
                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 
-        if current_span != None:
+        if current_span is not None:
             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
-        if len(stream[f]) == 0:
+        if not stream[f]:
             stream_tokens.append("0:0:{0}".format(fout))
 
     return stream_tokens
 
-def normalize(collection):
-    streams = {}
-    for s in collection.all_streams():
-        for f in s.all_files():
-            filestream = s.name() + "/" + f.name()
-            r = filestream.rindex("/")
-            streamname = filestream[:r]
-            filename = filestream[r+1:]
-            if streamname not in streams:
-                streams[streamname] = {}
-            if filename not in streams[streamname]:
-                streams[streamname][filename] = []
-            for r in f.segments:
-                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
-
-    normalized_streams = []
-    sortedstreams = list(streams.keys())
-    sortedstreams.sort()
-    for s in sortedstreams:
-        normalized_streams.append(normalize_stream(s, streams[s]))
-    return normalized_streams
-
-
-class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
-        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+
+class CollectionBase(object):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+    def _my_keep(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client,
+                                           num_retries=self.num_retries)
+        return self._keep_client
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all
+        non-portable hints (i.e., permission signatures and other
+        hints other than size hints) removed from the locators.
+        """
+        raw = self.manifest_text()
+        clean = []
+        for line in raw.split("\n"):
+            fields = line.split()
+            if fields:
+                clean_fields = fields[:1] + [
+                    (re.sub(r'\+[^\d][^\+]*', '', x)
+                     if re.match(util.keep_locator_pattern, x)
+                     else x)
+                    for x in fields[1:]]
+                clean += [' '.join(clean_fields), "\n"]
+        return ''.join(clean)
+
+
+class CollectionReader(CollectionBase):
+    def __init__(self, manifest_locator_or_text, api_client=None,
+                 keep_client=None, num_retries=0):
+        """Instantiate a CollectionReader.
+
+        This class parses Collection manifests to provide a simple interface
+        to read its underlying files.
+
+        Arguments:
+        * manifest_locator_or_text: One of a Collection UUID, portable data
+          hash, or full manifest text.
+        * api_client: The API client to use to look up Collections.  If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * keep_client: The KeepClient to use to download Collection data.
+          If not provided, CollectionReader will build one from available
+          Arvados configuration.
+        * num_retries: The default number of times to retry failed
+          service requests.  Default 0.  You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        """
+        self._api_client = api_client
+        self._keep_client = keep_client
+        self.num_retries = num_retries
+        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.match(util.manifest_pattern, manifest_locator_or_text):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
             raise errors.ArgumentError(
                 "Argument to CollectionReader must be a manifest or a collection UUID")
+        self._api_response = None
         self._streams = None
 
-    def __enter__(self):
-        pass
-
-    def __exit__(self):
-        pass
+    def _populate_from_api_server(self):
+        # As in KeepClient itself, we must wait until the last
+        # possible moment to instantiate an API client, in order to
+        # avoid tripping up clients that don't have access to an API
+        # server.  If we do build one, make sure our Keep client uses
+        # it.  If instantiation fails, we'll fall back to the except
+        # clause, just like any other Collection lookup
+        # failure. Return an exception, or None if successful.
+        try:
+            if self._api_client is None:
+                self._api_client = arvados.api('v1')
+                self._keep_client = None  # Make a new one with the new api.
+            self._api_response = self._api_client.collections().get(
+                uuid=self._manifest_locator).execute(
+                num_retries=self.num_retries)
+            self._manifest_text = self._api_response['manifest_text']
+            return None
+        except Exception as e:
+            return e
+
+    def _populate_from_keep(self):
+        # Retrieve a manifest directly from Keep. This has a chance of
+        # working if [a] the locator includes a permission signature
+        # or [b] the Keep services are operating in world-readable
+        # mode. Return an exception, or None if successful.
+        try:
+            self._manifest_text = self._my_keep().get(
+                self._manifest_locator, num_retries=self.num_retries)
+        except Exception as e:
+            return e
 
     def _populate(self):
-        if self._streams != None:
-            return
-        if not self._manifest_text:
-            try:
-                c = arvados.api('v1').collections().get(
-                    uuid=self._manifest_locator).execute()
-                self._manifest_text = c['manifest_text']
-            except Exception as e:
-                logging.warning("API lookup failed for collection %s (%s: %s)" %
-                                (self._manifest_locator, type(e), str(e)))
-                self._manifest_text = Keep.get(self._manifest_locator)
-        self._streams = []
-        for stream_line in self._manifest_text.split("\n"):
-            if stream_line != '':
-                stream_tokens = stream_line.split()
-                self._streams += [stream_tokens]
-        self._streams = normalize(self)
-
-        # now regenerate the manifest text based on the normalized stream
-
-        #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
-        #print "result", self._manifest_text
-
+        error_via_api = None
+        error_via_keep = None
+        should_try_keep = ((self._manifest_text is None) and
+                           util.keep_locator_pattern.match(
+                self._manifest_locator))
+        if ((self._manifest_text is None) and
+            util.signed_locator_pattern.match(self._manifest_locator)):
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
+            error_via_api = self._populate_from_api_server()
+            if error_via_api is not None and not should_try_keep:
+                raise error_via_api
+        if ((self._manifest_text is None) and
+            not error_via_keep and
+            should_try_keep):
+            # Looks like a keep locator, and we didn't already try keep above
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
+            # Nothing worked!
+            raise arvados.errors.NotFoundError(
+                ("Failed to retrieve collection '{}' " +
+                 "from either API server ({}) or Keep ({})."
+                 ).format(
+                    self._manifest_locator,
+                    error_via_api,
+                    error_via_keep))
+        self._streams = [sline.split()
+                         for sline in self._manifest_text.split("\n")
+                         if sline]
+
+    def _populate_first(orig_func):
+        # Decorator for methods that read actual Collection data.
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if self._streams is None:
+                self._populate()
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    @_populate_first
+    def api_response(self):
+        """api_response() -> dict or None
+
+        Returns information about this Collection fetched from the API server.
+        If the Collection exists in Keep but not the API server, currently
+        returns None.  Future versions may provide a synthetic response.
+        """
+        return self._api_response
+
+    @_populate_first
+    def normalize(self):
+        # Rearrange streams
+        streams = {}
+        for s in self.all_streams():
+            for f in s.all_files():
+                streamname, filename = split(s.name() + "/" + f.name())
+                if streamname not in streams:
+                    streams[streamname] = {}
+                if filename not in streams[streamname]:
+                    streams[streamname][filename] = []
+                for r in f.segments:
+                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
+
+        self._streams = [normalize_stream(s, streams[s])
+                         for s in sorted(streams)]
+
+        # Regenerate the manifest text based on the normalized streams
+        self._manifest_text = ''.join(
+            [StreamReader(stream, keep=self._my_keep()).manifest_text()
+             for stream in self._streams])
+
+    @_populate_first
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
 
+    @_populate_first
     def all_streams(self):
-        self._populate()
-        resp = []
-        for s in self._streams:
-            resp.append(StreamReader(s))
-        return resp
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
 
     def all_files(self):
         for s in self.all_streams():
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
-        self._populate()
-        return self._manifest_text
+    @_populate_first
+    def manifest_text(self, strip=False, normalize=False):
+        if normalize:
+            cr = CollectionReader(self.manifest_text())
+            cr.normalize()
+            return cr.manifest_text(strip=strip, normalize=False)
+        elif strip:
+            return self.stripped_manifest()
+        else:
+            return self._manifest_text
+
+
+class _WriterFile(ArvadosFileBase):
+    def __init__(self, coll_writer, name):
+        super(_WriterFile, self).__init__(name, 'wb')
+        self.dest = coll_writer
+
+    def close(self):
+        super(_WriterFile, self).close()
+        self.dest.finish_current_file()
 
-class CollectionWriter(object):
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        for data in seq:
+            self.write(data)
+
+    @ArvadosFileBase._before_close
+    def flush(self):
+        self.dest.flush_data()
+
+
+class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self):
+    def __init__(self, api_client=None, num_retries=0, replication=None):
+        """Instantiate a CollectionWriter.
+
+        CollectionWriter lets you build a new Arvados Collection from scratch.
+        Write files to it.  The CollectionWriter will upload data to Keep as
+        appropriate, and provide you with the Collection manifest text when
+        you're finished.
+
+        Arguments:
+        * api_client: The API client to use to look up Collections.  If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * num_retries: The default number of times to retry failed
+          service requests.  Default 0.  You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        * replication: The number of copies of each block to store.
+          If this argument is None or not supplied, replication is
+          the server-provided default if available, otherwise 2.
+        """
+        self._api_client = api_client
+        self.num_retries = num_retries
+        self.replication = (2 if replication is None else replication)
+        self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._current_stream_files = []
@@ -164,14 +341,13 @@ class CollectionWriter(object):
         self._queued_file = None
         self._queued_dirents = deque()
         self._queued_trees = deque()
+        self._last_open = None
 
-    def __enter__(self):
-        pass
-
-    def __exit__(self):
-        self.finish()
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.finish()
 
-    def _do_queued_work(self):
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +370,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-            self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -231,9 +402,12 @@ class CollectionWriter(object):
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
-                        else os.listdir)
-        self._queue_dirents(stream_name, make_dirents(path))
+        d = util.listdir_recursive(
+            path, max_depth = (None if max_manifest_depth == 0 else 0))
+        if d:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
 
     def _queue_file(self, source, filename=None):
         assert (self._queued_file is None), "tried to queue more than one file"
@@ -256,31 +430,62 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
             for s in newdata:
                 self.write(s)
             return
-        self._data_buffer += [newdata]
+        self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
-        if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+        if data_buffer:
+            self._current_stream_locators.append(
+                self._my_keep().put(
+                    data_buffer[0:self.KEEP_BLOCK_SIZE],
+                    copies=self.replication))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -291,13 +496,17 @@ class CollectionWriter(object):
             raise errors.AssertionError(
                 "Manifest filenames cannot contain whitespace: %s" %
                 newfilename)
+        elif re.search(r'\x00', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain NUL characters: %s" %
+                newfilename)
         self._current_file_name = newfilename
 
     def current_file_name(self):
         return self._current_file_name
 
     def finish_current_file(self):
-        if self._current_file_name == None:
+        if self._current_file_name is None:
             if self._current_file_pos == self._current_stream_length:
                 return
             raise errors.AssertionError(
@@ -306,10 +515,12 @@ class CollectionWriter(object):
                 (self._current_stream_length - self._current_file_pos,
                  self._current_file_pos,
                  self._current_stream_name))
-        self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
+        self._current_stream_files.append([
+                self._current_file_pos,
+                self._current_stream_length - self._current_file_pos,
+                self._current_file_name])
         self._current_file_pos = self._current_stream_length
+        self._current_file_name = None
 
     def start_new_stream(self, newstreamname='.'):
         self.finish_current_stream()
@@ -327,18 +538,18 @@ class CollectionWriter(object):
     def finish_current_stream(self):
         self.finish_current_file()
         self.flush_data()
-        if len(self._current_stream_files) == 0:
+        if not self._current_stream_files:
             pass
-        elif self._current_stream_name == None:
+        elif self._current_stream_name is None:
             raise errors.AssertionError(
                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
                 (self._current_stream_length, len(self._current_stream_files)))
         else:
-            if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
-            self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
+            if not self._current_stream_locators:
+                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+            self._finished_streams.append([self._current_stream_name,
+                                           self._current_stream_locators,
+                                           self._current_stream_files])
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -347,7 +558,20 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
+        """Store the manifest in Keep and return its locator.
+
+        This is useful for storing manifest fragments (task outputs)
+        temporarily in Keep during a Crunch job.
+
+        In other cases you should make a collection instead, by
+        sending manifest_text() to the API server's "create
+        collection" endpoint.
+        """
+        return self._my_keep().put(self.manifest_text(), copies=self.replication)
+
+    def portable_data_hash(self):
+        stripped = self.stripped_manifest()
+        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -361,10 +585,7 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-
-        return CollectionReader(manifest).manifest_text()
+        return manifest
 
     def data_locators(self):
         ret = []
@@ -380,13 +601,19 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self):
+    def __init__(self, api_client=None, **kwargs):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__()
+        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
 
     @classmethod
-    def from_state(cls, state):
-        writer = cls()
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
             attr_class = getattr(writer, attr_name).__class__
@@ -396,6 +623,10 @@ class ResumableCollectionWriter(CollectionWriter):
                 attr_value = attr_class(attr_value)
             setattr(writer, attr_name, attr_value)
         # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
         writer.check_dependencies()
         if state['_current_file'] is not None:
             path, pos = state['_current_file']
@@ -405,7 +636,6 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer._do_queued_work()
         return writer
 
     def check_dependencies(self):
@@ -439,15 +669,22 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError("{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
-        except OSError as error:
-            raise errors.AssertionError(
-                "could not stat {}: {}".format(source, error))
+        except OSError as stat_error:
+            path_stat = None
         super(ResumableCollectionWriter, self)._queue_file(source, filename)
         fd_stat = os.fstat(self._queued_file.fileno())
-        if path_stat.st_ino != fd_stat.st_ino:
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
                 "{} changed between open and stat calls".format(source))
-        self._dependencies[src_path] = tuple(fd_stat)
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
 
     def write(self, data):
         if self._queued_file is None: