Merge branch '3187-pipeline-instance-page' into 3605-improved-dashboard
[arvados.git] / sdk / python / arvados / collection.py
index 8fdfcf86c3e8f4bf5e136802ce1329592a53d10b..496136ebe3c92116cc5ddf0340267b33969b5df0 100644 (file)
@@ -92,11 +92,16 @@ def normalize(collection):
 
 
 class CollectionReader(object):
-    def __init__(self, manifest_locator_or_text):
+    def __init__(self, manifest_locator_or_text, api_client=None, keep_client=None):
+        self._api_client = api_client
+        self._keep_client = keep_client
         if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+', manifest_locator_or_text):
+        elif re.match(r'[a-z0-9]{5}-[a-z0-9]{5}-[a-z0-9]{15}$', manifest_locator_or_text):
+            self._manifest_locator = manifest_locator_or_text
+            self._manifest_text = None
+        elif re.match(r'((\S+)( +[a-f0-9]{32}(\+\d+)(\+\S+)*)+( +\d+:\d+:\S+)+$)+', manifest_locator_or_text, re.MULTILINE):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
@@ -111,17 +116,33 @@ class CollectionReader(object):
         pass
 
     def _populate(self):
-        if self._streams != None:
+        if self._streams is not None:
             return
         if not self._manifest_text:
             try:
-                c = arvados.api('v1').collections().get(
+                # As in KeepClient itself, we must wait until the last possible
+                # moment to instantiate an API client, in order to avoid
+                # tripping up clients that don't have access to an API server.
+                # If we do build one, rebuild our Keep client around it (this
+                # replaces any caller-supplied keep_client).  A failure here is
+                # handled by the except clause, like any other lookup failure.
+                if self._api_client is None:
+                    self._api_client = arvados.api('v1')
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                if self._keep_client is None:
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                c = self._api_client.collections().get(
                     uuid=self._manifest_locator).execute()
                 self._manifest_text = c['manifest_text']
             except Exception as e:
+                if not util.portable_data_hash_pattern.match(
+                      self._manifest_locator):
+                    raise
                 _logger.warning("API lookup failed for collection %s (%s: %s)",
                                 self._manifest_locator, type(e), str(e))
-                self._manifest_text = Keep.get(self._manifest_locator)
+                if self._keep_client is None:
+                    self._keep_client = KeepClient(api_client=self._api_client)
+                self._manifest_text = self._keep_client.get(self._manifest_locator)
         self._streams = []
         for stream_line in self._manifest_text.split("\n"):
             if stream_line != '':
@@ -132,7 +153,7 @@ class CollectionReader(object):
         # now regenerate the manifest text based on the normalized stream
 
         #print "normalizing", self._manifest_text
-        self._manifest_text = ''.join([StreamReader(stream).manifest_text() for stream in self._streams])
+        self._manifest_text = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text() for stream in self._streams])
         #print "result", self._manifest_text
 
 
@@ -140,7 +161,7 @@ class CollectionReader(object):
         self._populate()
         resp = []
         for s in self._streams:
-            resp.append(StreamReader(s))
+            resp.append(StreamReader(s, keep=self._keep_client))
         return resp
 
     def all_files(self):
@@ -151,7 +172,7 @@ class CollectionReader(object):
     def manifest_text(self, strip=False):
         self._populate()
         if strip:
-            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            m = ''.join([StreamReader(stream, keep=self._keep_client).manifest_text(strip=True) for stream in self._streams])
             return m
         else:
             return self._manifest_text
@@ -159,7 +180,9 @@ class CollectionReader(object):
 class CollectionWriter(object):
     KEEP_BLOCK_SIZE = 2**26
 
-    def __init__(self):
+    def __init__(self, api_client=None):
+        self._api_client = api_client
+        self._keep_client = None
         self._data_buffer = []
         self._data_buffer_len = 0
         self._current_stream_files = []
@@ -180,6 +203,10 @@ class CollectionWriter(object):
     def __exit__(self):
         self.finish()
 
+    def _prep_keep_client(self):
+        if self._keep_client is None:
+            self._keep_client = KeepClient(api_client=self._api_client)
+
     def do_queued_work(self):
         # The work queue consists of three pieces:
         # * _queued_file: The file object we're currently writing to the
@@ -285,7 +312,9 @@ class CollectionWriter(object):
     def flush_data(self):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer != '':
-            self._current_stream_locators += [Keep.put(data_buffer[0:self.KEEP_BLOCK_SIZE])]
+            self._prep_keep_client()
+            self._current_stream_locators.append(
+                self._keep_client.put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
             self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
@@ -314,8 +343,8 @@ class CollectionWriter(object):
                  self._current_file_pos,
                  self._current_stream_name))
         self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
+                                        self._current_stream_length - self._current_file_pos,
+                                        self._current_file_name]]
         self._current_file_pos = self._current_stream_length
 
     def start_new_stream(self, newstreamname='.'):
@@ -344,8 +373,8 @@ class CollectionWriter(object):
             if len(self._current_stream_locators) == 0:
                 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
             self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
+                                        self._current_stream_locators,
+                                        self._current_stream_files]]
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -354,9 +383,9 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        # Send the stripped manifest to Keep, to ensure that we use the
-        # same UUID regardless of what hints are used on the collection.
-        return Keep.put(self.stripped_manifest())
+        # Store the manifest in Keep and return its locator.
+        self._prep_keep_client()
+        return self._keep_client.put(self.manifest_text())
 
     def stripped_manifest(self):
         """
@@ -404,9 +433,9 @@ class ResumableCollectionWriter(CollectionWriter):
                    '_data_buffer', '_dependencies', '_finished_streams',
                    '_queued_dirents', '_queued_trees']
 
-    def __init__(self):
+    def __init__(self, api_client=None):
         self._dependencies = {}
-        super(ResumableCollectionWriter, self).__init__()
+        super(ResumableCollectionWriter, self).__init__(api_client)
 
     @classmethod
     def from_state(cls, state, *init_args, **init_kwargs):