3706: Consolidate more regular expressions into util package.
[arvados.git] / sdk / python / arvados / collection.py
index e7b26017d613ea8d0c792da9d228bb722ba9af44..874c38e79e7211de89e8f32983c6476b13038eaf 100644 (file)
@@ -27,6 +27,8 @@ import config
 import errors
 import util
 
+_logger = logging.getLogger('arvados.collection')
+
 def normalize_stream(s, stream):
     stream_tokens = [s]
     sortedfiles = list(stream.keys())
@@ -41,6 +43,9 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
@@ -86,12 +91,52 @@ def normalize(collection):
     return normalized_streams
 
 
-class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
-        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+class CollectionBase(object):
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        pass
+
+    def _my_keep(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client,
+                                           num_retries=self.num_retries)
+        return self._keep_client
+
+
+class CollectionReader(CollectionBase):
+    def __init__(self, manifest_locator_or_text, api_client=None,
+                 keep_client=None, num_retries=0):
+        """Instantiate a CollectionReader.
+
+        This class parses Collection manifests to provide a simple interface
+        to read its underlying files.
+
+        Arguments:
+        * manifest_locator_or_text: One of a Collection UUID, portable data
+          hash, or full manifest text.
+        * api_client: The API client to use to look up Collections.  If not
+          provided, CollectionReader will build one from available Arvados
+          configuration.
+        * keep_client: The KeepClient to use to download Collection data.
+          If not provided, CollectionReader will build one from available
+          Arvados configuration.
+        * num_retries: The default number of times to retry failed
+          service requests.  Default 0.  You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        """
+        self._api_client = api_client
+        self._keep_client = keep_client
+        self.num_retries = num_retries
+        if re.match(util.keep_locator_pattern, manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        elif re.match(util.collection_uuid_pattern, manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.match(util.manifest_pattern, manifest_locator_or_text):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
@@ -99,58 +144,98 @@ class CollectionReader(object):
                 "Argument to CollectionReader must be a manifest or a collection UUID")
         self._streams = None
 
-    def __enter__(self):
-        pass
-
-    def __exit__(self):
-        pass
-
     def _populate(self):
-        if self._streams != None:
+        if self._streams is not None:
             return
+        error_via_api = None
+        error_via_keep = None
+        should_try_keep = (not self._manifest_text and
+                           util.keep_locator_pattern.match(
+                self._manifest_locator))
+        if (not self._manifest_text and
+            util.signed_locator_pattern.match(self._manifest_locator)):
+            try:
+                self._populate_from_keep()
+            except Exception as e:
+                error_via_keep = e
         if not self._manifest_text:
             try:
-                c = arvados.api('v1').collections().get(
-                    uuid=self._manifest_locator).execute()
-                self._manifest_text = c['manifest_text']
+                self._populate_from_api_server()
+            except Exception as e:
+                if not should_try_keep:
+                    raise
+                error_via_api = e
+        if (not self._manifest_text and
+            not error_via_keep and
+            should_try_keep):
+            # Looks like a keep locator, and we didn't already try keep above
+            try:
+                self._populate_from_keep()
             except Exception as e:
-                logging.warning("API lookup failed for collection %s (%s: %s)" %
-                                (self._manifest_locator, type(e), str(e)))
-                self._manifest_text = Keep.get(self._manifest_locator)
-        self._streams = []
-        for stream_line in self._manifest_text.split("\n"):
-            if stream_line != '':
-                stream_tokens = stream_line.split()
-                self._streams += [stream_tokens]
+                error_via_keep = e
+        if not self._manifest_text:
+            # Nothing worked!
+            raise arvados.errors.NotFoundError(
+                ("Failed to retrieve collection '{}' " +
+                 "from either API server ({}) or Keep ({})."
+                 ).format(
+                    self._manifest_locator,
+                    error_via_api,
+                    error_via_keep))
+        self._streams = [sline.split()
+                         for sline in self._manifest_text.split("\n")
+                         if sline]
         self._streams = normalize(self)
 
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
 
     def all_streams(self):
         self._populate()
-        resp = []
-        for s in self._streams:
-            resp.append(StreamReader(s))
-        return resp
+        return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
+                for s in self._streams]
 
     def all_files(self):
         for s in self.all_streams():
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream, keep=self._my_keep()).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
+
 
