3644: Tweak CollectionReader manifest text regular expression to use multiline
[arvados.git] / sdk / python / arvados / collection.py
index 814bd75a1ed0d0ccaad6c362385eac406660773e..a06a17faeeb051e0edeee277535c9b8d969db9ce 100644 (file)
@@ -27,6 +27,8 @@ import config
 import errors
 import util
 
+_logger = logging.getLogger('arvados.collection')
+
 def normalize_stream(s, stream):
     stream_tokens = [s]
     sortedfiles = list(stream.keys())
@@ -41,6 +43,9 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
@@ -87,11 +92,16 @@ def normalize(collection):
 
 
 class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
-        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+    def __init__(self, manifest_locator_or_text, api_client=None):
+        self._api_client = api_client
+        self._keep_client = None
+        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        elif re.match(r'((\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
@@ -106,17 +116,30 @@ class CollectionReader(object):
         pass
 
     def _populate(self):
-        if self._streams != None:
+        if self._streams is not None:
             return
         if not self._manifest_text:
             try:
-                c = arvados.api('v1').collections().get(
+                # As in KeepClient itself, we must wait until the last possible
+                # moment to instantiate an API client, in order to avoid
+                # tripping up clients that don't have access to an API server.
+                # If we do build one, make sure our Keep client uses it.
+                # If instantiation fails, we'll fall back to the except clause,
+                # just like any other Collection lookup failure.
+                if self._api_client is None:
+                    self._api_client = arvados.api('v1')
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                if self._keep_client is None:
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                c = self._api_client.collections().get(
                     uuid=self._manifest_locator).execute()
                 self._manifest_text = c['manifest_text']
             except Exception as e:
-                logging.warning("API lookup failed for collection %s (%s: %s)" %
-                                (self._manifest_locator, type(e), str(e)))
-                self._manifest_text = Keep.get(self._manifest_locator)
+                _logger.warning("API lookup failed for collection %s (%s: %s)",
+                                self._manifest_locator, type(e), str(e))
+                if self._keep_client is None:
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                self._manifest_text = self._keep_client.get(self._manifest_locator)
         self._streams = []
         for stream_line in self._manifest_text.split("\n"):
             if stream_line != '':
@@ -143,14 +166,20 @@ class CollectionReader(object):
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
 
 class CollectionWriter(object):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self):
+    def __init__(self, api_client=None):
+        self._api_client = api_client
+        self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._current_stream_files = []
@@ -171,7 +200,11 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
-    def _do_queued_work(self):
+    def _prep_keep_client(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client)
+
+    def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
         #   Collection.
@@ -194,11 +227,6 @@ class CollectionWriter(object):
                 self._work_trees()
             else:
                 break
-            self.checkpoint_state()
-
-    def checkpoint_state(self):
-        # Subclasses can implement this method to, e.g., report or record state.
-        pass
 
     def _work_file(self):
         while True:
@@ -233,7 +261,11 @@ class CollectionWriter(object):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
         make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                         else os.listdir)
-        self._queue_dirents(stream_name, make_dirents(path))
+        d = make_dirents(path)
+        if len(d) > 0:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
 
     def _queue_file(self, source, filename=None):
         assert (self._queued_file is None), "tried to queue more than one file"
@@ -256,12 +288,12 @@ class CollectionWriter(object):
 
     def write_file(self, source, filename=None):
         self._queue_file(source, filename)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write_directory_tree(self,
                              path, stream_name='.', max_manifest_depth=-1):
         self._queue_tree(path, stream_name, max_manifest_depth)
-        self._do_queued_work()
+        self.do_queued_work()
 
     def write(self, newdata):
         if hasattr(newdata, '__iter__'):
@@ -277,10 +309,11 @@ class CollectionWriter(object):
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+            self._prep_keep_client()
+            self._current_stream_locators.append(
+                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
-            self.checkpoint_state()
 
     def start_new_file(self, newfilename=None):
         self.finish_current_file()
@@ -307,8 +340,8 @@ class CollectionWriter(object):
                  self._current_file_pos,
                  self._current_stream_name))
         self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
