Merge branch '18027-unmount-fuse'
[arvados.git] / sdk / python / arvados / collection.py
index f4502b7e668113a6eed7809f5e6552c2a298f055..50cb703a56a5a0dc66a068593fc4d3ed4a855166 100644 (file)
@@ -7,24 +7,26 @@ from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from past.builtins import basestring
 from builtins import object
 from builtins import str
 from past.builtins import basestring
 from builtins import object
+import ciso8601
+import datetime
+import errno
 import functools
 import functools
+import hashlib
+import io
 import logging
 import os
 import re
 import logging
 import os
 import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
 import threading
 import threading
+import time
 
 from collections import deque
 from stat import *
 
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import arvados.config as config
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import arvados.config as config
@@ -35,6 +37,21 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
 
 _logger = logging.getLogger('arvados.collection')
 
+
+if sys.version_info >= (3, 0):
+    TextIOWrapper = io.TextIOWrapper
+else:
+    class TextIOWrapper(io.TextIOWrapper):
+        """To maintain backward compatibility, cast str to unicode in
+        write('foo').
+
+        """
+        def write(self, data):
+            if isinstance(data, basestring):
+                data = unicode(data)
+            return super(TextIOWrapper, self).write(data)
+
+
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -266,7 +283,7 @@ class CollectionWriter(CollectionBase):
             streampath, filename = split(streampath)
         if self._last_open and not self._last_open.closed:
             raise errors.AssertionError(
             streampath, filename = split(streampath)
         if self._last_open and not self._last_open.closed:
             raise errors.AssertionError(
-                "can't open '{}' when '{}' is still open".format(
+                u"can't open '{}' when '{}' is still open".format(
                     filename, self._last_open.name))
         if streampath != self.current_stream_name():
             self.start_new_stream(streampath)
                     filename, self._last_open.name))
         if streampath != self.current_stream_name():
             self.start_new_stream(streampath)
@@ -444,22 +461,22 @@ class ResumableCollectionWriter(CollectionWriter):
                 writer._queued_file.seek(pos)
             except IOError as error:
                 raise errors.StaleWriterStateError(
                 writer._queued_file.seek(pos)
             except IOError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to reopen active file {}: {}".format(path, error))
+                    u"failed to reopen active file {}: {}".format(path, error))
         return writer
 
     def check_dependencies(self):
         for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
         return writer
 
     def check_dependencies(self):
         for path, orig_stat in listitems(self._dependencies):
             if not S_ISREG(orig_stat[ST_MODE]):
