3 from functools import partial
9 from cStringIO import StringIO
11 from schema_salad.sourceline import SourceLine
13 import cwltool.draft2tool
14 from cwltool.draft2tool import CommandLineTool
15 import cwltool.workflow
16 from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
17 from cwltool.load_tool import fetch_document
18 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
19 from cwltool.utils import aslist
20 from cwltool.builder import substitute
21 from cwltool.pack import pack
23 import arvados.collection
24 import ruamel.yaml as yaml
26 from .arvdocker import arv_docker_get_image
27 from .pathmapper import ArvPathMapper
28 from ._version import __version__
# Module-level logger shared by all arvados-cwl-runner components.
logger = logging.getLogger('arvados.cwl-runner')

# Relax cwltool's accept-list pattern so any name is allowed.
# NOTE(review): presumably this loosens cwltool's input file-name
# restrictions for Arvados/Keep paths -- confirm against cwltool docs.
cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r".*")
def trim_listing(obj):
    """Remove 'listing' field from Directory objects that are keep references.

    When Directory objects represent Keep references, it is redundant and
    potentially very expensive to pass fully enumerated Directory objects
    between instances of cwl-runner (e.g. when submitting a job, or using the
    RunInSingleContainer feature), so delete the 'listing' field when it is
    safe to do so.
    """

    # A keep: Directory can be re-enumerated on demand, so its listing is
    # redundant and may be very large -- drop it.
    if obj.get("location", "").startswith("keep:") and "listing" in obj:
        del obj["listing"]
    # "_:" locations are anonymous (literal) directories; the listing is the
    # only meaningful content, so drop the placeholder location instead.
    if obj.get("location", "").startswith("_:"):
        del obj["location"]
