Merge branch 'master' into 4232-slow-pipes-n-jobs
[arvados.git] / sdk / python / arvados / collection.py
index 874c38e79e7211de89e8f32983c6476b13038eaf..7bfdf782f8d06b03d6ac482fa64872d1eb8ff9be 100644 (file)
@@ -1,28 +1,14 @@
-import gflags
-import httplib
-import httplib2
+import functools
 import logging
 import os
 import logging
 import os
-import pprint
-import sys
-import types
-import subprocess
-import json
-import UserDict
 import re
 import re
-import hashlib
-import string
-import bz2
-import zlib
-import fcntl
-import time
-import threading
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
+from .arvfile import ArvadosFileBase
 from keep import *
 from keep import *
-from stream import *
+from .stream import StreamReader, split
 import config
 import errors
 import util
 import config
 import errors
 import util
@@ -51,7 +37,7 @@ def normalize_stream(s, stream):
         fout = f.replace(' ', '\\040')
         for segment in stream[f]:
             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
         fout = f.replace(' ', '\\040')
         for segment in stream[f]:
             segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span == None:
+            if current_span is None:
                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
             else:
                 if segmentoffset == current_span[1]:
                 current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
             else:
                 if segmentoffset == current_span[1]:
@@ -60,42 +46,20 @@ def normalize_stream(s, stream):
                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 
                     stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
                     current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
 
-        if current_span != None:
+        if current_span is not None:
             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
             stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
 
-        if len(stream[f]) == 0:
+        if not stream[f]:
             stream_tokens.append("0:0:{0}".format(fout))
 
     return stream_tokens
 
             stream_tokens.append("0:0:{0}".format(fout))
 
     return stream_tokens
 
-def normalize(collection):
-    streams = {}
-    for s in collection.all_streams():
-        for f in s.all_files():
-            filestream = s.name() + "/" + f.name()
-            r = filestream.rindex("/")
-            streamname = filestream[:r]
-            filename = filestream[r+1:]
-            if streamname not in streams:
-                streams[streamname] = {}
-            if filename not in streams[streamname]:
-                streams[streamname][filename] = []
-            for r in f.segments:
-                streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
-
-    normalized_streams = []
-    sortedstreams = list(streams.keys())
-    sortedstreams.sort()
-    for s in sortedstreams:
-        normalized_streams.append(normalize_stream(s, streams[s]))
-    return normalized_streams
-
 
 class CollectionBase(object):
     def __enter__(self):
 
 class CollectionBase(object):
     def __enter__(self):
-        pass
+        return self
 
 
-    def __exit__(self):
+    def __exit__(self, exc_type, exc_value, traceback):
         pass
 
     def _my_keep(self):
         pass
 
     def _my_keep(self):
