Merge branch '3604-theme-and-new-user-process' closes #3604
[arvados.git] / sdk / python / arvados / collection.py
index e7b26017d613ea8d0c792da9d228bb722ba9af44..7aed681504702dc3d37477b5f1c838cb5e10f7c6 100644 (file)
@@ -27,6 +27,8 @@ import config
 import errors
 import util
 
+_logger = logging.getLogger('arvados.collection')
+
 def normalize_stream(s, stream):
     stream_tokens = [s]
     sortedfiles = list(stream.keys())
@@ -41,6 +43,9 @@ def normalize_stream(s, stream):
                 blocks[b[arvados.LOCATOR]] = streamoffset
                 streamoffset += b[arvados.BLOCKSIZE]
 
+    if len(stream_tokens) == 1:
+        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
+
     for f in sortedfiles:
         current_span = None
         fout = f.replace(' ', '\\040')
@@ -88,10 +93,10 @@ def normalize(collection):
 
 class CollectionReader(object):
     def __init__(self, manifest_locator_or_text):
-        if re.search(r'^[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
+        if re.match(r'[a-f0-9]{32}(\+\d+)?(\+\S+)*$', manifest_locator_or_text):
             self._manifest_locator = manifest_locator_or_text
             self._manifest_text = None
-        elif re.search(r'^\S+( [a-f0-9]{32,}(\+\S+)*)*( \d+:\d+:\S+)+\n', manifest_locator_or_text):
+        elif re.match(r'(\S+)( [a-f0-9]{32}(\+\d+)(\+\S+)*)+( \d+:\d+:\S+)+\n', manifest_locator_or_text):
             self._manifest_text = manifest_locator_or_text
             self._manifest_locator = None
         else:
@@ -114,8 +119,8 @@ class CollectionReader(object):
                     uuid=self._manifest_locator).execute()
                 self._manifest_text = c['manifest_text']
             except Exception as e:
-                logging.warning("API lookup failed for collection %s (%s: %s)" %
-                                (self._manifest_locator, type(e), str(e)))
+                _logger.warning("API lookup failed for collection %s (%s: %s)",
+                                self._manifest_locator, type(e), str(e))
                 self._manifest_text = Keep.get(self._manifest_locator)
         self._streams = []
         for stream_line in self._manifest_text.split("\n"):
@@ -143,9 +148,13 @@ class CollectionReader(object):
             for f in s.all_files():
                 yield f
 
-    def manifest_text(self):
+    def manifest_text(self, strip=False):
         self._populate()
-        return self._manifest_text
+        if strip:
+            m = ''.join([StreamReader(stream).manifest_text(strip=True) for stream in self._streams])
+            return m
+        else:
+            return self._manifest_text
 
 class CollectionWriter(object):
     KEEP_BLOCK_SIZE = 2**26
@@ -228,7 +237,11 @@ class CollectionWriter(object):
         path, stream_name, max_manifest_depth = self._queued_trees[0]
         make_dirents = (util.listdir_recursive if (max_manifest_depth == 0)
                         else os.listdir)
-        self._queue_dirents(stream_name, make_dirents(path))
+        d = make_dirents(path)
+        if len(d) > 0:
+            self._queue_dirents(stream_name, d)
+        else:
+            self._queued_trees.popleft()
 
     def _queue_file(self, source, filename=None):
         assert (self._queued_file is None), "tried to queue more than one file"
@@ -301,8 +314,8 @@ class CollectionWriter(object):
                  self._current_file_pos,
                  self._current_stream_name))
         self._current_stream_files += [[self._current_file_pos,
-                                       self._current_stream_length - self._current_file_pos,
-                                       self._current_file_name]]
+                                        self._current_stream_length - self._current_file_pos,
+                                        self._current_file_name]]
         self._current_file_pos = self._current_stream_length
 
     def start_new_stream(self, newstreamname='.'):
@@ -331,8 +344,8 @@ class CollectionWriter(object):
             if len(self._current_stream_locators) == 0:
                 self._current_stream_locators += [config.EMPTY_BLOCK_LOCATOR]
             self._finished_streams += [[self._current_stream_name,
-                                       self._current_stream_locators,
-                                       self._current_stream_files]]
+                                        self._current_stream_locators,
+                                        self._current_stream_files]]
         self._current_stream_files = []
         self._current_stream_length = 0
         self._current_stream_locators = []
@@ -341,9 +354,8 @@ class CollectionWriter(object):
         self._current_file_name = None
 
     def finish(self):
-        # Send the stripped manifest to Keep, to ensure that we use the
-        # same UUID regardless of what hints are used on the collection.
-        return Keep.put(self.stripped_manifest())
+        # Store the manifest in Keep and return its locator.
+        return Keep.put(self.manifest_text())
 
     def stripped_manifest(self):
         """
@@ -359,7 +371,7 @@ class CollectionWriter(object):
                              for x in fields[1:-1] ]
                 clean += fields[0] + ' ' + ' '.join(locators) + ' ' + fields[-1] + "\n"
         return clean
-        
+
     def manifest_text(self):
         self.finish_current_stream()
         manifest = ''
@@ -372,10 +384,10 @@ class CollectionWriter(object):
             manifest += ' ' + ' '.join("%d:%d:%s" % (sfile[0], sfile[1], sfile[2].replace(' ', '\\040')) for sfile in stream[2])
             manifest += "\n"
 
-        #print 'writer',manifest
-        #print 'after reader',CollectionReader(manifest).manifest_text()
-
-        return CollectionReader(manifest).manifest_text()
+        if len(manifest) > 0:
+            return CollectionReader(manifest).manifest_text()
+        else:
+            return ""
 
     def data_locators(self):
         ret = []