13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / python / arvados / collection.py
index e38a6bd475c7b8a4aee7787a537a40656fd93b36..627f0346db2c6760710db3edaf356f4cb724bf91 100644 (file)
@@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from past.builtins import basestring
 from builtins import object
 from builtins import str
 from past.builtins import basestring
 from builtins import object
+import ciso8601
+import datetime
+import errno
 import functools
 import functools
+import hashlib
+import io
 import logging
 import os
 import re
 import logging
 import os
 import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
 import threading
 import threading
+import time
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
@@ -35,6 +37,21 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
 
 _logger = logging.getLogger('arvados.collection')
 
+
+if sys.version_info < (3, 0):
+    class TextIOWrapper(io.TextIOWrapper):
+        """Python 2 flavour of io.TextIOWrapper.
+
+        For backward compatibility, write('foo') accepts a plain str by
+        casting it to unicode before handing it to the real wrapper.
+
+        """
+        def write(self, data):
+            # Only string-like input is converted; anything else is passed
+            # through untouched so the base class can deal with it.
+            text = unicode(data) if isinstance(data, basestring) else data
+            return super(TextIOWrapper, self).write(text)
+else:
+    # On Python 3 the standard wrapper is used as-is.
+    TextIOWrapper = io.TextIOWrapper
+
+
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
         self._callback = None
         self._items = {}
 
@@ -544,6 +562,23 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
     def stream_name(self):
         raise NotImplementedError()
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature.
+
+        Returns True as soon as this object's own flag is set; otherwise
+        asks every contained item in turn and returns True if any of them
+        reports remote blocks, False if none do.
+
+        """
+        if self._has_remote_blocks:
+            return True
+        # any() stops at the first item that answers True, like the
+        # early-return loop it replaces.
+        return any(self[name].has_remote_blocks() for name in self)
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        """Record `val` as this object's remote-blocks flag and pass it up.
+
+        The same value is forwarded to the parent (when there is one), which
+        in turn forwards it to its own parent.
+
+        """
+        self._has_remote_blocks = val
+        owner = self.parent
+        # NOTE(review): this is a truthiness test, not an `is not None` test;
+        # if the parent type defines __len__/__bool__, an empty parent would
+        # be skipped -- confirm that is intended.
+        if not owner:
+            return
+        owner.set_has_remote_blocks(val)
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -636,7 +671,7 @@ class RichCollectionBase(CollectionBase):
 
         return self.find_or_create(path, COLLECTION)
 
 
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode="r"):
+    def open(self, path, mode="r", encoding=None):
         """Open a file-like object for access.
 
         :path:
         """Open a file-like object for access.
 
         :path:
@@ -658,6 +693,7 @@ class RichCollectionBase(CollectionBase):
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
+
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
@@ -680,7 +716,12 @@ class RichCollectionBase(CollectionBase):
         if mode[0] == 'w':
             arvfile.truncate(0)
 
         if mode[0] == 'w':
             arvfile.truncate(0)
 
-        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
@@ -832,6 +873,8 @@ class RichCollectionBase(CollectionBase):
 
         self._items[target_name] = item
         self.set_committed(False)
 
         self._items[target_name] = item
         self.set_committed(False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self.set_has_remote_blocks(True)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1023,6 +1066,24 @@ class RichCollectionBase(CollectionBase):
             else:
                 return self._manifest_text
 
             else:
                 return self._manifest_text
 
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks=None):
+        """Scan through the entire collection and ask Keep to copy remote blocks.
+
+        When accessing a remote collection, blocks will have a remote signature
+        (+R instead of +A). Collect these signatures and request Keep to copy the
+        blocks to the local cluster, returning local (+A) signatures.
+
+        This implementation only walks the contained items, handing each one
+        the cache and keeping whatever cache the item returns.
+
+        :remote_blocks:
+          Shared cache of remote to local block mappings. This is used to avoid
+          doing extra work when blocks are shared by more than one file in
+          different subdirectories. When omitted (or None), a fresh empty
+          dict is created for this call.
+
+        Returns the cache after every item has been visited.
+
+        """
+        # A `{}` default would be created once and shared by every call that
+        # omits the argument, leaking cached mappings between unrelated
+        # collections; use a None sentinel instead.
+        if remote_blocks is None:
+            remote_blocks = {}
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
+        return remote_blocks
+
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """Generate list of add/modify/delete actions.
     @synchronized
     def diff(self, end_collection, prefix=".", holding_collection=None):
         """Generate list of add/modify/delete actions.
@@ -1257,8 +1318,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1376,6 +1441,10 @@ class Collection(RichCollectionBase):
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
     def _has_collection_uuid(self):
         return self._manifest_locator is not None and re.match(arvados.util.collection_uuid_pattern, self._manifest_locator)
 
+    def _has_local_collection_uuid(self):
+        """Tell whether the manifest locator is a collection UUID on this cluster.
+
+        Returns True only when `_has_collection_uuid()` holds and the part of
+        the locator before the first '-' equals the 'uuidPrefix' entry of the
+        API client's root description; otherwise returns False.
+
+        """
+        # The method must be *called*: a bare `self._has_collection_uuid` is a
+        # bound method, which is always truthy, so the guard never fired and a
+        # missing locator fell through to `.split()` below.
+        if not self._has_collection_uuid():
+            return False
+        local_prefix = self._my_api()._rootDesc['uuidPrefix']
+        return local_prefix == self._manifest_locator.split('-')[0]
+
     def __enter__(self):
         return self
 
     def __enter__(self):
         return self
 
@@ -1505,8 +1574,14 @@ class Collection(RichCollectionBase):
             body["trash_at"] = t
 
         if not self.committed():
             body["trash_at"] = t
 
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
+            elif not self._has_local_collection_uuid():
+                raise AssertionError("Collection manifest_locator is from a remote cluster. Use save_new() to save it on the local cluster.")
 
             self._my_block_manager().commit_all()
 
 
             self._my_block_manager().commit_all()
 
@@ -1591,6 +1666,11 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
+
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
@@ -1627,6 +1707,9 @@ class Collection(RichCollectionBase):
     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
 
     _block_re = re.compile(r'[0-9a-f]{32}\+(\d+)(\+\S+)*')
     _segment_re = re.compile(r'(\d+):(\d+):(\S+)')
 
+    def _unescape_manifest_path(self, path):
+        """Return `path` with manifest octal escapes turned back into characters.
+
+        An escape is a backslash followed by three octal digits whose first
+        digit is 0-3 (for example, backslash-040 becomes a space). Text that
+        does not match that shape is left unchanged.
+
+        """
+        def decode_octal(match):
+            # Interpret the three captured digits as a base-8 code point.
+            return chr(int(match.group(1), 8))
+
+        return re.sub('\\\\([0-3][0-7][0-7])', decode_octal, path)
+
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
     @synchronized
     def _import_manifest(self, manifest_text):
         """Import a manifest into a `Collection`.
@@ -1651,7 +1734,7 @@ class Collection(RichCollectionBase):
 
             if state == STREAM_NAME:
                 # starting a new stream
 
             if state == STREAM_NAME:
                 # starting a new stream
-                stream_name = tok.replace('\\040', ' ')
+                stream_name = self._unescape_manifest_path(tok)
                 blocks = []
                 segments = []
                 streamoffset = 0
                 blocks = []
                 segments = []
                 streamoffset = 0
@@ -1673,13 +1756,18 @@ class Collection(RichCollectionBase):
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
                 if file_segment:
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
-                    name = file_segment.group(3).replace('\\040', ' ')
-                    filepath = os.path.join(stream_name, name)
-                    afile = self.find_or_create(filepath, FILE)
-                    if isinstance(afile, ArvadosFile):
-                        afile.add_segment(blocks, pos, size)
+                    name = self._unescape_manifest_path(file_segment.group(3))
+                    if name.split('/')[-1] == '.':
+                        # placeholder for persisting an empty directory, not a real file
+                        if len(name) > 2:
+                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
                     else:
-                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+                        filepath = os.path.join(stream_name, name)
+                        afile = self.find_or_create(filepath, FILE)
+                        if isinstance(afile, ArvadosFile):
+                            afile.add_segment(blocks, pos, size)
+                        else:
+                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)