Fix mistake in arvados-cwl-runner upload_dependencies refactor. refs #9766
[arvados.git] / sdk / cwl / arvados_cwl / runner.py
import os
import urlparse
from functools import partial
import logging
import json
import re

import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

import arvados.collection

from .arvdocker import arv_docker_get_image
from .pathmapper import ArvPathMapper

logger = logging.getLogger('arvados.cwl-runner')

cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")

def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run):
    """Upload the dependencies of workflowobj to Keep.

    Returns a pathmapper from local paths to keep: references, and rewrites
    the File and Directory references in workflowobj in place.
    """

    loaded = set()
    def loadref(b, u):
        joined = urlparse.urljoin(b, u)
        if joined not in loaded:
            loaded.add(joined)
            return document_loader.fetch(urlparse.urljoin(b, u))
        else:
            return {}

    if loadref_run:
        loadref_fields = set(("$import", "run"))
    else:
        loadref_fields = set(("$import",))

    # Find external references in the document: imports, includes, schemas,
    # and File/Directory locations.
    sc = scandeps(uri, workflowobj,
                  loadref_fields,
                  set(("$include", "$schemas", "path", "location")),
                  loadref)

    files = []
    def visitFiles(path):
        files.append(path)

    adjustFileObjs(sc, visitFiles)
    adjustDirObjs(sc, visitFiles)

    normalizeFilesDirs(files)

    # Upload the workflow document itself as well.
    if "id" in workflowobj:
        files.append({"class": "File", "location": workflowobj["id"]})

    mapper = ArvPathMapper(arvrunner, files, "",
                           "keep:%s",
                           "keep:%s/%s",
                           name=name)

    # Rewrite references in workflowobj to their keep: targets.
    def setloc(p):
        p["location"] = mapper.mapper(p["location"]).target
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)

    return mapper


def upload_docker(arvrunner, tool):
    """Make sure the Docker image for any DockerRequirement is available."""
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        if docker_req:
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
    elif isinstance(tool, cwltool.workflow.Workflow):
        for s in tool.steps:
            upload_docker(arvrunner, s.embedded_tool)


class Runner(object):
    """Base class for objects that submit the workflow runner and handle its result."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse
        self.uuid = None

    def update_pipeline_component(self, record):
        pass

    def arvados_job_spec(self, *args, **kwargs):
        upload_docker(self.arvrunner, self.tool)

        self.name = os.path.basename(self.tool.tool["id"])

        # Upload the workflow document and anything it references.
        workflowmapper = upload_dependencies(self.arvrunner,
                                             self.name,
                                             self.tool.doc_loader,
                                             self.tool.tool,
                                             self.tool.tool["id"],
                                             True)

        # Upload local files referenced by the input (job order) object.
        jobmapper = upload_dependencies(self.arvrunner,
                                        os.path.basename(self.job_order.get("id", "#")),
                                        self.tool.doc_loader,
                                        self.job_order,
                                        self.job_order.get("id", "#"),
                                        False)

        if "id" in self.job_order:
            del self.job_order["id"]

        return workflowmapper


    def done(self, record):
        """Map the finished job record to a CWL status and output object."""
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                # Exit code 33 indicates the workflow hit an UnsupportedRequirement.
                if record["exit_code"] == 33:
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                else:
                    processStatus = "permanentFail"
            else:
                processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                # Load cwl.output.json from the output collection and rewrite
                # relative locations into keep: references.
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(fileobj):
                    path = fileobj["location"]
                    if not path.startswith("keep:"):
                        fileobj["location"] = "keep:%s/%s" % (record["output"], path)
                adjustFileObjs(outputs, keepify)
                adjustDirObjs(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.processes[record["uuid"]]
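
As a rough illustration of the rewrite that Runner.done applies to cwl.output.json, the sketch below runs the same keepify logic against a made-up output collection locator and output object. The values are invented for the example; the cwltool helpers are the same ones imported by runner.py, and no Arvados cluster is needed to run it.

from cwltool.pathmapper import adjustFileObjs, adjustDirObjs

# Invented example values: a fake output collection locator and a
# cwl.output.json-style output object.
output_locator = "99999999999999999999999999999999+99"
outputs = {
    "report": {"class": "File", "location": "report.txt"},
    "logs": {"class": "Directory", "location": "logs"},
    "ref": {"class": "File",
            "location": "keep:11111111111111111111111111111111+11/input.txt"},
}

def keepify(fileobj):
    # Same rule as Runner.done: prefix relative locations with the output
    # collection, leave existing keep: references untouched.
    path = fileobj["location"]
    if not path.startswith("keep:"):
        fileobj["location"] = "keep:%s/%s" % (output_locator, path)

adjustFileObjs(outputs, keepify)
adjustDirObjs(outputs, keepify)

print outputs["report"]["location"]  # keep:99999999999999999999999999999999+99/report.txt
print outputs["logs"]["location"]    # keep:99999999999999999999999999999999+99/logs
print outputs["ref"]["location"]     # unchanged keep: reference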