def upload_dependencies(arvrunner, name, document_loader,
                        workflowobj, uri, loadref_run, include_primary=True):
    """Upload the dependencies of the workflowobj document to Keep.

    Returns a pathmapper object mapping local paths to keep references.  Also
    does an in-place update of references in "workflowobj".

    Use scandeps to find $import, $include, $schemas, run, File and Directory
    fields that represent external references.

    If workflowobj has an "id" field, this will reload the document to ensure
    it is scanning the raw document prior to preprocessing.
    """

    # NOTE(review): the enclosing "def loadref(b, u):" line and the "loaded"
    # set it consults are elided from this excerpt; the following statements
    # read like the body of that nested fetch-and-parse helper.
        # Resolve the reference against the document base and strip the
        # fragment so each underlying file is fetched at most once.
        joined = document_loader.fetcher.urljoin(b, u)
        defrg, _ = urlparse.urldefrag(joined)
        if defrg not in loaded:
            # Use fetch_text to get raw file (before preprocessing).
            text = document_loader.fetch_text(defrg)
            if isinstance(text, bytes):
                textIO = StringIO(text.decode('utf-8'))
                # NOTE(review): an "else:" branch appears to be elided
                # between these two assignments -- confirm.
                textIO = StringIO(text)
            return yaml.safe_load(textIO)

    # Follow "run" references only when requested (loadref_run); "$import"
    # is always followed.
    # NOTE(review): the "if loadref_run:" / "else:" lines are elided here.
        loadref_fields = set(("$import", "run"))
        loadref_fields = set(("$import",))

    if "id" in workflowobj:
        # Need raw file content (before preprocessing) to ensure
        # that external references in $include and $mixin are captured.
        scanobj = loadref("", workflowobj["id"])

    # Scan for external references (Files, Directories, schemas, includes).
    # NOTE(review): the loadref_fields argument line is elided from this
    # call in the excerpt.
    sc = scandeps(uri, scanobj,
                  set(("$include", "$schemas", "location")),
                  loadref, urljoin=document_loader.fetcher.urljoin)

    normalizeFilesDirs(sc)

    # Include the workflow document itself in the upload unless the caller
    # opted out.
    if include_primary and "id" in workflowobj:
        sc.append({"class": "File", "location": workflowobj["id"]})

    # Upload everything found to Keep; the mapper records local -> keep
    # location mappings.
    # NOTE(review): the remaining ArvPathMapper constructor arguments are
    # elided from this excerpt.
    mapper = ArvPathMapper(arvrunner, sc, "",

    # Rewrite references in workflowobj (in place) to the uploaded keep:
    # locations, skipping anonymous ("_:") and already-keep references.
    # NOTE(review): the "def setloc(p):" line is elided here.
        if "location" in p and (not p["location"].startswith("_:")) and (not p["location"].startswith("keep:")):
            p["location"] = mapper.mapper(p["location"]).resolved
    adjustFileObjs(workflowobj, setloc)
    adjustDirObjs(workflowobj, setloc)
def upload_docker(arvrunner, tool):
    """Visitor which uploads Docker images referenced in CommandLineTool objects.

    Raises UnsupportedRequirement (via SourceLine) when the tool requests
    'dockerOutputDirectory', which the jobs API cannot honor.
    """
    if isinstance(tool, CommandLineTool):
        (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
        # get_feature returns (None, None) when the tool declares no
        # DockerRequirement; guard before dereferencing it.
        if docker_req:
            if docker_req.get("dockerOutputDirectory"):
                # TODO: can be supported by containers API, but not jobs API.
                raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
                    "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
            arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid)
def packed_workflow(arvrunner, tool):
    """Create a packed workflow.

    A "packed" workflow is one where all the components have been combined
    into a single document.  Fetches the primary workflow document through
    the tool's loader and returns the packed result."""
    loader = tool.doc_loader
    workflow_id = tool.tool["id"]
    document = loader.fetch(workflow_id)
    return pack(loader, document, workflow_id, tool.metadata)
def tag_git_version(packed):
    # Annotate the packed workflow document with the git commit hash of the
    # checkout it was loaded from, under the "http://schema.org/version" key.
    # NOTE(review): 'tool' is not a parameter here and is not defined at
    # module scope in this excerpt -- as written this looks like it would
    # raise NameError; confirm where 'tool' comes from.
    if tool.tool["id"].startswith("file://"):
        # Strip the "file://" scheme prefix to get a filesystem directory.
        path = os.path.dirname(tool.tool["id"][7:])
        # NOTE(review): the "try:" opening this handler, and the "else:"
        # guarding the final assignment, appear to be elided from this
        # excerpt.
        githash = subprocess.check_output(['git', 'log', '--first-parent', '--max-count=1', '--format=%H'], stderr=subprocess.STDOUT, cwd=path).strip()
        except (OSError, subprocess.CalledProcessError):
        packed["http://schema.org/version"] = githash
def upload_job_order(arvrunner, name, tool, job_order):
    """Upload local files referenced in the input object and return updated input
    object with 'location' updated to the proper keep references.
    """

    # For each workflow input that declares secondaryFiles, expand the
    # corresponding File values in the input object so secondary files are
    # uploaded alongside the primary file.
    for t in tool.tool["inputs"]:
        def setSecondary(fileobj):
            if isinstance(fileobj, dict) and fileobj.get("class") == "File":
                if "secondaryFiles" not in fileobj:
                    # Derive each secondary file location from the primary
                    # location using the CWL secondaryFiles suffix pattern.
                    fileobj["secondaryFiles"] = [{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]]

            if isinstance(fileobj, list):
                # NOTE(review): the body (presumably recursing setSecondary
                # over each list entry) is elided from this excerpt.

        if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
            setSecondary(job_order[shortname(t["id"])])

    # Upload the files referenced by the input object itself.
    # NOTE(review): several positional arguments of this call are elided
    # from this excerpt.
    jobmapper = upload_dependencies(arvrunner,
                                    job_order.get("id", "#"),

    if "id" in job_order:
        # NOTE(review): body elided (presumably deletes job_order["id"]).

    # Need to filter this out, gets added by cwltool when providing
    # parameters on the command line.
    if "job_order" in job_order:
        del job_order["job_order"]
def upload_workflow_deps(arvrunner, tool):
    """Upload Docker images and file dependencies for every step of a workflow."""
    # Ensure that Docker images needed by this workflow are available
    tool.visit(partial(upload_docker, arvrunner))

    document_loader = tool.doc_loader

    # Upload the local file dependencies of each embedded tool, then record
    # the (in-place updated) document back into the loader's index so later
    # packing sees the keep: references.
    def upload_tool_deps(deptool):
        # NOTE(review): several arguments of this call (document_loader,
        # deptool, its uri, the loadref_run flag) are elided from this
        # excerpt.
        upload_dependencies(arvrunner,
                            "%s dependencies" % (shortname(deptool["id"])),
                            include_primary=False)
        document_loader.idx[deptool["id"]] = deptool

    tool.visit(upload_tool_deps)
def arvados_jobs_image(arvrunner, img):
    """Determine if the right arvados/jobs image version is available. If not, try to pull and upload it.

    Raises Exception (wrapping the underlying error) when the image cannot
    be found, pulled, or uploaded.
    """
    try:
        arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid)
    except Exception as e:
        # Surface a user-oriented message that names the missing image.
        raise Exception("Docker image %s is not available\n%s" % (img, e))
class Runner(object):
    """Base class for runner processes, which submit an instance of
    arvados-cwl-runner and wait for the final result."""

    def __init__(self, runner, tool, job_order, enable_reuse,
                 output_name, output_tags, submit_runner_ram=0,
                 name=None, on_error=None, submit_runner_image=None):
        # NOTE(review): several attribute assignments (e.g. self.tool,
        # self.name) appear to be elided from this excerpt; done() below
        # reads self.name.
        self.arvrunner = runner
        self.job_order = job_order
        self.enable_reuse = enable_reuse
        # Populated by done() with the output collection identifier.
        self.final_output = None
        self.output_name = output_name
        self.output_tags = output_tags
        self.on_error = on_error
        # Default to the arvados/jobs image matching this package version
        # unless the caller supplied an explicit runner image.
        self.jobs_image = submit_runner_image or "arvados/jobs:"+__version__

        if submit_runner_ram:
            self.submit_runner_ram = submit_runner_ram
            # NOTE(review): an "else:" appears to be elided before this
            # 1024 (MiB) default -- confirm.
            self.submit_runner_ram = 1024

        if self.submit_runner_ram <= 0:
            raise Exception("Value of --submit-runner-ram must be greater than zero")

    def update_pipeline_component(self, record):
        # Base-class hook; subclasses override to reflect runner state in a
        # pipeline component.
        # NOTE(review): the body (presumably "pass") is elided here.

    def done(self, record):
        """Base method for handling a completed runner."""
        # Map the recorded container/job state and exit code onto a
        # cwltool-style process status string.
        # NOTE(review): the "else:" lines between these branches, and the
        # enclosing try/except/else/finally skeleton implied by the stray
        # "except" further down, are elided from this excerpt.
        if record["state"] == "Complete":
            if record.get("exit_code") is not None:
                if record["exit_code"] == 33:
                    # Exit code 33 is the arvados-cwl-runner convention for
                    # an UnsupportedRequirement failure.
                    processStatus = "UnsupportedRequirement"
                elif record["exit_code"] == 0:
                    processStatus = "success"
                    processStatus = "permanentFail"
                processStatus = "success"
            processStatus = "permanentFail"

        if processStatus == "permanentFail":
            # On failure, fetch the runner's log collection and show the
            # tail of the error log to the user.
            logc = arvados.collection.CollectionReader(record["log"],
                                                       api_client=self.arvrunner.api,
                                                       keep_client=self.arvrunner.keep_client,
                                                       num_retries=self.arvrunner.num_retries)
            # NOTE(review): 'done' here is a module (providing logtail), not
            # this method; its import is not visible in this excerpt.
            done.logtail(logc, logger, "%s error log:" % self.arvrunner.label(self), maxlen=40)

        self.final_output = record["output"]
        # Read cwl.output.json from the output collection to recover the
        # CWL output object.
        outc = arvados.collection.CollectionReader(self.final_output,
                                                   api_client=self.arvrunner.api,
                                                   keep_client=self.arvrunner.keep_client,
                                                   num_retries=self.arvrunner.num_retries)
        if "cwl.output.json" in outc:
            with outc.open("cwl.output.json") as f:
                outputs = json.load(f)
        # Rewrite collection-relative paths in the output object into
        # keep: references rooted at the output collection.
        def keepify(fileobj):
            path = fileobj["location"]
            if not path.startswith("keep:"):
                fileobj["location"] = "keep:%s/%s" % (record["output"], path)
        adjustFileObjs(outputs, keepify)
        adjustDirObjs(outputs, keepify)
        # NOTE(review): this handler's matching "try:" is elided above.
        except Exception as e:
            logger.exception("[%s] While getting final output object: %s", self.name, e)
            self.arvrunner.output_callback({}, "permanentFail")
        self.arvrunner.output_callback(outputs, processStatus)

        # Deregister this runner from the active-process table.
        if record["uuid"] in self.arvrunner.processes:
            del self.arvrunner.processes[record["uuid"]]