9275: create pipeline_instance in submit mode as well and add the runner job to it...
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.events
11 import arvados.util
12 import copy
13 import cwltool.docker
14 from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
15 from cwltool.errors import WorkflowException
16 import cwltool.main
17 import cwltool.workflow
18 import fnmatch
19 from functools import partial
20 import json
21 import logging
22 import os
23 import pkg_resources  # part of setuptools
24 import re
25 import sys
26 import threading
27 from cwltool.load_tool import fetch_document
28 from cwltool.builder import Builder
29 import urlparse
30
31 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
32 from arvados.api import OrderedJsonModel
33
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Regexes for scraping the $(task.tmpdir), $(task.outdir) and $(task.keep)
# substitution values that crunchrunner prints to the job's stderr log;
# ArvadosJob.done() uses them to reverse-map paths in collected outputs.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
40
41
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Ensure the requested Docker image is available in Keep.

    If the image is not already in Keep, pull it locally (when pull_image
    is true) and upload it into the given project with arv-keepdocker.
    Returns the dockerImageId recorded in dockerRequirement (which is
    filled in from dockerPull when absent).
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name[:tag]"; anything after the first colon is the tag.
    name_and_tag = dockerRequirement["dockerImageId"].split(":")
    image_name = name_and_tag[0]
    image_tag = name_and_tag[1] if len(name_and_tag) > 1 else None

    found = arvados.commands.keepdocker.list_images_in_arv(
        api_client, 3, image_name=image_name, image_tag=image_tag)

    if not found:
        # Not in Keep yet: fetch the image locally, then push it into the
        # target project via arv-keepdocker.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        upload_args = ["--project-uuid=" + project_uuid, image_name]
        if image_tag:
            upload_args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(upload_args[1:]))
        arvados.commands.keepdocker.main(upload_args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]
65
66
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections.

    Paths of the form "keep:<locator>/..." are resolved against Keep
    collections; everything else falls through to the plain filesystem
    behavior of StdFsAccess.
    """

    def __init__(self, basedir):
        super(CollectionFsAccess, self).__init__(basedir)
        # Cache of CollectionReader objects keyed by locator, so each
        # referenced collection is fetched at most once.
        self.collections = {}

    def get_collection(self, path):
        """Split path into (CollectionReader, path-within-collection).

        Returns (None, path) when path is not a keep: locator reference.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        # Recursively match glob segments against the collection tree,
        # returning full "keep:..." paths for every leaf match.
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            # A remaining pattern segment can only match inside a
            # subcollection, not a plain file.
            return []

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if patternsegments[0] == '.':
                # Pattern contains something like "./foo" so just shift
                # past the "./"
                ret.extend(self._match(collection, patternsegments[1:], parent))
            elif fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a keep: reference.  Previously this fell through and
            # crashed with AttributeError on collection.manifest_locator();
            # delegate to ordinary filesystem globbing instead, matching
            # the fallback behavior of open() and exists().
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
124
class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Build a crunchrunner job spec from this tool invocation and submit it.

        On success the job is registered with the runner and recorded as a
        component of the pipeline instance; on failure the output callback
        is invoked with "permanentFail".
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Build a "virtual working directory" collection containing
            # the files requested by CreateFileRequirement.
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to an existing file in Keep: copy it into the vwd.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # File literal: write the contents directly.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        # Job-reuse filters: only reuse jobs that ran a compatible
        # crunchrunner version and (unless explicitly ignored) the same
        # Docker image.
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            # Record this job as a component of the pipeline instance.
            # (Previously duplicated update_pipeline_component inline.)
            self.update_pipeline_component(response)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            if response["state"] in ("Complete", "Failed", "Cancelled"):
                # A reused job may already be finished; process it now.
                self.done(response)
        except Exception as e:
            logger.error("Got error %s" % str(e))
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Publish the latest job record as this job's pipeline component."""
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle a finished job: collect outputs and invoke the output callback."""
        try:
            self.update_pipeline_component(record)
        except Exception:
            # Best-effort: failing to update the pipeline display must not
            # prevent output collection.  (Narrowed from a bare except so
            # KeyboardInterrupt/SystemExit still propagate.)
            pass

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning because if the node fails and
                        # the job restarts on a different node these values
                        # may differ between runs, and we need to know about
                        # the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
304
305
class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.arvrunner = runner
        self.tool = tool
        self.job_order = job_order
        self.running = False
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # No-op: the runner job is itself the pipeline's only component.
        # This must exist because ArvCwlRunner.on_message() calls it when
        # the job transitions to Running; it was previously commented out,
        # which made that transition raise AttributeError.
        pass

    def upload_docker(self, tool):
        """Recursively upload the Docker images used by tool and nested steps."""
        if isinstance(tool, CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados job specification for this workflow.

        The returned dict can be used to create a job (i.e., passed as
        the +body+ argument to jobs().create()), or as a component in
        a pipeline template or pipeline instance.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            files.add(path)
            return path

        # Scan the workflow document for everything it references
        # ($import/run/$include/$schemas/path) so those files get uploaded
        # along with the workflow itself.
        document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
        def loadref(b, u):
            return document_loader.fetch(urlparse.urljoin(b, u))

        sc = scandeps(uri, workflowobj,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, partial(visitFiles, jobfiles))

        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite job order paths to their uploaded Keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
        return {
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }

    def run(self, *args, **kwargs):
        """Submit the runner job (possibly reusing an existing one)."""
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        response = self.arvrunner.api.jobs().create(
            body=job_spec,
            find_or_create=self.enable_reuse
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.jobs[self.uuid] = self

        logger.info("Submitted job %s", response["uuid"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            # A reused job may already be finished; process it immediately.
            self.done(response)

    def done(self, record):
        """Handle the finished runner job: read cwl.output.json and report it."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
                def keepify(path):
                    # Make relative output paths into keep: references.
                    # Paths that are already keep: references pass through
                    # unchanged (previously this returned None for them,
                    # clobbering the path via adjustFiles).
                    if path.startswith("keep:"):
                        return path
                    return "keep:%s/%s" % (record["output"], path)
                adjustFiles(outputs, keepify)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
