3198: New Collection/ArvadosFile design work in progress
[arvados.git] / sdk / python / arvados / collection.py
index fa782a1ac2e9beb74c0733d8d3d5439272159276..077115ea8bcf7dba14d15cca98d4001bd1200d59 100644
@@ -1,3 +1,4 @@
+import functools
 import logging
 import os
 import re
@@ -5,55 +6,15 @@ import re
 from collections import deque
 from stat import *
 
-from .arvfile import ArvadosFileBase
+from .arvfile import ArvadosFileBase, split, ArvadosFile
 from keep import *
-from .stream import StreamReader, split
+from .stream import StreamReader, normalize_stream
 import config
 import errors
 import util
 
 _logger = logging.getLogger('arvados.collection')
 
-def normalize_stream(s, stream):
-    stream_tokens = [s]
-    sortedfiles = list(stream.keys())
-    sortedfiles.sort()
-
-    blocks = {}
-    streamoffset = 0L
-    for f in sortedfiles:
-        for b in stream[f]:
-            if b[arvados.LOCATOR] not in blocks:
-                stream_tokens.append(b[arvados.LOCATOR])
-                blocks[b[arvados.LOCATOR]] = streamoffset
-                streamoffset += b[arvados.BLOCKSIZE]
-
-    if len(stream_tokens) == 1:
-        stream_tokens.append(config.EMPTY_BLOCK_LOCATOR)
-
-    for f in sortedfiles:
-        current_span = None
-        fout = f.replace(' ', '\\040')
-        for segment in stream[f]:
-            segmentoffset = blocks[segment[arvados.LOCATOR]] + segment[arvados.OFFSET]
-            if current_span is None:
-                current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-            else:
-                if segmentoffset == current_span[1]:
-                    current_span[1] += segment[arvados.SEGMENTSIZE]
-                else:
-                    stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-                    current_span = [segmentoffset, segmentoffset + segment[arvados.SEGMENTSIZE]]
-
-        if current_span is not None:
-            stream_tokens.append("{0}:{1}:{2}".format(current_span[0], current_span[1] - current_span[0], fout))
-
-        if not stream[f]:
-            stream_tokens.append("0:0:{0}".format(fout))
-
-    return stream_tokens
-
-
 class CollectionBase(object):
     def __enter__(self):
         return self
@@ -124,6 +85,7 @@ class CollectionReader(CollectionBase):
         else:
             raise errors.ArgumentError(
                 "Argument to CollectionReader must be a manifest or a collection UUID")
+        self._api_response = None
         self._streams = None
 
     def _populate_from_api_server(self):
@@ -138,10 +100,10 @@ class CollectionReader(CollectionBase):
             if self._api_client is None:
                 self._api_client = arvados.api('v1')
                 self._keep_client = None  # Make a new one with the new api.
-            c = self._api_client.collections().get(
+            self._api_response = self._api_client.collections().get(
                 uuid=self._manifest_locator).execute(
                 num_retries=self.num_retries)
-            self._manifest_text = c['manifest_text']
+            self._manifest_text = self._api_response['manifest_text']
             return None
         except Exception as e:
             return e
@@ -158,8 +120,6 @@ class CollectionReader(CollectionBase):
             return e
 
     def _populate(self):
-        if self._streams is not None:
-            return
         error_via_api = None
         error_via_keep = None
         should_try_keep = ((self._manifest_text is None) and
@@ -190,9 +150,27 @@ class CollectionReader(CollectionBase):
                          for sline in self._manifest_text.split("\n")
                          if sline]
 
-    def normalize(self):
-        self._populate()
+    def _populate_first(orig_func):
+        # Decorator for methods that read actual Collection data.
+        @functools.wraps(orig_func)
+        def wrapper(self, *args, **kwargs):
+            if self._streams is None:
+                self._populate()
+            return orig_func(self, *args, **kwargs)
+        return wrapper
+
+    @_populate_first
+    def api_response(self):
+        """api_response() -> dict or None
+
+        Returns information about this Collection fetched from the API server.
+        If the Collection exists in Keep but not the API server, currently
+        returns None.  Future versions may provide a synthetic response.
+        """
+        return self._api_response
 
+    @_populate_first
+    def normalize(self):
         # Rearrange streams
         streams = {}
         for s in self.all_streams():
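
(For illustration, a minimal sketch of how the new lazy-population decorator behaves from a caller's point of view; the collection UUID is a placeholder and a configured Arvados client environment is assumed.)

    import arvados

    # No API request is made at construction time.
    reader = arvados.CollectionReader('zzzzz-4zz18-xxxxxxxxxxxxxxx')
    # The first method wrapped by _populate_first triggers _populate(),
    # which fetches and caches the collection record.
    text = reader.manifest_text()
    record = reader.api_response()  # dict from the API server, or None if the
                                    # reader was built from manifest text alone
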
@@ -213,6 +191,7 @@ class CollectionReader(CollectionBase):
             [StreamReader(stream, keep=self._my_keep()).manifest_text()
              for stream in self._streams])
 
+    @_populate_first
     def open(self, streampath, filename=None):
         """open(streampath[, filename]) -> file-like object
 
@@ -220,7 +199,6 @@ class CollectionReader(CollectionBase):
         single string or as two separate stream name and file name arguments.
         This method returns a file-like object to read that file.
         """
-        self._populate()
         if filename is None:
             streampath, filename = split(streampath)
         keep_client = self._my_keep()
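
(A usage sketch of the two calling forms open() accepts; the manifest below describes a single empty file, using the well-known empty-block locator, and a configured Arvados client environment is assumed.)

    import arvados

    manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:md5sum.txt\n"
    reader = arvados.CollectionReader(manifest_text)
    f1 = reader.open('./md5sum.txt')     # combined stream path and file name
    f2 = reader.open('.', 'md5sum.txt')  # stream name and file name separately
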
@@ -238,8 +216,8 @@ class CollectionReader(CollectionBase):
             raise ValueError("file '{}' not found in Collection stream '{}'".
                              format(filename, streampath))
 
+    @_populate_first
     def all_streams(self):
-        self._populate()
         return [StreamReader(s, self._my_keep(), num_retries=self.num_retries)
                 for s in self._streams]
 
@@ -248,6 +226,7 @@ class CollectionReader(CollectionBase):
             for f in s.all_files():
                 yield f
 
+    @_populate_first
     def manifest_text(self, strip=False, normalize=False):
         if normalize:
             cr = CollectionReader(self.manifest_text())
@@ -256,7 +235,6 @@ class CollectionReader(CollectionBase):
         elif strip:
             return self.stripped_manifest()
         else:
-            self._populate()
             return self._manifest_text
 
 
@@ -284,8 +262,6 @@ class _WriterFile(ArvadosFileBase):
 
 
 class CollectionWriter(CollectionBase):
-    KEEP_BLOCK_SIZE = 2**26
-
     def __init__(self, api_client=None, num_retries=0):
         """Instantiate a CollectionWriter.
 
@@ -351,7 +327,7 @@ class CollectionWriter(CollectionBase):
 
     def _work_file(self):
         while True:
-            buf = self._queued_file.read(self.KEEP_BLOCK_SIZE)
+            buf = self._queued_file.read(config.KEEP_BLOCK_SIZE)
             if not buf:
                 break
             self.write(buf)
@@ -423,7 +399,7 @@ class CollectionWriter(CollectionBase):
         self._data_buffer.append(newdata)
         self._data_buffer_len += len(newdata)
         self._current_stream_length += len(newdata)
-        while self._data_buffer_len >= self.KEEP_BLOCK_SIZE:
+        while self._data_buffer_len >= config.KEEP_BLOCK_SIZE:
             self.flush_data()
 
     def open(self, streampath, filename=None):
@@ -459,8 +435,8 @@ class CollectionWriter(CollectionBase):
         data_buffer = ''.join(self._data_buffer)
         if data_buffer:
             self._current_stream_locators.append(
-                self._my_keep().put(data_buffer[0:self.KEEP_BLOCK_SIZE]))
-            self._data_buffer = [data_buffer[self.KEEP_BLOCK_SIZE:]]
+                self._my_keep().put(data_buffer[0:config.KEEP_BLOCK_SIZE]))
+            self._data_buffer = [data_buffer[config.KEEP_BLOCK_SIZE:]]
             self._data_buffer_len = len(self._data_buffer[0])
 
     def start_new_file(self, newfilename=None):
@@ -472,6 +448,10 @@ class CollectionWriter(CollectionBase):
             raise errors.AssertionError(
                 "Manifest filenames cannot contain whitespace: %s" %
                 newfilename)
+        elif re.search(r'\x00', newfilename):
+            raise errors.AssertionError(
+                "Manifest filenames cannot contain NUL characters: %s" %
+                newfilename)
         self._current_file_name = newfilename
 
     def current_file_name(self):
@@ -656,3 +636,94 @@ class ResumableCollectionWriter(CollectionWriter):
             raise errors.AssertionError(
                 "resumable writer can't accept unsourced data")
         return super(ResumableCollectionWriter, self).write(data)
+
+
+class Collection(object):
+    def __init__(self):
+        self.items = {}
+
+    def find_or_create(self, path):
+        p = path.split("/")
+        if p[0] == '.':
+            del p[0]
+
+        if len(p) > 0:
+            item = self.items.get(p[0])
+            if len(p) == 1:
+                # item must be a file
+                if item is None:
+                    # create new file
+                    item = ArvadosFile(p[0], 'wb', [], [])
+                    self.items[p[0]] = item
+                return item
+            else:
+                if item is None:
+                    # create new collection
+                    item = Collection()
+                    self.items[p[0]] = item
+                del p[0]
+                return item.find_or_create("/".join(p))
+        else:
+            return self
+
+
+def import_manifest(manifest_text):
+    # Parse a manifest text into a Collection tree of ArvadosFile leaves.
+    c = Collection()
+
+    STREAM_NAME = 0
+    BLOCKS = 1
+    SEGMENTS = 2
+
+    stream_name = None
+    state = STREAM_NAME
+
+    for n in re.finditer(r'([^ \n]+)([ \n])', manifest_text):
+        tok = n.group(1)
+        sep = n.group(2)
+        if state == STREAM_NAME:
+            # starting a new stream
+            stream_name = tok.replace('\\040', ' ')
+            blocks = []
+            segments = []
+            streamoffset = 0L
+            state = BLOCKS
+            continue
+
+        if state == BLOCKS:
+            s = re.match(r'[0-9a-f]{32}\+(\d+)(\+\S+)*', tok)
+            if s:
+                blocksize = long(s.group(1))
+                blocks.append([tok, blocksize, streamoffset])
+                streamoffset += blocksize
+            else:
+                state = SEGMENTS
+
+        if state == SEGMENTS:
+            s = re.search(r'^(\d+):(\d+):(\S+)', tok)
+            if s:
+                pos = long(s.group(1))
+                size = long(s.group(2))
+                name = s.group(3).replace('\\040', ' ')
+                f = c.find_or_create("%s/%s" % (stream_name, name))
+                f.add_segment(blocks, pos, size)
+            else:
+                # error!
+                raise errors.SyntaxError("Invalid manifest format")
+
+        if sep == "\n":
+            stream_name = None
+            state = STREAM_NAME
+
+    return c
+
+def export_manifest(item, stream_name="."):
+    # Work in progress: emits a rough dump of the tree, not yet a valid manifest.
+    buf = ""
+    if isinstance(item, Collection):
+        for i, j in item.items.items():
+            buf += export_manifest(j, stream_name)
+    else:
+        buf += stream_name
+        buf += " "
+        buf += str(item.segments)
+    return buf
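
(A rough round-trip sketch of the new work-in-progress API: import_manifest builds the Collection tree, find_or_create looks the file back up, and export_manifest currently emits only a rough dump rather than a real manifest. The manifest text and file name are placeholders.)

    from arvados.collection import Collection, import_manifest, export_manifest

    manifest_text = ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n"
    c = import_manifest(manifest_text)
    f = c.find_or_create("./empty.txt")   # returns the ArvadosFile parsed above
    print(export_manifest(c))             # rough dump, not yet a real manifest
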