9570: Adjust Directory objects, too.
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
index 639426afd3be49db355dee357155e4933bb727fb..fbe587979ca46be32806e2db62661f3faff1609f 100644 (file)
@@ -3,11 +3,14 @@ import urlparse
 from functools import partial
 import logging
 import json
+import re
 
+import cwltool.draft2tool
 from cwltool.draft2tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement
 from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
 
 import arvados.collection
 
@@ -16,6 +19,8 @@ from .pathmapper import ArvPathMapper
 
 logger = logging.getLogger('arvados.cwl-runner')
 
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
 class Runner(object):
     def __init__(self, runner, tool, job_order, enable_reuse):
         self.arvrunner = runner
@@ -41,15 +46,14 @@ class Runner(object):
     def arvados_job_spec(self, *args, **kwargs):
         self.upload_docker(self.tool)
 
-        workflowfiles = set()
-        jobfiles = set()
-        workflowfiles.add(self.tool.tool["id"])
+        workflowfiles = []
+        jobfiles = []
+        workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
 
         self.name = os.path.basename(self.tool.tool["id"])
 
         def visitFiles(files, path):
-            files.add(path)
-            return path
+            files.append(path)
 
         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
         loaded = set()
@@ -63,10 +67,12 @@ class Runner(object):
 
         sc = scandeps(uri, workflowobj,
                       set(("$import", "run")),
-                      set(("$include", "$schemas", "path")),
+                      set(("$include", "$schemas", "path", "location")),
                       loadref)
-        adjustFiles(sc, partial(visitFiles, workflowfiles))
-        adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+        adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+        adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
+        adjustDirObjs(sc, partial(visitFiles, workflowfiles))
+        adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
 
         keepprefix = kwargs.get("keepprefix", "")
         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
@@ -81,7 +87,10 @@ class Runner(object):
                                   name=os.path.basename(self.job_order.get("id", "#")),
                                   **kwargs)
 
-        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+        def setloc(p):
+            p["location"] = jobmapper.mapper(p["location"])[1]
+        adjustFileObjs(self.job_order, setloc)
+        adjustDirObjs(self.job_order, setloc)
 
         if "id" in self.job_order:
             del self.job_order["id"]
@@ -109,14 +118,14 @@ class Runner(object):
                 outc = arvados.collection.Collection(record["output"])
                 with outc.open("cwl.output.json") as f:
                     outputs = json.load(f)
-                def keepify(path):
+                def keepify(fileobj):
+                    path = fileobj["location"]
                     if not path.startswith("keep:"):
-                        return "keep:%s/%s" % (record["output"], path)
-                    else:
-                        return path
-                adjustFiles(outputs, keepify)
+                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+                adjustFileObjs(outputs, keepify)
+                adjustDirObjs(outputs, keepify)
             except Exception as e:
                 logger.error("While getting final output object: %s", e)
             self.arvrunner.output_callback(outputs, processStatus)
         finally:
-            del self.arvrunner.jobs[record["uuid"]]
+            del self.arvrunner.processes[record["uuid"]]