Arvados-DCO-1.1-Signed-off-by: Radhika Chippada <radhika@curoverse.com>
[arvados.git] / crunch_scripts / crunchutil / subst.py
index b3526883fcf193ff719c249a37343afe45e340e7..53def97f9648872e69634b29d701944f3cf59639 100644 (file)
@@ -1,5 +1,16 @@
-import os
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
 import glob
+import os
+import re
+import stat
+
+BACKSLASH_ESCAPE_RE = re.compile(r'\\(.)')
+
+class SubstitutionError(Exception):
+    pass
 
 def search(c):
     DEFAULT = 0
@@ -28,17 +39,27 @@ def search(c):
             state = DEFAULT
         i += 1
     if depth != 0:
-        raise Exception("Substitution error, mismatched parentheses {}".format(c))
+        raise SubstitutionError("Substitution error, mismatched parentheses {}".format(c))
     return None
 
 def sub_file(v):
-    return os.path.join(os.environ['TASK_KEEPMOUNT'], v)
+    path = os.path.join(os.environ['TASK_KEEPMOUNT'], v)
+    st = os.stat(path)
+    if st and stat.S_ISREG(st.st_mode):
+        return path
+    else:
+        raise SubstitutionError("$(file {}) is not accessible or is not a regular file".format(path))
 
 def sub_dir(v):
     d = os.path.dirname(v)
     if d == '':
         d = v
-    return os.path.join(os.environ['TASK_KEEPMOUNT'], d)
+    path = os.path.join(os.environ['TASK_KEEPMOUNT'], d)
+    st = os.stat(path)
+    if st and stat.S_ISDIR(st.st_mode):
+        return path
+    else:
+        raise SubstitutionError("$(dir {}) is not accessible or is not a directory".format(path))
 
 def sub_basename(v):
     return os.path.splitext(os.path.basename(v))[0]
@@ -46,7 +67,7 @@ def sub_basename(v):
 def sub_glob(v):
     l = glob.glob(v)
     if len(l) == 0:
-        raise Exception("$(glob): No match on '%s'" % v)
+        raise SubstitutionError("$(glob {}) no match found".format(v))
     else:
         return l[0]
 
@@ -57,19 +78,25 @@ default_subs = {"file ": sub_file,
 
 def do_substitution(p, c, subs=default_subs):
     while True:
-        #print("c is", c)
         m = search(c)
-        if m is not None:
-            v = do_substitution(p, c[m[0]+2 : m[1]])
-            var = True
-            for sub in subs:
-                if v.startswith(sub):
-                    r = subs[sub](v[len(sub):])
-                    var = False
-                    break
-            if var:
+        if m is None:
+            return BACKSLASH_ESCAPE_RE.sub(r'\1', c)
+
+        v = do_substitution(p, c[m[0]+2 : m[1]])
+        var = True
+        for sub in subs:
+            if v.startswith(sub):
+                r = subs[sub](v[len(sub):])
+                var = False
+                break
+        if var:
+            if v in p:
                 r = p[v]
+            else:
+                raise SubstitutionError("Unknown variable or function '%s' while performing substitution on '%s'" % (v, c))
+            if r is None:
+                raise SubstitutionError("Substitution for '%s' is null while performing substitution on '%s'" % (v, c))
+            if not isinstance(r, basestring):
+                raise SubstitutionError("Substitution for '%s' must be a string while performing substitution on '%s'" % (v, c))
 
-            c = c[:m[0]] + r + c[m[1]+1:]
-        else:
-            return c
+        c = c[:m[0]] + r + c[m[1]+1:]