424
425
class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    # CWL input types that translate cleanly to Workbench dataclasses.
    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)

    def pipeline_component_spec(self):
        """Return a component that Workbench and a-r-p-i will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
        spec = self.job.arvados_job_spec()

        # Most of the component spec matches the job spec (script,
        # script_version, etc.), but script_parameters must be rebuilt:
        # the component wants one annotated entry per CWL input, with
        # defaults/overrides taken from the job order.  Stash the raw
        # parameters and start fresh.
        job_params = spec['script_parameters']
        spec['script_parameters'] = {}

        for inp in self.tool.tool['inputs']:
            inp = copy.deepcopy(inp)

            # Required unless 'null' is among the accepted types.
            declared_types = inp['type']
            if not isinstance(declared_types, list):
                declared_types = [declared_types]
            inp['required'] = 'null' not in declared_types

            # Choose a dataclass only when there is exactly one concrete
            # (non-null) type with a known translation; otherwise leave it
            # out and keep the CWL "type" attribute, which might help
            # downstream.
            concrete_types = set(declared_types) - set(['null'])
            if len(concrete_types) == 1:
                dataclass = self.type_to_dataclass.get(concrete_types.pop())
                if dataclass:
                    inp['dataclass'] = dataclass

            # CWL label/description become Arvados title/description.
            label = inp.pop('label', '')
            if label:
                inp['title'] = label
            description = inp.pop('description', '').rstrip('\n')
            if description:
                inp['description'] = description

            # Seed the value from the current job order, if present.
            param_id = shortname(inp.pop('id'))
            current = job_params.get(param_id)
            if isinstance(current, dict):
                # File inputs arrive as {"class": "File", "path": ...}.
                if inp.get('dataclass') == 'File' and current.get('path'):
                    inp['value'] = current['path']
            elif current is not None:
                inp['value'] = current

            spec['script_parameters'][param_id] = inp
        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
        return spec

    def save(self):
        """Create the pipeline template on the API server."""
        component = self.pipeline_component_spec()
        response = self.runner.api.pipeline_templates().create(body={
            "components": {
                self.job.name: component,
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)
516
517
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids.

    Builds a mapping from each referenced input file to a
    (local-abspath, target-pattern) pair, uploading local files to Keep
    in one batch as needed.
    """

    def __init__(self, arvrunner, referenced_files, input_basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        # Start from the runner's session-wide cache so a file already
        # uploaded by an earlier mapper is not uploaded again.
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        # Matches "keep:<portable data hash>/<path>" references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a Keep reference: map directly into the
                # collection mount pattern (strip the "keep:" prefix).
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                # Drop any fragment identifier before treating src as a file.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, input_basedir)
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    # Conformance tests run locally; keep the local path.
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    # Local file not yet in Keep: queue for batch upload.
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    # Already resolvable to a Keep location.
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # Upload all queued files in one call.  NOTE(review): this
            # presumably rewrites each ArvFile's .fn in place with its
            # final Keep location — the loop below reads st.fn afterwards;
            # confirm against arvados.commands.run.uploadfiles.
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # Record the upload in the runner-wide cache too.
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Set later (see ArvadosJob.done) to the job's $(task.keep) mount
        # path once it is known from the job log; used by reversemap().
        self.keepdir = None

    def reversemap(self, target):
        """Map a task-side path back to a keep: reference where possible."""
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
567
568
class ArvadosCommandTool(CommandLineTool):
    """CommandLineTool specialized to execute as an Arvados Crunch job."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        # Each invocation gets its own ArvadosJob bound to the shared runner.
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, **kwargs):
        # Map input files into the $(task.keep) mount used by crunchrunner.
        return ArvPathMapper(
            self.arvrunner, reffiles, kwargs["basedir"],
            "$(task.keep)/%s",
            "$(task.keep)/%s/%s",
            **kwargs)
584
585
586 class ArvCwlRunner(object):
587     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
588     complete, and report output."""
589
590     def __init__(self, api_client):
591         self.api = api_client
592         self.jobs = {}
593         self.lock = threading.Lock()
594         self.cond = threading.Condition(self.lock)
595         self.final_output = None
596         self.uploaded = {}
597         self.num_retries = 4
598
599     def arvMakeTool(self, toolpath_object, **kwargs):
600         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
601             return ArvadosCommandTool(self, toolpath_object, **kwargs)
602         else:
603             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
604
605     def output_callback(self, out, processStatus):
606         if processStatus == "success":
607             logger.info("Overall job status is %s", processStatus)
608             if self.pipeline:
609                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
610                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
611
612         else:
613             logger.warn("Overall job status is %s", processStatus)
614             if self.pipeline:
615                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
616                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
617         self.final_output = out
618
    def on_message(self, event):
        """Event-bus callback: react to state changes of jobs we submitted.

        Invoked by arvados.events.subscribe (presumably on the event
        client's own thread — hence the locking against arvExecutor).
        """
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    # First Running transition: mark it and refresh the
                    # pipeline component display.
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake arvExecutor's cond.wait() so it re-checks
                        # whether more jobs are runnable or all are done.
                        self.cond.notify()
                    finally:
                        self.cond.release()
639
640     def get_uploaded(self):
641         return self.uploaded.copy()
642
643     def add_uploaded(self, src, pair):
644         self.uploaded[src] = pair
645
    def arvExecutor(self, tool, job_order, **kwargs):
        """Top-level executor invoked by cwltool.main.

        Depending on kwargs, either saves a pipeline template
        (create_template), submits a runner job (submit), or runs the
        workflow's jobs directly from this client, waiting for events
        until all jobs finish.
        """
        self.debug = kwargs.get("debug")

        if kwargs.get("quiet"):
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        useruuid = self.api.users().current().execute()["uuid"]
        # New objects go in the requested project, defaulting to the
        # current user's home project.
        self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
        self.pipeline = None

        if kwargs.get("create_template"):
            tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if kwargs.get("submit"):
            runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))

        # Create a pipeline instance to track progress, seeded with either
        # the runner job (submit mode) or this client's own job record.
        components = {}
        if kwargs.get("submit"):
            components[os.path.basename(tool.tool["id"])] = {"job": runnerjob}
        elif "cwl_runner_job" in kwargs:
            components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

        self.pipeline = self.api.pipeline_instances().create(
            body={
                "owner_uuid": self.project_uuid,
                "name": shortname(tool.tool["id"]),
                "components": components,
                "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

        logger.info("Pipeline instance %s", self.pipeline["uuid"])

        if kwargs.get("submit") and not kwargs.get("wait"):
                # Fire-and-forget submit: hand off to the runner job and
                # return its uuid without waiting for completion.
                runnerjob.run()
                return runnerjob.uuid

        # Subscribe to job state changes; on_message drives completion.
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.debug = kwargs.get("debug")
        self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
        self.fs_access = CollectionFsAccess(kwargs["basedir"])

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = kwargs.get("enable_reuse")

        # Jobs address their work areas through crunchrunner's
        # substitution variables.
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, **kwargs)
        else:
            if kwargs.get("submit"):
                # Submit-and-wait: the only "job" to run is the runner job.
                jobiter = iter((runnerjob,))
            else:
                jobiter = tool.job(job_order,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # Nothing runnable yet: wait for a pending job to
                        # finish, or bail if nothing is pending at all.
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                while self.jobs:
                    self.cond.wait(1)

                events.close()
            except:
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
736             finally:
737                 self.cond.release()
738
739             if self.final_output is None:
740                 raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
741
742             return self.final_output
743
def versionstring():
    """Return a one-line version summary of the key packages.

    Reports the versions of arvados-cwl-runner, arvados-python-client and
    cwltool, for provenance records and debugging.
    """
    cwlrunner_dist = pkg_resources.require("arvados-cwl-runner")[0]
    arvados_dist = pkg_resources.require("arvados-python-client")[0]
    cwltool_dist = pkg_resources.require("cwltool")[0]

    return "%s %s, %s %s, %s %s" % (sys.argv[0], cwlrunner_dist.version,
                                    "arvados-python-client", arvados_dist.version,
                                    "cwltool", cwltool_dist.version)
754
def arg_parser():  # type: () -> argparse.ArgumentParser
    """Build the command line parser for the Arvados CWL runner."""
    parser = argparse.ArgumentParser(
        description='Arvados executor for Common Workflow Language')

    parser.add_argument("--conformance-test", action="store_true")
    parser.add_argument(
        "--basedir", type=str,
        help="Base directory used to resolve relative references in the input, default to directory of input object file or current directory (if inputs piped/provided on command line).")
    parser.add_argument(
        "--outdir", type=str, default=os.path.abspath('.'),
        help="Output directory, default current directory")

    parser.add_argument(
        "--eval-timeout", type=float, default=20,
        help="Time to wait for a Javascript expression to evaluate before giving an error, default 20s.")
    parser.add_argument("--version", action="store_true",
                        help="Print version and exit")

    # Logging verbosity options are mutually exclusive.
    verbosity = parser.add_mutually_exclusive_group()
    verbosity.add_argument("--verbose", action="store_true", help="Default logging")
    verbosity.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
    verbosity.add_argument("--debug", action="store_true", help="Print even more logging")

    parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")

    # Job reuse defaults to enabled; --disable-reuse turns it off.
    reuse = parser.add_mutually_exclusive_group()
    reuse.add_argument("--enable-reuse", action="store_true",
                       default=True, dest="enable_reuse",
                       help="")
    reuse.add_argument("--disable-reuse", action="store_false",
                       default=True, dest="enable_reuse",
                       help="")

    parser.add_argument("--project-uuid", type=str,
                        help="Project that will own the workflow jobs, if not provided, will go to home project.")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true", default=False,
                        help="Ignore Docker image version when deciding whether to reuse past jobs.")

    # Execution mode: submit a runner job (the default), run on the local
    # host, or only create a pipeline template.
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--submit", action="store_true", default=True, dest="submit",
                      help="Submit workflow to run on Arvados.")
    mode.add_argument("--local", action="store_false", default=True, dest="submit",
                      help="Run workflow on local host (submits jobs to Arvados).")
    mode.add_argument("--create-template", action="store_true",
                      help="Create an Arvados pipeline template.")

    # In submit mode, optionally return immediately instead of waiting
    # for the runner job to finish.
    waitgroup = parser.add_mutually_exclusive_group()
    waitgroup.add_argument("--wait", action="store_true", default=True, dest="wait",
                           help="After submitting workflow runner job, wait for completion.")
    waitgroup.add_argument("--no-wait", action="store_false", default=True, dest="wait",
                           help="Submit workflow runner job and exit.")

    parser.add_argument("workflow", type=str, nargs="?", default=None)
    parser.add_argument("job_order", nargs=argparse.REMAINDER)

    return parser
807
def main(args, stdout, stderr, api_client=None):
    """Command line entry point: parse arguments and hand off to cwltool.

    Returns the exit code from cwltool.main, or 1 if the Arvados API
    client / runner could not be created.
    """
    parser = arg_parser()
    arvargs = parser.parse_args(args)

    # --create-template with no job order still needs a (dummy) input
    # object so cwltool does not try to collect one itself.
    job_order_object = None
    if arvargs.create_template and not arvargs.job_order:
        job_order_object = ({}, "")

    try:
        if api_client is None:
            api_client = arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        logger.error(e)
        return 1

    return cwltool.main.main(args=arvargs,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             versionfunc=versionstring,
                             job_order_object=job_order_object)