Merge branch '8653-cwl-to-template'
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.events
11 import arvados.util
12 import copy
13 import cwltool.docker
14 import cwltool.draft2tool
15 from cwltool.errors import WorkflowException
16 import cwltool.main
17 from cwltool.process import shortname
18 import cwltool.workflow
19 import fnmatch
20 import functools
21 import json
22 import logging
23 import os
24 import pkg_resources  # part of setuptools
25 import re
26 import sys
27 import threading
28
29 from cwltool.process import get_feature, adjustFiles, scandeps
30 from arvados.api import OrderedJsonModel
31
# Module-level logger for the CWL runner.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

# Regexes that extract the $(task.tmpdir), $(task.outdir) and $(task.keep)
# paths from crunchrunner stderr log lines; used when collecting job
# outputs to reconstruct the job's runtime directory layout.
tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
38
39
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
    """Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker.

    Returns the dockerImageId from the (possibly updated) requirement.
    """
    # Fall back to dockerPull when no explicit image id was given.
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # Split "name[:tag]" into its parts.
    parts = dockerRequirement["dockerImageId"].split(":")
    image_name = parts[0]
    image_tag = parts[1] if len(parts) > 1 else None

    existing = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                              image_name=image_name,
                                                              image_tag=image_tag)

    if not existing:
        # Not in Keep yet: pull it locally (if permitted) and push it up
        # into the target project with arv-keepdocker.
        imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
        upload_args = ["--project-uuid=" + project_uuid, image_name]
        if image_tag:
            upload_args.append(image_tag)
        logger.info("Uploading Docker image %s", ":".join(upload_args[1:]))
        arvados.commands.keepdocker.main(upload_args, stdout=sys.stderr)

    return dockerRequirement["dockerImageId"]