@@ -104,6 +68,25 @@ class CollectionBase(object):
                                            num_retries=self.num_retries)
         return self._keep_client
 
                                            num_retries=self.num_retries)
         return self._keep_client
 
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all
+        non-portable hints (i.e., permission signatures and other
+        hints other than size hints) removed from the locators.
+        """
+        raw = self.manifest_text()
+        clean = []
+        for line in raw.split("\n"):
+            fields = line.split()
+            if fields:
+                clean_fields = fields[:1] + [
+                    (re.sub(r'\+[^\d][^\+]*', '', x)
+                     if re.match(util.keep_locator_pattern, x)
+                     else x)
+                    for x in fields[1:]]
+                clean += [' '.join(clean_fields), "\n"]
+        return ''.join(clean)
+
 
 class CollectionReader(CollectionBase):
     def __init__(self, manifest_locator_or_text, api_client=None,
 
 class CollectionReader(CollectionBase):
     def __init__(self, manifest_locator_or_text, api_client=None,
@@ -142,38 +125,59 @@ class CollectionReader(CollectionBase):
         else:
             raise errors.ArgumentError(
                 "Argument to CollectionReader must be a manifest or a collection UUID")
         else:
             raise errors.ArgumentError(
                 "Argument to CollectionReader must be a manifest or a collection UUID")
+        self._api_response = None
         self._streams = None
 
         self._streams = None
 
+    def _populate_from_api_server(self):
+        # As in KeepClient itself, we must wait until the last
+        # possible moment to instantiate an API client, in order to
+        # avoid tripping up clients that don't have access to an API
+        # server.  If we do build one, make sure our Keep client uses
+        # it.  If instantiation fails, we'll fall back to the except
+        # clause, just like any other Collection lookup
+        # failure. Return an exception, or None if successful.
+        try:
+            if self._api_client is None:
+                self._api_client = arvados.api('v1')
+                self._keep_client = None  # Make a new one with the new api.
+            self._api_response = self._api_client.collections().get(
+                uuid=self._manifest_locator).execute(
+                num_retries=self.num_retries)
+            self._manifest_text = self._api_response['manifest_text']
+            return None
+        except Exception as e:
+            return e
+
+    def _populate_from_keep(self):
+        # Retrieve a manifest directly from Keep. This has a chance of
+        # working if [a] the locator includes a permission signature
+        # or [b] the Keep services are operating in world-readable
+        # mode. Return an exception, or None if successful.
+        try:
+            self._manifest_text = self._my_keep().get(
+                self._manifest_locator, num_retries=self.num_retries)
+        except Exception as e:
+            return e
+
     def _populate(self):
     def _populate(self):
-        if self._streams is not None:
-            return
         error_via_api = None
         error_via_keep = None
         error_via_api = None
         error_via_keep = None
-        should_try_keep = (not self._manifest_text and
+        should_try_keep = ((self._manifest_text is None) and
                            util.keep_locator_pattern.match(
                 self._manifest_locator))
                            util.keep_locator_pattern.match(
                 self._manifest_locator))
-        if (not self._manifest_text and
+        if ((self._manifest_text is None) and
             util.signed_locator_pattern.match(self._manifest_locator)):
             util.signed_locator_pattern.match(self._manifest_locator)):
-            try:
-                self._populate_from_keep()
-            except e:
-                error_via_keep = e
-        if not self._manifest_text:
-            try:
-                self._populate_from_api_server()
-            except Exception as e:
-                if not should_try_keep:
-                    raise
-                error_via_api = e
-        if (not self._manifest_text and
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
+            error_via_api = self._populate_from_api_server()
+            if error_via_api is not None and not should_try_keep:
+                raise error_via_api
+        if ((self._manifest_text is None) and
             not error_via_keep and
             should_try_keep):
             # Looks like a keep locator, and we didn't already try keep above
             not error_via_keep and
             should_try_keep):
             # Looks like a keep locator, and we didn't already try keep above
-            try:
-                self._populate_from_keep()
-            except Exception as e:
-                error_via_keep = e
-        if not self._manifest_text:
+            error_via_keep = self._populate_from_keep()
+        if self._manifest_text is None:
             # Nothing worked!
             raise arvados.errors.NotFoundError(
                 ("Failed to retrieve collection '{}' " +
             # Nothing worked!
             raise arvados.errors.NotFoundError(
                 ("Failed to retrieve collection '{}' " +
@@ -185,17 +189,75 @@ class CollectionReader(CollectionBase):
         self._streams = [sline.split()
                          for sline in self._manifest_text.split("\n")
                          if sline]
         self._streams = [sline.split()
                          for sline in self._manifest_text.split("\n")
                          if sline]
-        self._streams = normalize(self)
 
 
-        # now regenerate the manifest text based on the normalized stream
-
-        #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
-        #print "result", self._manifest_text
+    def _populate_first(orig_func):
+        # Decorator for methods that read actual Collection data.
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if self._streams is None:
+                self._populate()
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    @_populate_first
+    def api_response(self):
+        """api_response() -> dict or None
+
+        Returns information about this Collection fetched from the API server.
+        If the Collection exists in Keep but not the API server, currently
+        returns None.  Future versions may provide a synthetic response.
+        """
+        return self._api_response
 
 
+    @_populate_first
+    def normalize(self):
+        # Rearrange streams
+        streams = {}
+        for s in self.all_streams():
+            for f in s.all_files():
+                streamname, filename = split(s.name() + "/" + f.name())
+                if streamname not in streams:
+                    streams[streamname] = {}
+                if filename not in streams[streamname]:
+                    streams[streamname][filename] = []
+                for r in f.segments:
+                    streams[streamname][filename].extend(s.locators_and_ranges(r[0], r[1]))
+
+        self._streams = [normalize_stream(s, streams[s])
+                         for s in sorted(streams)]
+
+        # Regenerate the manifest text based on the normalized streams
+        self._manifest_text = ''.join(
+            [StreamReader(stream, keep=self._my_keep()).manifest_text()
+             for stream in self._streams])
+
+    @_populate_first
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to read from the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object to read that file.
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        keep_client = self._my_keep()
+        for stream_s in self._streams:
+            stream = StreamReader(stream_s, keep_client,
+                                  num_retries=self.num_retries)
+            if stream.name() == streampath:
+                break
+        else:
+            raise ValueError("stream '{}' not found in Collection".
+                             format(streampath))
+        try:
+            return stream.files()[filename]
+        except KeyError:
+            raise ValueError("file '{}' not found in Collection stream '{}'".
+                             format(filename, streampath))
 
 
+    @_populate_first
     def all_streams(self):
     def all_streams(self):
-        self._populate()
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                 for s in self._streams]
 
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                 for s in self._streams]
 
@@ -204,19 +266,45 @@ class CollectionReader(CollectionBase):
             for f in s.all_files():
                 yield f
 
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self, strip=False):
-        self._populate()
-        if strip:
-            m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
-            return m
+    @_populate_first
+    def manifest_text(self, strip=False, normalize=False):
+        if normalize:
+            cr = CollectionReader(self.manifest_text())
+            cr.normalize()
+            return cr.manifest_text(strip=strip, normalize=False)
+        elif strip:
+            return self.stripped_manifest()
         else:
             return self._manifest_text
 
 
         else:
             return self._manifest_text
 
 
+class _WriterFile(ArvadosFileBase):
+    def __init__(self, coll_writer, name):
+        super(_WriterFile, self).__init__(name, 'wb')
+        self.dest = coll_writer
+
+    def close(self):
+        super(_WriterFile, self).close()
+        self.dest.finish_current_file()
+
+    @ArvadosFileBase._before_close
+    def write(self, data):
+        self.dest.write(data)
+
+    @ArvadosFileBase._before_close
+    def writelines(self, seq):
+        for data in seq:
+            self.write(data)
+
+    @ArvadosFileBase._before_close
+    def flush(self):
+        self.dest.flush_data()
+
+
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
 class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, num_retries=0, replication=None):
         """Instantiate a CollectionWriter.
 
         CollectionWriter lets you build a new Arvados Collection from scratch.
         """Instantiate a CollectionWriter.
 
         CollectionWriter lets you build a new Arvados Collection from scratch.
@@ -232,9 +320,13 @@ class CollectionWriter(CollectionBase):
           service requests.  Default 0.  You may change this value
           after instantiation, but note those changes may not
           propagate to related objects like the Keep client.
           service requests.  Default 0.  You may change this value
           after instantiation, but note those changes may not
           propagate to related objects like the Keep client.
+        * replication: The number of copies of each block to store.
+          If this argument is None or not supplied, replication is
+          the server-provided default if available, otherwise 2.
         """
         self._api_client = api_client
         self.num_retries = num_retries
         """
         self._api_client = api_client
         self.num_retries = num_retries
+        self.replication = (2 if replication is None else replication)
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
@@ -249,9 +341,11 @@ class CollectionWriter(CollectionBase):
         self._queued_file = None
         self._queued_dirents = deque()
         self._queued_trees = deque()
         self._queued_file = None
         self._queued_dirents = deque()
         self._queued_trees = deque()
+        self._last_open = None
 
 
-    def __exit__(self):
-        self.finish()
+    def __exit__(self, exc_type, exc_value, traceback):
+        if exc_type is None:
+            self.finish()
 
     def do_queued_work(self):
         # The work queue consists of three pieces:
 
     def do_queued_work(self):
         # The work queue consists of three pieces:
@@ -308,10 +402,9 @@ class CollectionWriter(CollectionBase):
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
 
     def _work_trees(self):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
-        make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
-                        else os.listdir)
-        d = make_dirents(path)
-        if len(d) > 0:
+        d = util.listdir_recursive(
+            path, max_depth = (None if max_manifest_depth == 0 else 0))
+        if d:
             self._queue_dirents(stream_name, d)
         else:
             self._queued_trees.popleft()
             self._queue_dirents(stream_name, d)
         else:
             self._queued_trees.popleft()
@@ -355,11 +448,42 @@ class CollectionWriter(CollectionBase):
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
 
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
             self.flush_data()
 
+    def open(self, streampath, filename=None):
+        """open(streampath[, filename]) -> file-like object
+
+        Pass in the path of a file to write to the Collection, either as a
+        single string or as two separate stream name and file name arguments.
+        This method returns a file-like object you can write to add it to the
+        Collection.
+
+        You may only have one file object from the Collection open at a time,
+        so be sure to close the object when you're done.  Using the object in
+        a with statement makes that easy::
+
+          with cwriter.open('./doc/page1.txt') as outfile:
+              outfile.write(page1_data)
+          with cwriter.open('./doc/page2.txt') as outfile:
+              outfile.write(page2_data)
+        """
+        if filename is None:
+            streampath, filename = split(streampath)
+        if self._last_open and not self._last_open.closed:
+            raise errors.AssertionError(
+                "can't open '{}' when '{}' is still open".format(
+                    filename, self._last_open.name))
+        if streampath != self.current_stream_name():
+            self.start_new_stream(streampath)
+        self.set_current_file_name(filename)
+        self._last_open = _WriterFile(self, filename)
+        return self._last_open
+
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
-                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
+                self._my_keep().put(
+                    data_buffer[0:self.KEEP_BLOCK_SIZE],
+                    copies=self.replication))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -372,13 +496,17 @@ class CollectionWriter(CollectionBase):
             raise errors.AssertionError(
                 "Manifest filenames cannot contain whitespace: %s" %
                 newfilename)
             raise errors.AssertionError(
                 "Manifest filenames cannot contain whitespace: %s" %
                 newfilename)
+        elif re.search(r'\x00', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain NUL characters: %s" %
+                newfilename)
         self._current_file_name = newfilename
 
     def current_file_name(self):
         return self._current_file_name
 
     def finish_current_file(self):
         self._current_file_name = newfilename
 
     def current_file_name(self):
         return self._current_file_name
 
     def finish_current_file(self):
-        if self._current_file_name == None:
+        if self._current_file_name is None:
             if self._current_file_pos == self._current_stream_length:
                 return
             raise errors.AssertionError(
             if self._current_file_pos == self._current_stream_length:
                 return
             raise errors.AssertionError(
@@ -430,23 +558,20 @@ class CollectionWriter(CollectionBase):
         self._current_file_name = None
 
     def finish(self):
         self._current_file_name = None
 
     def finish(self):
-        # Store the manifest in Keep and return its locator.
-        return self._my_keep().put(self.manifest_text())
+        """Store the manifest in Keep and return its locator.
 
 
-    def stripped_manifest(self):
-        """
-        Return the manifest for the current collection with all permission
-        hints removed from the locators in the manifest.
+        This is useful for storing manifest fragments (task outputs)
+        temporarily in Keep during a Crunch job.
+
+        In other cases you should make a collection instead, by
+        sending manifest_text() to the API server's "create
+        collection" endpoint.
         """
         """
-        raw = self.manifest_text()
-        clean = ''
-        for line in raw.split("\n"):
-            fields = line.split()
-            if len(fields) > 0:
-                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
-                             for x in fields[1:-1] ]
-                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
-        return clean
+        return self._my_keep().put(self.manifest_text(), copies=self.replication)
+
+    def portable_data_hash(self):
+        stripped = self.stripped_manifest()
+        return hashlib.md5(stripped).hexdigest() + '+' + str(len(stripped))
 
     def manifest_text(self):
         self.finish_current_stream()
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -460,10 +585,7 @@ class CollectionWriter(CollectionBase):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        if manifest:
-            return CollectionReader(manifest, self._api_client).manifest_text()
-        else:
-            return ""
+        return manifest
 
     def data_locators(self):
         ret = []
 
     def data_locators(self):
         ret = []
@@ -479,10 +601,9 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self, api_client=None, num_retries=0):
+    def __init__(self, api_client=None, **kwargs):
         self._dependencies = {}
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__(
-            api_client, num_retries=num_retries)
+        super(ResumableCollectionWriter, self).__init__(api_client, **kwargs)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):