12684: Support num_retries in PySDK client constructors
[arvados.git] / services / fuse / arvados_fuse / __init__.py
index 1ca3a0dc16cd21c2e28dfe32b6ebeffc3933b5cb..31afcda8d12267970631372014706793ef95c9f3 100644 (file)
@@ -49,31 +49,33 @@ an object that is live in the inode cache, that object is immediately updated.
 
 """
 
+from __future__ import absolute_import
+from __future__ import division
+from future.utils import viewitems
+from future.utils import native
+from future.utils import listvalues
+from future.utils import listitems
+from future import standard_library
+standard_library.install_aliases()
+from builtins import next
+from builtins import str
+from builtins import object
 import os
-import sys
 import llfuse
 import errno
 import stat
 import threading
 import arvados
-import pprint
 import arvados.events
-import re
-import apiclient
-import json
 import logging
 import time
-import _strptime
-import calendar
 import threading
 import itertools
-import ciso8601
 import collections
 import functools
 import arvados.keep
 from prometheus_client import Summary
-
-import Queue
+import queue
 
 # Default _notify_queue has a limit of 1000 items, but it really needs to be
 # unlimited to avoid deadlocks, see https://arvados.org/issues/3198#note-43 for
@@ -81,15 +83,15 @@ import Queue
 
 if hasattr(llfuse, 'capi'):
     # llfuse < 0.42
-    llfuse.capi._notify_queue = Queue.Queue()
+    llfuse.capi._notify_queue = queue.Queue()
 else:
     # llfuse >= 0.42
-    llfuse._notify_queue = Queue.Queue()
+    llfuse._notify_queue = queue.Queue()
 
 LLFUSE_VERSION_0 = llfuse.__version__.startswith('0')
 
-from fusedir import sanitize_filename, Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
-from fusefile import StringFile, FuseArvadosFile
+from .fusedir import Directory, CollectionDirectory, TmpCollectionDirectory, MagicDirectory, TagsDirectory, ProjectDirectory, SharedDirectory, CollectionDirectoryBase
+from .fusefile import StringFile, FuseArvadosFile
 
 _logger = logging.getLogger('arvados.arvados_fuse')
 
@@ -204,7 +206,7 @@ class InodeCache(object):
 
     def cap_cache(self):
         if self._total > self.cap:
-            for ent in self._entries.values():
+            for ent in listvalues(self._entries):
                 if self._total < self.cap or len(self._entries) < self.min_entries:
                     break
                 self._remove(ent, True)
@@ -261,10 +263,10 @@ class Inodes(object):
         self._entries[key] = item
 
     def __iter__(self):
-        return self._entries.iterkeys()
+        return iter(self._entries.keys())
 
     def items(self):
-        return self._entries.items()
+        return viewitems(self._entries.items())
 
     def __contains__(self, k):
         return k in self._entries
@@ -302,12 +304,12 @@ class Inodes(object):
         if entry.has_ref(False):
             # Only necessary if the kernel has previously done a lookup on this
             # inode and hasn't yet forgotten about it.
-            llfuse.invalidate_entry(entry.inode, name.encode(self.encoding))
+            llfuse.invalidate_entry(entry.inode, native(name.encode(self.encoding)))
 
     def clear(self):
         self.inode_cache.clear()
 
-        for k,v in self._entries.items():
+        for k,v in viewitems(self._entries):
             try:
                 v.finalize()
             except Exception as e:
@@ -327,6 +329,8 @@ def catch_exceptions(orig_func):
             raise
         except EnvironmentError as e:
             raise llfuse.FUSEError(e.errno)
+        except NotImplementedError:
+            raise llfuse.FUSEError(errno.ENOTSUP)
         except arvados.errors.KeepWriteError as e:
             _logger.error("Keep write error: " + str(e))
             raise llfuse.FUSEError(errno.EIO)
@@ -413,25 +417,26 @@ class Operations(llfuse.Operations):
         # initializing to continue
         self.initlock.set()
 
-    def time_samples(self):
-        metrics = self.fuse_time.collect()
+    def metric_samples(self):
+        return self.fuse_time.collect()[0].samples
 
-        # We should have one parent summary metric, and child summaries for each op
-        if len(metrics) != 1:
-            _logger.warning("arv-mount metrics: invalid number of prometheus Summary metrics")
-            return [] 
-        return metrics[0].samples
+    def metric_op_names(self):
+        ops = []
+        for cur_op in [sample.labels['op'] for sample in self.metric_samples()]:
+            if cur_op not in ops:
+                ops.append(cur_op)
+        return ops
 
-    def time_sum_samples(self):
-        return [sample for sample in self.time_samples() if sample.name == 'arvmount_fuse_operations_seconds_sum']
+    def metric_value(self, opname, metric):
+        op_value = [sample.value for sample in self.metric_samples()
+                    if sample.name == metric and sample.labels['op'] == opname]
+        return op_value[0] if len(op_value) == 1 else None
 
-    def time_sum_value(self, opname):
-        for op_sum in self.time_sum_samples():
-            if op_sum.labels['op'] == opname:
-                return op_sum.value
+    def metric_sum_func(self, opname):
+        return lambda: self.metric_value(opname, "arvmount_fuse_operations_seconds_sum")
 
-    def time_sum_value_func(self, opname):
-        return lambda: self.time_sum_value(opname)
+    def metric_count_func(self, opname):
+        return lambda: int(self.metric_value(opname, "arvmount_fuse_operations_seconds_count"))
 
     @destroy_time.time()
     @catch_exceptions
@@ -472,24 +477,13 @@ class Operations(llfuse.Operations):
 
             for item in self.inodes.inode_cache.find_by_uuid(ev["object_uuid"]):
                 item.invalidate()
-                if ev.get("object_kind") == "arvados#collection":
-                    pdh = new_attrs.get("portable_data_hash")
-                    # new_attributes.modified_at currently lacks
-                    # subsecond precision (see #6347) so use event_at
-                    # which should always be the same.
-                    stamp = ev.get("event_at")
-                    if (stamp and pdh and item.writable() and
-                        item.collection is not None and
-                        item.collection.modified() and
-                        new_attrs.get("is_trashed") is not True):
-                        item.update(to_record_version=(stamp, pdh))
 
             oldowner = old_attrs.get("owner_uuid")
             newowner = ev.get("object_owner_uuid")
             for parent in (
                     self.inodes.inode_cache.find_by_uuid(oldowner) +
                     self.inodes.inode_cache.find_by_uuid(newowner)):
-                parent.child_event(ev)
+                parent.invalidate()
 
     @getattr_time.time()
     @catch_exceptions
@@ -524,7 +518,7 @@ class Operations(llfuse.Operations):
         entry.st_size = e.size()
 
         entry.st_blksize = 512
-        entry.st_blocks = (entry.st_size/512)+1
+        entry.st_blocks = (entry.st_size // 512) + 1
         if hasattr(entry, 'st_atime_ns'):
             # llfuse >= 0.42
             entry.st_atime_ns = int(e.atime() * 1000000000)
@@ -565,7 +559,7 @@ class Operations(llfuse.Operations):
     @lookup_time.time()
     @catch_exceptions
     def lookup(self, parent_inode, name, ctx=None):
-        name = unicode(name, self.inodes.encoding)
+        name = str(name, self.inodes.encoding)
         inode = None
 
         if name == '.':
@@ -713,8 +707,7 @@ class Operations(llfuse.Operations):
 
         # update atime
         self.inodes.touch(p)
-
-        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + list(p.items()))
+        self._filehandles[fh] = DirectoryHandle(fh, p, [('.', p), ('..', parent)] + listitems(p))
         return fh
 
     @readdir_time.time()
@@ -770,6 +763,7 @@ class Operations(llfuse.Operations):
     @create_time.time()
     @catch_exceptions
     def create(self, inode_parent, name, mode, flags, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount create: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -787,6 +781,7 @@ class Operations(llfuse.Operations):
     @mkdir_time.time()
     @catch_exceptions
     def mkdir(self, inode_parent, name, mode, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount mkdir: parent_inode %i '%s' %o", inode_parent, name, mode)
 
         p = self._check_writable(inode_parent)
@@ -801,6 +796,7 @@ class Operations(llfuse.Operations):
     @unlink_time.time()
     @catch_exceptions
     def unlink(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount unlink: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.unlink(name)
@@ -808,6 +804,7 @@ class Operations(llfuse.Operations):
     @rmdir_time.time()
     @catch_exceptions
     def rmdir(self, inode_parent, name, ctx=None):
+        name = name.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rmdir: parent_inode %i '%s'", inode_parent, name)
         p = self._check_writable(inode_parent)
         p.rmdir(name)
@@ -815,6 +812,8 @@ class Operations(llfuse.Operations):
     @rename_time.time()
     @catch_exceptions
     def rename(self, inode_parent_old, name_old, inode_parent_new, name_new, ctx=None):
+        name_old = name_old.decode(encoding=self.inodes.encoding)
+        name_new = name_new.decode(encoding=self.inodes.encoding)
         _logger.debug("arv-mount rename: old_parent_inode %i '%s' new_parent_inode %i '%s'", inode_parent_old, name_old, inode_parent_new, name_new)
         src = self._check_writable(inode_parent_old)
         dest = self._check_writable(inode_parent_new)