63
64
class CollectionFsAccess(cwltool.process.StdFsAccess):
    """Implement the cwltool FsAccess interface for Arvados Collections."""

    def __init__(self, basedir):
        # Cache of portable data hash -> CollectionReader.
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        """Resolve a "keep:<pdh>/sub/path" string to (collection, "sub/path").

        Returns (None, path) unchanged when path is not a Keep reference.
        """
        p = path.split("/")
        if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
            pdh = p[0][5:]
            if pdh not in self.collections:
                self.collections[pdh] = arvados.collection.CollectionReader(pdh)
            return (self.collections[pdh], "/".join(p[1:]))
        else:
            return (None, path)

    def _match(self, collection, patternsegments, parent):
        """Recursively match glob pattern segments against the collection tree."""
        if not patternsegments:
            return []

        if not isinstance(collection, arvados.collection.RichCollectionBase):
            return []

        if patternsegments[0] == '.':
            # Pattern contains something like "./foo" so just shift past
            # the "./".  Bug fix: this used to happen inside the loop over
            # filenames below, so every entry in the collection caused a
            # duplicate recursion and duplicated glob results.
            return self._match(collection, patternsegments[1:], parent)

        ret = []
        # iterate over the files and subcollections in 'collection'
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Expand a glob pattern against a Keep collection, returning keep: paths."""
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())

    def open(self, fn, mode):
        # Open from the collection when fn is a keep: path, else from disk.
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        # Existence check against the collection or the local filesystem.
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
class ArvadosJob(object):
    """Submit and manage a Crunch job for executing a CWL CommandLineTool."""

    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Build a crunchrunner job spec for this tool invocation and submit it.

        Assembles script_parameters (command line, staged files, stdin/stdout,
        environment) and runtime_constraints (docker image, node resources),
        then creates the job, allowing server-side reuse when enabled.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            # Stage generated files into a new "virtual working directory"
            # collection that the job mounts via $(task.vwd).
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Reference to an existing file in Keep: copy it in.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
                    vwd.copy(rest, t, source_collection=src)
                else:
                    # Literal file content.
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
        else:
            runtime_constraints["docker_image"] = "arvados/jobs"

        resources = self.builder.resources
        if resources is not None:
            runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
            runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
            runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)

        # Job-reuse filters: only reuse jobs run with a compatible
        # crunchrunner script version (and, unless overridden, the same
        # docker image).
        filters = [["repository", "=", "arvados"],
                   ["script", "=", "crunchrunner"],
                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
        if not self.arvrunner.ignore_docker_for_reuse:
            filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])

        try:
            response = self.arvrunner.api.jobs().create(
                body={
                    "owner_uuid": self.arvrunner.project_uuid,
                    "script": "crunchrunner",
                    "repository": "arvados",
                    "script_version": "master",
                    "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
                    "script_parameters": {"tasks": [script_parameters]},
                    "runtime_constraints": runtime_constraints
                },
                filters=filters,
                find_or_create=kwargs.get("enable_reuse", True)
            ).execute(num_retries=self.arvrunner.num_retries)

            self.arvrunner.jobs[response["uuid"]] = self

            self.arvrunner.pipeline["components"][self.name] = {"job": response}
            self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                     body={
                                                                                         "components": self.arvrunner.pipeline["components"]
                                                                                     }).execute(num_retries=self.arvrunner.num_retries)

            logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])

            # A reused job may already be in a terminal state; handle it now
            # rather than waiting for an event that will never arrive.
            if response["state"] in ("Complete", "Failed", "Cancelled"):
                self.done(response)
        except Exception as e:
            # Lazy formatting (was eager "%"-interpolation).
            logger.error("Got error %s", e)
            self.output_callback({}, "permanentFail")

    def update_pipeline_component(self, record):
        """Push this job's latest API record into the pipeline instance."""
        self.arvrunner.pipeline["components"][self.name] = {"job": record}
        self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
                                                                                 body={
                                                                                    "components": self.arvrunner.pipeline["components"]
                                                                                 }).execute(num_retries=self.arvrunner.num_retries)

    def done(self, record):
        """Handle the job reaching a terminal state: collect outputs and report.

        Always deregisters the job from the runner, even on failure, so the
        runner's wait loop can terminate.
        """
        try:
            # Best effort only -- pipeline bookkeeping must not prevent
            # output collection.  (Was a bare "except: pass", which also
            # swallowed KeyboardInterrupt/SystemExit.)
            self.update_pipeline_component(record)
        except Exception as e:
            logger.warning("Error updating pipeline component: %s", e)

        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                outputs = {}
                if record["output"]:
                    logc = arvados.collection.Collection(record["log"])
                    log = logc.open(logc.keys()[0])
                    tmpdir = None
                    outdir = None
                    keepdir = None
                    for l in log:
                        # Determine the tmpdir, outdir and keepdir paths from
                        # the job run.  Unfortunately, we can't take the first
                        # values we find (which are expected to be near the
                        # top) and stop scanning because if the node fails and
                        # the job restarts on a different node these values
                        # will be different between runs, and we need to know
                        # about the final run that actually produced output.

                        g = tmpdirre.match(l)
                        if g:
                            tmpdir = g.group(1)
                        g = outdirre.match(l)
                        if g:
                            outdir = g.group(1)
                        g = keepre.match(l)
                        if g:
                            keepdir = g.group(1)

                    colname = "Output %s of %s" % (record["output"][0:7], self.name)

                    # check if collection already exists with same owner, name and content
                    collection_exists = self.arvrunner.api.collections().list(
                        filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
                                 ['portable_data_hash', '=', record["output"]],
                                 ["name", "=", colname]]
                    ).execute(num_retries=self.arvrunner.num_retries)

                    if not collection_exists["items"]:
                        # Create a collection located in the same project as the
                        # pipeline with the contents of the output.
                        # First, get output record.
                        collections = self.arvrunner.api.collections().list(
                            limit=1,
                            filters=[['portable_data_hash', '=', record["output"]]],
                            select=["manifest_text"]
                        ).execute(num_retries=self.arvrunner.num_retries)

                        if not collections["items"]:
                            raise WorkflowException(
                                "Job output '%s' cannot be found on API server" % (
                                    record["output"]))

                        # Create new collection in the parent project
                        # with the output contents.
                        self.arvrunner.api.collections().create(body={
                            "owner_uuid": self.arvrunner.project_uuid,
                            "name": colname,
                            "portable_data_hash": record["output"],
                            "manifest_text": collections["items"][0]["manifest_text"]
                        }, ensure_unique_name=True).execute(
                            num_retries=self.arvrunner.num_retries)

                    self.builder.outdir = outdir
                    self.builder.pathmapper.keepdir = keepdir
                    outputs = self.collect_outputs("keep:" + record["output"])
            except WorkflowException as e:
                logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
                processStatus = "permanentFail"
            except Exception as e:
                logger.exception("Got unknown exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
302
303
class RunnerJob(object):
    """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""

    def __init__(self, runner, tool, job_order, enable_reuse):
        # runner: the ArvCwlRunner coordinating this run.
        self.arvrunner = runner
        # tool: the cwltool Process object to submit.
        self.tool = tool
        # job_order: the CWL input object; mutated in arvados_job_spec.
        self.job_order = job_order
        self.running = False
        # Passed as find_or_create to jobs().create() to allow job reuse.
        self.enable_reuse = enable_reuse

    def update_pipeline_component(self, record):
        # No-op: the runner job is not tracked as a pipeline component.
        pass

    def upload_docker(self, tool):
        """Recursively ensure Docker images used by the tool/workflow are in Keep."""
        if isinstance(tool, cwltool.draft2tool.CommandLineTool):
            (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
            if docker_req:
                arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
        elif isinstance(tool, cwltool.workflow.Workflow):
            for s in tool.steps:
                self.upload_docker(s.embedded_tool)

    def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
        """Create an Arvados job specification for this workflow.

        The returned dict can be used to create a job (i.e., passed as
        the +body+ argument to jobs().create()), or as a component in
        a pipeline template or pipeline instance.
        """
        self.upload_docker(self.tool)

        workflowfiles = set()
        jobfiles = set()
        workflowfiles.add(self.tool.tool["id"])

        self.name = os.path.basename(self.tool.tool["id"])

        def visitFiles(files, path):
            # Record each referenced path; return it unchanged so
            # adjustFiles leaves the document intact at this stage.
            files.add(path)
            return path

        document_loader, _, _ = cwltool.process.get_schema()
        def loadref(b, u):
            # Resolve $import/run references relative to their base document.
            return document_loader.resolve_ref(u, base_url=b)[0]

        # Scan the workflow document for file dependencies that must be
        # uploaded along with it.
        sc = scandeps("", self.tool.tool,
                      set(("$import", "run")),
                      set(("$include", "$schemas", "path")),
                      loadref)
        adjustFiles(sc, functools.partial(visitFiles, workflowfiles))
        adjustFiles(self.job_order, functools.partial(visitFiles, jobfiles))

        # Upload workflow files and job-order files to Keep as two
        # separately named collections.
        workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
                                       "%s",
                                       "%s/%s",
                                       name=self.name,
                                       **kwargs)

        jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
                                  "%s",
                                  "%s/%s",
                                  name=os.path.basename(self.job_order.get("id", "#")),
                                  **kwargs)

        # Rewrite job-order paths to their uploaded Keep locations.
        adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])

        if "id" in self.job_order:
            del self.job_order["id"]

        # cwl:tool tells crunch_scripts/cwl-runner which uploaded workflow
        # document to execute.
        self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
        return {
            "script": "cwl-runner",
            "script_version": "master",
            "repository": "arvados",
            "script_parameters": self.job_order,
            "runtime_constraints": {
                "docker_image": "arvados/jobs"
            }
        }

    def run(self, *args, **kwargs):
        """Submit the runner job to the API server and register it with the runner."""
        job_spec = self.arvados_job_spec(*args, **kwargs)
        job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)

        response = self.arvrunner.api.jobs().create(
            body=job_spec,
            find_or_create=self.enable_reuse
        ).execute(num_retries=self.arvrunner.num_retries)

        self.uuid = response["uuid"]
        self.arvrunner.jobs[self.uuid] = self

        logger.info("Submitted job %s", response["uuid"])

        # The job may already be finished (e.g. when an existing job was
        # reused); handle completion immediately.
        if response["state"] in ("Complete", "Failed", "Cancelled"):
            self.done(response)

    def done(self, record):
        """Handle a terminal state: read cwl.output.json from the output and report."""
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = None
        try:
            try:
                outc = arvados.collection.Collection(record["output"])
                with outc.open("cwl.output.json") as f:
                    outputs = json.load(f)
            except Exception as e:
                logger.error("While getting final output object: %s", e)
            self.arvrunner.output_callback(outputs, processStatus)
        finally:
            # Always deregister so the runner's wait loop can terminate.
            del self.arvrunner.jobs[record["uuid"]]
418
419
class RunnerTemplate(object):
    """An Arvados pipeline template that invokes a CWL workflow."""

    # Translation of CWL input types to Workbench dataclass names.
    type_to_dataclass = {
        'boolean': 'boolean',
        'File': 'File',
        'float': 'number',
        'int': 'number',
        'string': 'text',
    }

    def __init__(self, runner, tool, job_order, enable_reuse):
        self.runner = runner
        self.tool = tool
        self.job = RunnerJob(
            runner=runner,
            tool=tool,
            job_order=job_order,
            enable_reuse=enable_reuse)

    def pipeline_component_spec(self):
        """Return a component that Workbench and a-r-p-i will understand.

        Specifically, translate CWL input specs to Arvados pipeline
        format, like {"dataclass":"File","value":"xyz"}.
        """
        spec = self.job.arvados_job_spec()

        # The job spec carries over as-is (script, script_version, ...)
        # except for script_parameters, which must be rebuilt as a
        # translation of self.tool.tool['inputs'] with defaults/overrides
        # taken from the original job order.  Stash the job parameters
        # aside and start a fresh hash.
        job_params = spec['script_parameters']
        spec['script_parameters'] = {}

        for cwl_input in self.tool.tool['inputs']:
            param = copy.deepcopy(cwl_input)

            # Data type and "required" flag...
            types = param['type']
            if not isinstance(types, list):
                types = [types]
            param['required'] = 'null' not in types
            non_null_types = set(types) - set(['null'])
            if len(non_null_types) == 1:
                dataclass = self.type_to_dataclass.get(non_null_types.pop())
                if dataclass:
                    param['dataclass'] = dataclass
            # If no single dataclass could be determined, the attribute is
            # simply omitted; the "type" attribute is kept either way,
            # which might help downstream.

            # Title and description...
            title = param.pop('label', '')
            if title:
                param['title'] = title
            descr = param.pop('description', '').rstrip('\n')
            if descr:
                param['description'] = descr

            # Fill in the value from the current job order, if any.
            param_id = shortname(param.pop('id'))
            value = job_params.get(param_id)
            if isinstance(value, dict):
                if param.get('dataclass') == 'File' and value.get('path'):
                    param['value'] = value['path']
            elif value is not None:
                param['value'] = value

            spec['script_parameters'][param_id] = param

        spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
        return spec

    def save(self):
        """Create the pipeline template on the API server and record its uuid."""
        component = self.pipeline_component_spec()
        body = {
            "components": {
                self.job.name: component,
            },
            "name": self.job.name,
            "owner_uuid": self.runner.project_uuid,
        }
        response = self.runner.api.pipeline_templates().create(
            body=body).execute(num_retries=self.runner.num_retries)
        self.uuid = response["uuid"]
        logger.info("Created template %s", self.uuid)
510
511
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Convert container-local paths to and from Keep collection ids."""

    def __init__(self, arvrunner, referenced_files, basedir,
                 collection_pattern, file_pattern, name=None, **kwargs):
        # Seed the path map from the runner's cache of already-uploaded
        # files so the same local file is not uploaded twice.
        self._pathmap = arvrunner.get_uploaded()
        uploadfiles = set()

        # Matches "keep:<portable data hash>/<path>" references.
        pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a Keep reference: just reformat it for the job.
                self._pathmap[src] = (src, collection_pattern % src[5:])
            if "#" in src:
                # Strip document fragment identifiers.
                src = src[:src.index("#")]
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                # statfile classifies the path as already stored in Arvados
                # (ArvFile) or as a local file needing upload (UploadFile).
                st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.add((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            # Upload all new files in a single batch.
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern=file_pattern,
                                             name=name,
                                             project=arvrunner.project_uuid)

        for src, ab, st in uploadfiles:
            # Record the final mapping both locally and in the runner's
            # shared cache.
            arvrunner.add_uploaded(src, (ab, st.fn))
            self._pathmap[src] = (ab, st.fn)

        # Set later by ArvadosJob.done() to the job's $(task.keep) mount
        # path, enabling reversemap() below.
        self.keepdir = None

    def reversemap(self, target):
        """Map a job-side path back to a "keep:..." reference where possible."""
        if target.startswith("keep:"):
            return (target, target)
        elif self.keepdir and target.startswith(self.keepdir):
            return (target, "keep:" + target[len(self.keepdir)+1:])
        else:
            return super(ArvPathMapper, self).reversemap(target)
561
562
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """Wrap cwltool CommandLineTool to override selected methods."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Jobs for this tool run as Arvados crunch jobs."""
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        """Map input files into the job via $(task.keep) paths."""
        return ArvPathMapper(
            self.arvrunner, reffiles, input_basedir,
            "$(task.keep)/%s",
            "$(task.keep)/%s/%s",
            **kwargs)
578
579
class ArvCwlRunner(object):
    """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
    complete, and report output."""

    def __init__(self, api_client):
        self.api = api_client
        # Maps job uuid -> in-flight ArvadosJob/RunnerJob.
        self.jobs = {}
        self.lock = threading.Lock()
        # Condition (on self.lock) used by arvExecutor to wait for job
        # state changes delivered via on_message.
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        # Cache of already-uploaded local files, shared with ArvPathMapper
        # via get_uploaded()/add_uploaded().
        self.uploaded = {}
        self.num_retries = 4

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Tool factory: wrap CommandLineTools so they run as crunch jobs."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow result and set the pipeline's final state."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Complete"}).execute(num_retries=self.num_retries)

        else:
            logger.warn("Overall job status is %s", processStatus)
            if self.pipeline:
                self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                     body={"state": "Failed"}).execute(num_retries=self.num_retries)
        self.final_output = out

    def on_message(self, event):
        """Event-stream callback: track state changes for jobs we submitted."""
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    uuid = event["object_uuid"]
                    with self.lock:
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is Running", j.name, uuid)
                        j.running = True
                        j.update_pipeline_component(event["properties"]["new_attributes"])
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    uuid = event["object_uuid"]
                    try:
                        self.cond.acquire()
                        j = self.jobs[uuid]
                        logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
                        j.done(event["properties"]["new_attributes"])
                        # Wake up arvExecutor's cond.wait() loop.
                        self.cond.notify()
                    finally:
                        self.cond.release()

    def get_uploaded(self):
        # Return a copy so callers can't mutate the shared cache directly.
        return self.uploaded.copy()

    def add_uploaded(self, src, pair):
        self.uploaded[src] = pair

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        """Main entry point: run (or submit, or template-ize) the workflow.

        Returns the final output object, or a uuid string when creating a
        pipeline template or submitting without --wait.
        """
        self.debug = args.debug

        if args.quiet:
            logger.setLevel(logging.WARN)
            logging.getLogger('arvados.arv-run').setLevel(logging.WARN)

        # Default the owning project to the current user.
        useruuid = self.api.users().current().execute()["uuid"]
        self.project_uuid = args.project_uuid if args.project_uuid else useruuid
        self.pipeline = None

        if args.create_template:
            tmpl = RunnerTemplate(self, tool, job_order, args.enable_reuse)
            tmpl.save()
            # cwltool.main will write our return value to stdout.
            return tmpl.uuid

        if args.submit:
            runnerjob = RunnerJob(self, tool, job_order, args.enable_reuse)
            if not args.wait:
                runnerjob.run()
                return runnerjob.uuid

        # Subscribe to job state changes; on_message drives job completion.
        events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        # NOTE(review): self.debug was already assigned at the top of this
        # method; this reassignment is redundant.
        self.debug = args.debug
        self.ignore_docker_for_reuse = args.ignore_docker_for_reuse
        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        # Job-side paths are expressed with crunch runtime variables.
        kwargs["outdir"] = "$(task.outdir)"
        kwargs["tmpdir"] = "$(task.tmpdir)"

        if kwargs.get("conformance_test"):
            # Conformance tests run locally through cwltool.
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            if args.submit:
                # --submit --wait: the only job to drive is the runner job.
                jobiter = iter((runnerjob,))
            else:
                components = {}
                if "cwl_runner_job" in kwargs:
                    components[os.path.basename(tool.tool["id"])] = {"job": kwargs["cwl_runner_job"]}

                self.pipeline = self.api.pipeline_instances().create(
                    body={
                        "owner_uuid": self.project_uuid,
                        "name": shortname(tool.tool["id"]),
                        "components": components,
                        "state": "RunningOnClient"}).execute(num_retries=self.num_retries)

                logger.info("Pipeline instance %s", self.pipeline["uuid"])

                jobiter = tool.job(job_order,
                                   input_basedir,
                                   self.output_callback,
                                   docker_outdir="$(task.outdir)",
                                   **kwargs)

            try:
                self.cond.acquire()
                # Will continue to hold the lock for the duration of this code
                # except when in cond.wait(), at which point on_message can update
                # job state and process output callbacks.

                for runnable in jobiter:
                    if runnable:
                        runnable.run(**kwargs)
                    else:
                        # jobiter yielded None: a step is blocked on pending
                        # jobs; wait for an event, or bail if nothing is left.
                        if self.jobs:
                            self.cond.wait(1)
                        else:
                            logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
                            break

                # Drain remaining in-flight jobs.
                while self.jobs:
                    self.cond.wait(1)

                events.close()

                if self.final_output is None:
                    raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

                # create final output collection
            except:
                # Bare except is deliberate: even KeyboardInterrupt must mark
                # the pipeline instance Failed.  Note the exception is not
                # re-raised; the method still returns final_output below.
                if sys.exc_info()[0] is KeyboardInterrupt:
                    logger.error("Interrupted, marking pipeline as failed")
                else:
                    logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
                if self.pipeline:
                    self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                         body={"state": "Failed"}).execute(num_retries=self.num_retries)
            finally:
                self.cond.release()

            return self.final_output