-class CollectionWriter(object):
+class CollectionWriter(CollectionBase):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self):
+    def __init__(self, api_client=None, num_retries=0):
+        """Instantiate a CollectionWriter.
+
+        CollectionWriter lets you build a new Arvados Collection from scratch.
+        Write files to it.  The CollectionWriter will upload data to Keep as
+        appropriate, and provide you with the Collection manifest text when
+        you're finished.
+
+        Arguments:
+        * api_client: The API client to use to look up Collections.  If not
+          provided, CollectionWriter will build one from available Arvados
+          configuration.
+        * num_retries: The default number of times to retry failed
+          service requests.  Default 0.  You may change this value
+          after instantiation, but note those changes may not
+          propagate to related objects like the Keep client.
+        """
+        self._api_client = api_client
+        self.num_retries = num_retries
+        self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._current_stream_files = []
@@ -165,9 +250,6 @@ class CollectionWriter(object):
         self._queued_dirents = deque()
         self._queued_trees = deque()
 
-    def __enter__(self):
-        pass
-
     def __exit__(self):
         self.finish()
 
@@ -228,7 +310,11 @@ class CollectionWriter(object):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
         make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                         else os.listdir)
-        self._queue_dirents(stream_name, make_dirents(path))
+        d = make_dirents(path)
+        if len(d) > 0:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
 
     def _queue_file(self, source, filename=None):
         assert (self._queued_file is None), "tried to queue more than one file"
@@ -263,7 +349,7 @@ class CollectionWriter(object):
             for s in newdata:
                 self.write(s)
             return
-        self._data_buffer += [newdata]
+        self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
         while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
@@ -271,8 +357,9 @@ class CollectionWriter(object):
 
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
-        if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+        if data_buffer:
+            self._current_stream_locators.append(
+                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -300,10 +387,12 @@ class CollectionWriter(object):
                 (self._current_stream_length - self._current_file_pos,
                  self._current_file_pos,
                  self._current_stream_name))
-        self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
+        self._current_stream_files.append([
+                self._current_file_pos,
+                self._current_stream_length - self._current_file_pos,
+                self._current_file_name])
         self._current_file_pos = self._current_stream_length
+        self._current_file_name = None
 
     def start_new_stream(self, newstreamname='.'):
         self.finish_current_stream()
@@ -321,18 +410,18 @@ class CollectionWriter(object):
     def finish_current_stream(self):
         self.finish_current_file()
         self.flush_data()
-        if len(self._current_stream_files) == 0:
+        if not self._current_stream_files:
             pass
-        elif self._current_stream_name == None:
+        elif self._current_stream_name is None:
             raise errors.AssertionError(
                 "Cannot finish an unnamed stream (%d bytes in %d files)" %
                 (self._current_stream_length, len(self._current_stream_files)))
         else:
-            if len(self._current_stream_locators) == 0:
-                self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
-            self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
+            if not self._current_stream_locators:
+                self._current_stream_locators.append(config.EMPTY_BLOCK_LOCATOR)
+            self._finished_streams.append([self._current_stream_name,
+                                           self._current_stream_locators,
+                                           self._current_stream_files])
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -341,9 +430,8 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        # Send the stripped manifest to Keep, to ensure that we use the
-        # same UUID regardless of what hints are used on the collection.
-        return Keep.put(self.stripped_manifest())
+        # Store the manifest in Keep and return its locator.
+        return self._my_keep().put(self.manifest_text())
 
     def stripped_manifest(self):
         """
@@ -359,7 +447,7 @@ class CollectionWriter(object):
                              for x in fields[1:-1] ]
                 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
         return clean
-        
+
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -372,10 +460,10 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-
-        return CollectionReader(manifest).manifest_text()
+        if manifest:
+            return CollectionReader(manifest, self._api_client).manifest_text()
+        else:
+            return ""
 
     def data_locators(self):
         ret = []
@@ -391,9 +479,10 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self):
+    def __init__(self, api_client=None, num_retries=0):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__()
+        super(ResumableCollectionWriter, self).__init__(
+            api_client, num_retries=num_retries)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):