14885: Reverts back to parse_datetime_as_naive
[arvados.git] / sdk / python / arvados / collection.py
index f4502b7e668113a6eed7809f5e6552c2a298f055..5b1f1a2a677fa15dad5801635ff0e6f639985584 100644 (file)
@@ -7,24 +7,26 @@ from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from past.builtins import basestring
 from builtins import object
+import ciso8601
+import datetime
+import errno
 import functools
+import hashlib
+import io
 import logging
 import os
 import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
 import threading
+import time
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
-from ._normalize_stream import normalize_stream
+from ._normalize_stream import normalize_stream, escape
 from ._ranges import Range, LocatorAndRange
 from .safeapi import ThreadSafeApiCache
 import arvados.config as config
@@ -35,6 +37,21 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
+
+if sys.version_info >= (3, 0):
+    TextIOWrapper = io.TextIOWrapper
+else:
+    class TextIOWrapper(io.TextIOWrapper):
+        """To maintain backward compatibility, cast str to unicode in
+        write('foo').
+
+        """
+        def write(self, data):
+            if isinstance(data, basestring):
+                data = unicode(data)
+            return super(TextIOWrapper, self).write(data)
+
+
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -520,6 +537,7 @@ class RichCollectionBase(CollectionBase):
     def __init__(self, parent=None):
         self.parent = parent
         self._committed = False
+        self._has_remote_blocks = False
         self._callback = None
         self._items = {}
 
@@ -544,6 +562,24 @@ class RichCollectionBase(CollectionBase):
     def stream_name(self):
         raise NotImplementedError()
 
+
+    @synchronized
+    def has_remote_blocks(self):
+        """Recursively check for a +R segment locator signature."""
+
+        if self._has_remote_blocks:
+            return True
+        for item in self:
+            if self[item].has_remote_blocks():
+                return True
+        return False
+
+    @synchronized
+    def set_has_remote_blocks(self, val):
+        self._has_remote_blocks = val
+        if self.parent:
+            self.parent.set_has_remote_blocks(val)
+
     @must_be_writable
     @synchronized
     def find_or_create(self, path, create_type):
@@ -636,7 +672,7 @@ class RichCollectionBase(CollectionBase):
 
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode="r"):
+    def open(self, path, mode="r", encoding=None):
         """Open a file-like object for access.
 
         :path:
@@ -658,6 +694,7 @@ class RichCollectionBase(CollectionBase):
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
+
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
@@ -680,7 +717,12 @@ class RichCollectionBase(CollectionBase):
         if mode[0] == 'w':
             arvfile.truncate(0)
 
-        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
@@ -832,6 +874,8 @@ class RichCollectionBase(CollectionBase):
 
         self._items[target_name] = item
         self.set_committed(False)
+        if not self._has_remote_blocks and source_obj.has_remote_blocks():
+            self.set_has_remote_blocks(True)
 
         if modified_from:
             self.notify(MOD, self, target_name, (modified_from, item))
@@ -1015,7 +1059,9 @@ class RichCollectionBase(CollectionBase):
             if stream:
                 buf.append(" ".join(normalize_stream(stream_name, stream)) + "\n")
             for dirname in [s for s in sorted_keys if isinstance(self[s], RichCollectionBase)]:
-                buf.append(self[dirname].manifest_text(stream_name=os.path.join(stream_name, dirname), strip=strip, normalize=True, only_committed=only_committed))
+                buf.append(self[dirname].manifest_text(
+                    stream_name=os.path.join(stream_name, dirname),
+                    strip=strip, normalize=True, only_committed=only_committed))
             return "".join(buf)
         else:
             if strip:
@@ -1037,18 +1083,8 @@ class RichCollectionBase(CollectionBase):
           different subdirectories.
 
         """
-        for filename in [f for f in self.keys() if isinstance(self[f], ArvadosFile)]:
-            for s in self[filename].segments():
-                if '+R' in s.locator:
-                    try:
-                        loc = remote_blocks[s.locator]
-                    except KeyError:
-                        loc = self._my_keep().refresh_signature(s.locator)
-                        remote_blocks[s.locator] = loc
-                    s.locator = loc
-                    self.set_committed(False)
-        for dirname in [d for d in self.keys() if isinstance(self[d], RichCollectionBase)]:
-            remote_blocks = self[dirname]._copy_remote_blocks(remote_blocks)
+        for item in self:
+            remote_blocks = self[item]._copy_remote_blocks(remote_blocks)
         return remote_blocks
 
     @synchronized