737
def versionstring():
    """Return a version string for key packages, for provenance and debugging.

    Covers arvados-cwl-runner, arvados-python-client and cwltool as
    reported by their installed distribution metadata.
    """
    # Look up each installed distribution once; require() returns a list
    # whose first entry is the distribution itself.
    versions = {name: pkg_resources.require(name)[0].version
                for name in ("arvados-cwl-runner",
                             "arvados-python-client",
                             "cwltool")}

    return "%s %s, %s %s, %s %s" % (sys.argv[0], versions["arvados-cwl-runner"],
                                    "arvados-python-client", versions["arvados-python-client"],
                                    "cwltool", versions["cwltool"])
748
def main(args, stdout, stderr, api_client=None):
    """Command line entry point for arvados-cwl-runner.

    Extends the standard cwltool argument parser with Arvados-specific
    options, constructs an ArvCwlRunner, and delegates execution to
    cwltool.main.main using the runner's executor and tool factory.

    :param args: command line argument list; not mutated.
    :param stdout: stream for normal output.
    :param stderr: stream for error output.
    :param api_client: optional pre-built Arvados API client (useful for
        testing); when None, a default 'v1' client is created.
    :returns: exit status from cwltool.main.main, or 1 if the API client
        could not be created.
    """
    # Build a new list rather than args.insert(...) so the caller's list
    # is not mutated as a side effect.
    args = ["--leave-outputs"] + list(args)
    parser = cwltool.main.arg_parser()

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=True, dest="enable_reuse",
                        help="Enable job reuse (default)")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=True, dest="enable_reuse",
                        help="Disable job reuse")

    parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs")
    parser.add_argument("--ignore-docker-for-reuse", action="store_true",
                        help="Ignore Docker image version when deciding whether to reuse past jobs.",
                        default=False)

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
                        default=True, dest="submit")
    exgroup.add_argument("--local", action="store_false", help="Run workflow on local host (submits jobs to Arvados).",
                        default=True, dest="submit")
    exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")

    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
                        default=True, dest="wait")
    exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
                        default=True, dest="wait")

    try:
        if api_client is None:
            api_client=arvados.api('v1', model=OrderedJsonModel())
        runner = ArvCwlRunner(api_client)
    except Exception as e:
        # Fail early (before cwltool takes over) if we cannot talk to the
        # Arvados API at all.
        logger.error(e)
        return 1

    return cwltool.main.main(args,
                             stdout=stdout,
                             stderr=stderr,
                             executor=runner.arvExecutor,
                             makeTool=runner.arvMakeTool,
                             parser=parser,
                             versionfunc=versionstring)