+                                        self._current_stream_length - self._current_file_pos,
+                                        self._current_file_name]]
         self._current_file_pos = self._current_stream_length
 
     def start_new_stream(self, newstreamname='.'):
@@ -337,8 +370,8 @@ class CollectionWriter(object):
             if len(self._current_stream_locators) == 0:
                 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
             self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
+                                        self._current_stream_locators,
+                                        self._current_stream_files]]
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -347,7 +380,24 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        return Keep.put(self.manifest_text())
+        # Store the manifest in Keep and return its locator.
+        self._prep_keep_client()
+        return self._keep_client.put(self.manifest_text())
+
+    def stripped_manifest(self):
+        """
+        Return the manifest for the current collection with all permission
+        hints removed from the locators in the manifest.
+        """
+        raw = self.manifest_text()
+        clean = ''
+        for line in raw.split("\n"):
+            fields = line.split()
+            if len(fields) > 0:
+                locators = [ re.sub(r'\+A[a-z0-9@_-]+', '', x)
+                             for x in fields[1:-1] ]
+                clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
+        return clean
 
     def manifest_text(self):
         self.finish_current_stream()
@@ -361,10 +411,10 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-
-        return CollectionReader(manifest).manifest_text()
+        if len(manifest) > 0:
+            return CollectionReader(manifest).manifest_text()
+        else:
+            return ""
 
     def data_locators(self):
         ret = []
@@ -380,13 +430,19 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self):
+    def __init__(self, api_client=None):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__()
+        super(ResumableCollectionWriter, self).__init__(api_client)
 
     @classmethod
-    def from_state(cls, state):
-        writer = cls()
+    def from_state(cls, state, *init_args, **init_kwargs):
+        # Try to build a new writer from scratch with the given state.
+        # If the state is not suitable to resume (because files have changed,
+        # been deleted, aren't predictable, etc.), raise a
+        # StaleWriterStateError.  Otherwise, return the initialized writer.
+        # The caller is responsible for calling writer.do_queued_work()
+        # appropriately after it's returned.
+        writer = cls(*init_args, **init_kwargs)
         for attr_name in cls.STATE_PROPS:
             attr_value = state[attr_name]
             attr_class = getattr(writer, attr_name).__class__
@@ -396,6 +452,10 @@ class ResumableCollectionWriter(CollectionWriter):
                 attr_value = attr_class(attr_value)
             setattr(writer, attr_name, attr_value)
         # Check dependencies before we try to resume anything.
+        if any(KeepLocator(ls).permission_expired()
+               for ls in writer._current_stream_locators):
+            raise errors.StaleWriterStateError(
+                "locators include expired permission hint")
         writer.check_dependencies()
         if state['_current_file'] is not None:
             path, pos = state['_current_file']
@@ -405,7 +465,6 @@ class ResumableCollectionWriter(CollectionWriter):
             except IOError as error:
                 raise errors.StaleWriterStateError(
                     "failed to reopen active file {}: {}".format(path, error))
-        writer._do_queued_work()
         return writer
 
     def check_dependencies(self):
@@ -439,15 +498,22 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError("{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
-        except OSError as error:
-            raise errors.AssertionError(
-                "could not stat {}: {}".format(source, error))
+        except OSError as stat_error:
+            path_stat = None
         super(ResumableCollectionWriter, self)._queue_file(source, filename)
         fd_stat = os.fstat(self._queued_file.fileno())
-        if path_stat.st_ino != fd_stat.st_ino:
+        if not S_ISREG(fd_stat.st_mode):
+            # We won't be able to resume from this cache anyway, so don't
+            # worry about further checks.
+            self._dependencies[source] = tuple(fd_stat)
+        elif path_stat is None:
+            raise errors.AssertionError(
+                "could not stat {}: {}".format(source, stat_error))
+        elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
                 "{} changed between open and stat calls".format(source))
-        self._dependencies[src_path] = tuple(fd_stat)
+        else:
+            self._dependencies[src_path] = tuple(fd_stat)
 
     def write(self, data):
         if self._queued_file is None: