13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / python / arvados / collection.py
index 6b33ea3030fa48d7f492d8f6df41abf896356e1b..627f0346db2c6760710db3edaf356f4cb724bf91 100644 (file)
@@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from past.builtins import basestring
 from builtins import object
+import ciso8601
+import datetime
+import errno
 import functools
+import hashlib
+import io
 import logging
 import os
 import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
 import threading
+import time
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
@@ -35,6 +37,21 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
+
+if sys.version_info >= (3, 0):  # Python 3: the stdlib text wrapper is used as-is.
+    TextIOWrapper = io.TextIOWrapper
+else:  # Python 2: io.TextIOWrapper.write() only accepts unicode, not native str.
+    class TextIOWrapper(io.TextIOWrapper):
+        """io.TextIOWrapper whose write() also accepts native str (Python 2
+        only), converting it with unicode() so write('foo') keeps working.
+
+        """
+        def write(self, data):
+            if isinstance(data, basestring):  # native str or unicode
+                data = unicode(data)  # NOTE(review): default codec (normally ASCII); non-ASCII bytes would raise UnicodeDecodeError
+            return super(TextIOWrapper, self).write(data)
+
+
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
@@ -544,6 +562,23 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Return True if this collection or any item below it has a +R (remote) block locator."""
+        # Fast path: flag set on this collection directly, or propagated up by a descendant's set_has_remote_blocks().
+        if self._has_remote_blocks:
+            return True
+        for item in self:  # otherwise ask each child (file or subcollection), recursing into subcollections
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
+    @synchronized
+    def set_has_remote_blocks(self, val):  # set the remote-block flag on this collection and every ancestor
+        self._has_remote_blocks = val  # NOTE(review): val=False also clears ancestors' flags even if a sibling still holds +R blocks
+        if self.parent:  # propagate upward; Collection.save()/save_new() check the top-level flag
+            self.parent.set_has_remote_blocks(val)
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -636,7 +671,7 @@ class RichCollectionBase(CollectionBase):
 
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode="r"):
+    def open(self, path, mode="r", encoding=None):
         """Open a file-like object for access.
 
         :path:
@@ -658,6 +693,7 @@ class RichCollectionBase(CollectionBase):
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
+
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
@@ -680,7 +716,12 @@ class RichCollectionBase(CollectionBase):
         if mode[0] == 'w':
             arvfile.truncate(0)
 
-        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
@@ -832,6 +873,8 @@ class RichCollectionBase(CollectionBase):
 
         self._items[target_name] = item
         self.set_committed(False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self.set_has_remote_blocks(True)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1037,18 +1080,8 @@ class RichCollectionBase(CollectionBase):
           different subdirectories.
 
         """
-        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
-            for s in self[filename].segments():
-                if '+R' in s.locator:
-                    try:
-                        loc = remote_blocks[s.locator]
-                    except KeyError:
-                        loc = self._my_keep().refresh_signature(s.locator)
-                        remote_blocks[s.locator] = loc
-                    s.locator = loc
-                    self.set_committed(False)
-        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
-            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
         return remote_blocks
 
     @synchronized
@@ -1285,8 +1318,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1536,10 +1573,11 @@ class Collection(RichCollectionBase):
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
-
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             elif not self._has_local_collection_uuid():
@@ -1628,8 +1666,10 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)