-                raise errors.StaleWriterStateError("{} not file".format(path))
+                raise errors.StaleWriterStateError(u"{} not file".format(path))
             try:
                 now_stat = tuple(os.stat(path))
             except OSError as error:
                 raise errors.StaleWriterStateError(
             try:
                 now_stat = tuple(os.stat(path))
             except OSError as error:
                 raise errors.StaleWriterStateError(
-                    "failed to stat {}: {}".format(path, error))
+                    u"failed to stat {}: {}".format(path, error))
             if ((not S_ISREG(now_stat[ST_MODE])) or
                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
             if ((not S_ISREG(now_stat[ST_MODE])) or
                 (orig_stat[ST_MTIME] != now_stat[ST_MTIME]) or
                 (orig_stat[ST_SIZE] != now_stat[ST_SIZE])):
-                raise errors.StaleWriterStateError("{} changed".format(path))
+                raise errors.StaleWriterStateError(u"{} changed".format(path))
 
     def dump_state(self, copy_func=lambda x: x):
         state = {attr: copy_func(getattr(self, attr))
 
     def dump_state(self, copy_func=lambda x: x):
         state = {attr: copy_func(getattr(self, attr))
@@ -475,7 +492,7 @@ class ResumableCollectionWriter(CollectionWriter):
         try:
             src_path = os.path.realpath(source)
         except Exception:
         try:
             src_path = os.path.realpath(source)
         except Exception:
-            raise errors.AssertionError("{} not a file path".format(source))
+            raise errors.AssertionError(u"{} not a file path".format(source))
         try:
             path_stat = os.stat(src_path)
         except OSError as stat_error:
         try:
             path_stat = os.stat(src_path)
         except OSError as stat_error:
@@ -488,10 +505,10 @@ class ResumableCollectionWriter(CollectionWriter):
             self._dependencies[source] = tuple(fd_stat)
         elif path_stat is None:
             raise errors.AssertionError(
             self._dependencies[source] = tuple(fd_stat)
         elif path_stat is None:
             raise errors.AssertionError(
-                "could not stat {}: {}".format(source, stat_error))
+                u"could not stat {}: {}".format(source, stat_error))
         elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
         elif path_stat.st_ino != fd_stat.st_ino:
             raise errors.AssertionError(
-                "{} changed between open and stat calls".format(source))
+                u"{} changed between open and stat calls".format(source))
         else:
             self._dependencies[src_path] = tuple(fd_stat)
 
         else:
             self._dependencies[src_path] = tuple(fd_stat)
 
@@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
         self._callback = None
         self._items = {}
 
@@ -544,6 +562,24 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
     def stream_name(self):
         raise NotImplementedError()
 
+
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature."""
+
+        if self._has_remote_blocks:
+            return True
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        self._has_remote_blocks = val
+        if self.parent:
+            self.parent.set_has_remote_blocks(val)
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -636,7 +672,7 @@ class RichCollectionBase(CollectionBase):
 
         return self.find_or_create(path, COLLECTION)
 
 
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode="r"):
+    def open(self, path, mode="r", encoding=None):
         """Open a file-like object for access.
 
         :path:
         """Open a file-like object for access.
 
         :path:
@@ -658,6 +694,7 @@ class RichCollectionBase(CollectionBase):
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
+
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
@@ -680,7 +717,12 @@ class RichCollectionBase(CollectionBase):
         if mode[0] == 'w':
             arvfile.truncate(0)
 
         if mode[0] == 'w':
             arvfile.truncate(0)
 
-        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
@@ -832,6 +874,8 @@ class RichCollectionBase(CollectionBase):
 
         self._items[target_name] = item
         self.set_committed(False)
 
         self._items[target_name] = item
         self.set_committed(False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self.set_has_remote_blocks(True)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1015,7 +1059,9 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+                buf.append(self[dirname].manifest_text(
+                    stream_name=os.path.join(stream_name, dirname),
+                    strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
             return "".join(buf)
         else:
             if strip:
@@ -1037,18 +1083,8 @@ class RichCollectionBase(CollectionBase):
           different subdirectories.
 
         """
           different subdirectories.
 
         """
-        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
-            for s in self[filename].segments():
-                if '+R' in s.locator:
-                    try:
-                        loc = remote_blocks[s.locator]
-                    except KeyError:
-                        loc = self._my_keep().refresh_signature(s.locator)
-                        remote_blocks[s.locator] = loc
-                    s.locator = loc
-                    self.set_committed(False)
-        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
-            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
         return remote_blocks
 
     @synchronized
         return remote_blocks
 
     @synchronized
@@ -1225,6 +1261,7 @@ class Collection(RichCollectionBase):
                  apiconfig=None,
                  block_manager=None,
                  replication_desired=None,
                  apiconfig=None,
                  block_manager=None,
                  replication_desired=None,
+                 storage_classes_desired=None,
                  put_threads=None):
         """Collection constructor.
 
                  put_threads=None):
         """Collection constructor.
 
@@ -1257,12 +1294,22 @@ class Collection(RichCollectionBase):
           configuration applies. If not None, this value will also be used
           for determining the number of block copies being written.
 
           configuration applies. If not None, this value will also be used
           for determining the number of block copies being written.
 
+        :storage_classes_desired:
+          A list of storage class names where to upload the data. If None,
+          the keep client is expected to store the data into the cluster's
+          default storage class(es).
+
         """
         """
+
+        if storage_classes_desired and type(storage_classes_desired) is not list:
+            raise errors.ArgumentError("storage_classes_desired must be list type.")
+
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
         super(Collection, self).__init__(parent)
         self._api_client = api_client
         self._keep_client = keep_client
         self._block_manager = block_manager
         self.replication_desired = replication_desired
+        self._storage_classes_desired = storage_classes_desired
         self.put_threads = put_threads
 
         if apiconfig:
         self.put_threads = put_threads
 
         if apiconfig:
@@ -1285,8 +1332,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1296,6 +1347,9 @@ class Collection(RichCollectionBase):
             except (IOError, errors.SyntaxError) as e:
                 raise errors.ArgumentError("Error processing manifest text: %s", e)
 
             except (IOError, errors.SyntaxError) as e:
                 raise errors.ArgumentError("Error processing manifest text: %s", e)
 
+    def storage_classes_desired(self):
+        return self._storage_classes_desired or []
+
     def root_collection(self):
         return self
 
     def root_collection(self):
         return self
 
@@ -1307,7 +1361,10 @@ class Collection(RichCollectionBase):
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
-            return ciso8601.parse_datetime(self._api_response["trash_at"])
+            try:
+                return ciso8601.parse_datetime(self._api_response["trash_at"])
+            except ValueError:
+                return None
         else:
             return None
 
         else:
             return None
 
@@ -1367,7 +1424,7 @@ class Collection(RichCollectionBase):
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
             copies = (self.replication_desired or
                       self._my_api()._rootDesc.get('defaultCollectionReplication',
                                                    2))
-            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads)
+            self._block_manager = _BlockManager(self._my_keep(), copies=copies, put_threads=self.put_threads, num_retries=self.num_retries, storage_classes_func=self.storage_classes_desired)
         return self._block_manager
 
     def _remember_api_response(self, response):
         return self._block_manager
 
     def _remember_api_response(self, response):
@@ -1388,9 +1445,11 @@ class Collection(RichCollectionBase):
         self._manifest_text = self._api_response['manifest_text']
         self._portable_data_hash = self._api_response['portable_data_hash']
         # If not overriden via kwargs, we should try to load the
         self._manifest_text = self._api_response['manifest_text']
         self._portable_data_hash = self._api_response['portable_data_hash']
         # If not overriden via kwargs, we should try to load the
-        # replication_desired from the API server
+        # replication_desired and storage_classes_desired from the API server
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
         if self.replication_desired is None:
             self.replication_desired = self._api_response.get('replication_desired', None)
+        if self._storage_classes_desired is None:
+            self._storage_classes_desired = self._api_response.get('storage_classes_desired', None)
 
     def _populate(self):
         if self._manifest_text is None:
 
     def _populate(self):
         if self._manifest_text is None:
@@ -1523,6 +1582,8 @@ class Collection(RichCollectionBase):
 
         if storage_classes and type(storage_classes) is not list:
             raise errors.ArgumentError("storage_classes must be list type.")
 
         if storage_classes and type(storage_classes) is not list:
             raise errors.ArgumentError("storage_classes must be list type.")
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
@@ -1530,16 +1591,17 @@ class Collection(RichCollectionBase):
         body={}
         if properties:
             body["properties"] = properties
         body={}
         if properties:
             body["properties"] = properties
-        if storage_classes:
-            body["storage_classes_desired"] = storage_classes
+        if self.storage_classes_desired():
+            body["storage_classes_desired"] = self.storage_classes_desired()
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
         if trash_at:
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
-
         if not self.committed():
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             elif not self._has_local_collection_uuid():
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             elif not self._has_local_collection_uuid():
@@ -1628,8 +1690,13 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
+
+        if storage_classes:
+            self._storage_classes_desired = storage_classes
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
@@ -1646,8 +1713,8 @@ class Collection(RichCollectionBase):
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
                 body["owner_uuid"] = owner_uuid
             if properties:
                 body["properties"] = properties
-            if storage_classes:
-                body["storage_classes_desired"] = storage_classes
+            if self.storage_classes_desired():
+                body["storage_classes_desired"] = self.storage_classes_desired()
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
             if trash_at:
                 t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                 body["trash_at"] = t
@@ -1717,12 +1784,17 @@ class Collection(RichCollectionBase):
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
                     name = self._unescape_manifest_path(file_segment.group(3))
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
                     name = self._unescape_manifest_path(file_segment.group(3))
-                    filepath = os.path.join(stream_name, name)
-                    afile = self.find_or_create(filepath, FILE)
-                    if isinstance(afile, ArvadosFile):
-                        afile.add_segment(blocks, pos, size)
+                    if name.split('/')[-1] == '.':
+                        # placeholder for persisting an empty directory, not a real file
+                        if len(name) > 2:
+                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
                     else:
-                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+                        filepath = os.path.join(stream_name, name)
+                        afile = self.find_or_create(filepath, FILE)
+                        if isinstance(afile, ArvadosFile):
+                            afile.add_segment(blocks, pos, size)
+                        else:
+                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
@@ -1788,6 +1860,16 @@ class Subcollection(RichCollectionBase):
         self.name = newname
         self.lock = self.parent.root_collection().lock
 
         self.name = newname
         self.lock = self.parent.root_collection().lock
 
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Encode empty directories by using an \056-named (".") empty file"""
+        if len(self._items) == 0:
+            return "%s %s 0:0:\\056\n" % (
+                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+        return super(Subcollection, self)._get_manifest_text(stream_name,
+                                                             strip, normalize,
+                                                             only_committed)
+
 
 class CollectionReader(Collection):
     """A read-only collection object.
 
 class CollectionReader(Collection):
     """A read-only collection object.