@@ -1285,8 +1321,12 @@ class Collection(RichCollectionBase):
                 self._manifest_locator = manifest_locator_or_text
             elif re.match(arvados.util.collection_uuid_pattern, manifest_locator_or_text):
                 self._manifest_locator = manifest_locator_or_text
+                if not self._has_local_collection_uuid():
+                    self._has_remote_blocks = True
             elif re.match(arvados.util.manifest_pattern, manifest_locator_or_text):
                 self._manifest_text = manifest_locator_or_text
+                if '+R' in self._manifest_text:
+                    self._has_remote_blocks = True
             else:
                 raise errors.ArgumentError(
                     "Argument to CollectionReader is not a manifest or a collection UUID")
@@ -1307,7 +1347,10 @@ class Collection(RichCollectionBase):
 
     def get_trash_at(self):
         if self._api_response and self._api_response["trash_at"]:
-            return ciso8601.parse_datetime(self._api_response["trash_at"])
+            try:
+                return ciso8601.parse_datetime(self._api_response["trash_at"])
+            except ValueError:
+                return None
         else:
             return None
 
@@ -1536,10 +1579,11 @@ class Collection(RichCollectionBase):
             t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
             body["trash_at"] = t
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
-
         if not self.committed():
+            if self._has_remote_blocks:
+                # Copy any remote blocks to the local cluster.
+                self._copy_remote_blocks(remote_blocks={})
+                self._has_remote_blocks = False
             if not self._has_collection_uuid():
                 raise AssertionError("Collection manifest_locator is not a collection uuid.  Use save_new() for new collections.")
             elif not self._has_local_collection_uuid():
@@ -1628,8 +1672,10 @@ class Collection(RichCollectionBase):
         if trash_at and type(trash_at) is not datetime.datetime:
             raise errors.ArgumentError("trash_at must be datetime type.")
 
-        # Copy any remote blocks to the local cluster.
-        self._copy_remote_blocks(remote_blocks={})
+        if self._has_remote_blocks:
+            # Copy any remote blocks to the local cluster.
+            self._copy_remote_blocks(remote_blocks={})
+            self._has_remote_blocks = False
 
         self._my_block_manager().commit_all()
         text = self.manifest_text(strip=False)
@@ -1717,12 +1763,17 @@ class Collection(RichCollectionBase):
                     pos = int(file_segment.group(1))
                     size = int(file_segment.group(2))
                     name = self._unescape_manifest_path(file_segment.group(3))
-                    filepath = os.path.join(stream_name, name)
-                    afile = self.find_or_create(filepath, FILE)
-                    if isinstance(afile, ArvadosFile):
-                        afile.add_segment(blocks, pos, size)
+                    if name.split('/')[-1] == '.':
+                        # placeholder for persisting an empty directory, not a real file
+                        if len(name) > 2:
+                            self.find_or_create(os.path.join(stream_name, name[:-2]), COLLECTION)
                     else:
-                        raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
+                        filepath = os.path.join(stream_name, name)
+                        afile = self.find_or_create(filepath, FILE)
+                        if isinstance(afile, ArvadosFile):
+                            afile.add_segment(blocks, pos, size)
+                        else:
+                            raise errors.SyntaxError("File %s conflicts with stream of the same name.", filepath)
                 else:
                     # error!
                     raise errors.SyntaxError("Invalid manifest format, expected file segment but did not match format: '%s'" % tok)
@@ -1788,6 +1839,16 @@ class Subcollection(RichCollectionBase):
         self.name = newname
         self.lock = self.parent.root_collection().lock
 
+    @synchronized
+    def _get_manifest_text(self, stream_name, strip, normalize, only_committed=False):
+        """Encode empty directories by using an \056-named (".") empty file"""
+        if len(self._items) == 0:
+            return "%s %s 0:0:\\056\n" % (
+                escape(stream_name), config.EMPTY_BLOCK_LOCATOR)
+        return super(Subcollection, self)._get_manifest_text(stream_name,
+                                                             strip, normalize,
+                                                             only_committed)
+
 
 class CollectionReader(Collection):
     """A read-only collection object.