15061: Add warning about clusters that are not in tokens file.
[arvados.git] / sdk / python / arvados / arvfile.py
index aa6bdad90bea0551f7043fbdd0889f6dea2ff6a8..37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b 100644 (file)
@@ -88,9 +88,6 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._binary = 'b' in mode
-        if sys.version_info >= (3, 0) and not self._binary:
-            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
         self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
@@ -866,6 +863,9 @@ class ArvadosFile(object):
 
     """
 
+    __slots__ = ('parent', 'name', '_writers', '_committed',
+                 '_segments', 'lock', '_current_bblock', 'fuse_entry')
+
     def __init__(self, parent, name, stream=[], segments=[]):
         """
         ArvadosFile constructor.
@@ -897,6 +897,38 @@ class ArvadosFile(object):
                 return True
         return False
 
+    @synchronized
+    def has_remote_blocks(self):
+        """Returns True if any of the segment's locators has a +R signature
+
+        Every segment in self._segments is inspected in order; the scan
+        stops at the first locator containing '+R'. False is returned when
+        no locator matches (including when there are no segments).
+        """
+        return any('+R' in seg.locator for seg in self._segments)
+
+    @synchronized
+    def _copy_remote_blocks(self, remote_blocks=None):
+        """Ask Keep to copy remote blocks and point to their local copies.
+
+        This is called from the parent Collection.
+
+        :remote_blocks:
+            Shared cache of remote to local block mappings. This is used to avoid
+            doing extra work when blocks are shared by more than one file in
+            different subdirectories. When omitted (or None), a new empty
+            cache is created for this call.
+
+        Returns the cache that was used, including any entries added here.
+        """
+
+        if remote_blocks is None:
+            # A literal {} default would be built once at definition time and
+            # then shared by every call that omits the argument, so locators
+            # cached for one caller would silently be reused by unrelated ones.
+            remote_blocks = {}
+        for s in self._segments:
+            if '+R' in s.locator:
+                try:
+                    loc = remote_blocks[s.locator]
+                except KeyError:
+                    # Not cached yet: obtain the replacement locator and
+                    # remember it under the original one for later lookups.
+                    loc = self.parent._my_keep().refresh_signature(s.locator)
+                    remote_blocks[s.locator] = loc
+                s.locator = loc
+                # The segment now points at a different locator, so the
+                # parent is flagged as no longer committed.
+                self.parent.set_committed(False)
+        return remote_blocks
+
     @synchronized
     def segments(self):
         return copy.copy(self._segments)
@@ -1243,6 +1275,11 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     def stream_name(self):
         return self.arvadosfile.parent.stream_name()
 
+    def readinto(self, b):
+        """Read up to len(b) bytes into the writable buffer b.
+
+        Delegates to read() and stores whatever it returned at the start
+        of b. Returns the number of bytes stored.
+        """
+        chunk = self.read(len(b))
+        count = len(chunk)
+        b[:count] = chunk
+        return count
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def read(self, size=None, num_retries=None):
@@ -1321,3 +1358,33 @@ class ArvadosFileWriter(ArvadosFileReader):
         if not self.closed:
             self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+    """An interface to an Arvados file that's compatible with io wrappers.
+
+    The wrapped file object is kept as ``f``. Each method below forwards to
+    the method of the same name on ``f`` and returns its result unchanged.
+    The wrapper also keeps its own ``closed`` flag, which close() sets.
+    """
+
+    def __init__(self, f):
+        # The wrapper starts out open; only close() flips this flag.
+        self.closed = False
+        self.f = f
+
+    def close(self):
+        """Mark this wrapper closed, then close the wrapped file."""
+        # The flag is set before delegating, so it is True even if the
+        # wrapped close() raises.
+        self.closed = True
+        outcome = self.f.close()
+        return outcome
+
+    def flush(self):
+        """Forward to the wrapped file's flush()."""
+        wrapped = self.f
+        return wrapped.flush()
+
+    def read(self, *args, **kwargs):
+        """Forward to the wrapped file's read() with the same arguments."""
+        wrapped = self.f
+        return wrapped.read(*args, **kwargs)
+
+    def readable(self):
+        """Forward to the wrapped file's readable()."""
+        wrapped = self.f
+        return wrapped.readable()
+
+    def readinto(self, *args, **kwargs):
+        """Forward to the wrapped file's readinto() with the same arguments."""
+        wrapped = self.f
+        return wrapped.readinto(*args, **kwargs)
+
+    def seek(self, *args, **kwargs):
+        """Forward to the wrapped file's seek() with the same arguments."""
+        wrapped = self.f
+        return wrapped.seek(*args, **kwargs)
+
+    def seekable(self):
+        """Forward to the wrapped file's seekable()."""
+        wrapped = self.f
+        return wrapped.seekable()
+
+    def tell(self):
+        """Forward to the wrapped file's tell()."""
+        wrapped = self.f
+        return wrapped.tell()
+
+    def writable(self):
+        """Forward to the wrapped file's writable()."""
+        wrapped = self.f
+        return wrapped.writable()
+
+    def write(self, *args, **kwargs):
+        """Forward to the wrapped file's write() with the same arguments."""
+        wrapped = self.f
+        return wrapped.write(*args, **kwargs)