Bugfix submitting cwl jobs with arvados-cwl-runner refs #9275
[arvados.git] sdk/cwl/arvados_cwl/__init__.py
1 #!/usr/bin/env python
2
3 # Implement cwl-runner interface for submitting and running jobs on Arvados.
4
5 import argparse
6 import arvados
7 import arvados.collection
8 import arvados.commands.keepdocker
9 import arvados.commands.run
10 import arvados.events
11 import arvados.util
12 import copy
13 import cwltool.docker
14 from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
15 from cwltool.errors import WorkflowException
16 import cwltool.main
17 import cwltool.workflow
18 import fnmatch
19 from functools import partial
20 import json
21 import logging
22 import os
23 import pkg_resources  # part of setuptools
24 import re
25 import sys
26 import threading
27 from cwltool.load_tool import fetch_document
28 from cwltool.builder import Builder
29 import urlparse
30
31 from cwltool.process import shortname, get_feature, adjustFiles, adjustFileObjs, scandeps
32 from arvados.api import OrderedJsonModel
33
34 logger = logging.getLogger('arvados.cwl-runner')
35 logger.setLevel(logging.INFO)
36
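# crunchrunner reports each task's work directories on stderr; these patterns
# recover the $(task.tmpdir), $(task.outdir) and $(task.keep) paths from the
# job log so output paths can later be mapped back to Keep (see ArvadosJob.done).
# A matching log line has the shape
#   "<time> <uuid> <pid> <seq> stderr <...> crunchrunner: $(task.outdir)=<path>"
# where the field names are illustrative; only the shape is fixed by the regexes.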
37 tmpdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.tmpdir\)=(.*)")
38 outdirre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.outdir\)=(.*)")
39 keepre = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.keep\)=(.*)")
40
41
42 def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid):
 43     """Check if a Docker image is available in Keep; if not, upload it using arv-keepdocker."""
44
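    # Example (hypothetical values): a requirement such as
    #   {"class": "DockerRequirement", "dockerPull": "debian:8"}
    # yields image_name "debian" and image_tag "8".  If no matching image is
    # already registered in Arvados, it is pulled locally and uploaded with
    # arv-keepdocker.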
45     if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
46         dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]
47
48     sp = dockerRequirement["dockerImageId"].split(":")
49     image_name = sp[0]
50     image_tag = sp[1] if len(sp) > 1 else None
51
52     images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
53                                                             image_name=image_name,
54                                                             image_tag=image_tag)
55
56     if not images:
57         imageId = cwltool.docker.get_image(dockerRequirement, pull_image)
58         args = ["--project-uuid="+project_uuid, image_name]
59         if image_tag:
60             args.append(image_tag)
61         logger.info("Uploading Docker image %s", ":".join(args[1:]))
62         arvados.commands.keepdocker.main(args, stdout=sys.stderr)
63
64     return dockerRequirement["dockerImageId"]
65
66
67 class CollectionFsAccess(cwltool.process.StdFsAccess):
68     """Implement the cwltool FsAccess interface for Arvados Collections."""
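    # Paths of the form "keep:<portable data hash>/<path>" (for example the
    # hypothetical "keep:99999999999999999999999999999999+99/input.txt") are
    # read through CollectionReader; any other path falls back to the local
    # filesystem handling of the base StdFsAccess class.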
69
70     def __init__(self, basedir):
71         super(CollectionFsAccess, self).__init__(basedir)
72         self.collections = {}
73
74     def get_collection(self, path):
75         p = path.split("/")
76         if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
77             pdh = p[0][5:]
78             if pdh not in self.collections:
79                 self.collections[pdh] = arvados.collection.CollectionReader(pdh)
80             return (self.collections[pdh], "/".join(p[1:]))
81         else:
82             return (None, path)
83
84     def _match(self, collection, patternsegments, parent):
85         if not patternsegments:
86             return []
87
88         if not isinstance(collection, arvados.collection.RichCollectionBase):
89             return []
90
91         ret = []
92         # iterate over the files and subcollections in 'collection'
93         for filename in collection:
94             if patternsegments[0] == '.':
95                 # Pattern contains something like "./foo" so just shift
96                 # past the "./"
97                 ret.extend(self._match(collection, patternsegments[1:], parent))
98             elif fnmatch.fnmatch(filename, patternsegments[0]):
99                 cur = os.path.join(parent, filename)
100                 if len(patternsegments) == 1:
101                     ret.append(cur)
102                 else:
103                     ret.extend(self._match(collection[filename], patternsegments[1:], cur))
104         return ret
105
106     def glob(self, pattern):
107         collection, rest = self.get_collection(pattern)
108         patternsegments = rest.split("/")
109         return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
110
111     def open(self, fn, mode):
112         collection, rest = self.get_collection(fn)
113         if collection:
114             return collection.open(rest, mode)
115         else:
116             return open(self._abs(fn), mode)
117
118     def exists(self, fn):
119         collection, rest = self.get_collection(fn)
120         if collection:
121             return collection.exists(rest)
122         else:
123             return os.path.exists(self._abs(fn))
124
125 class ArvadosJob(object):
126     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
127
128     def __init__(self, runner):
129         self.arvrunner = runner
130         self.running = False
131
132     def run(self, dry_run=False, pull_image=True, **kwargs):
133         script_parameters = {
134             "command": self.command_line
135         }
136         runtime_constraints = {}
137
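        # Generated files are staged into a fresh "virtual working directory"
        # collection; each entry is then handed to crunchrunner through the
        # task.vwd script parameter as $(task.keep)/<pdh>/<filename>.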
138         if self.generatefiles:
139             vwd = arvados.collection.Collection()
140             script_parameters["task.vwd"] = {}
141             for t in self.generatefiles:
142                 if isinstance(self.generatefiles[t], dict):
143                     src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
144                     vwd.copy(rest, t, source_collection=src)
145                 else:
146                     with vwd.open(t, "w") as f:
147                         f.write(self.generatefiles[t])
148             vwd.save_new()
149             for t in self.generatefiles:
150                 script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
151
152         script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
153         if self.environment:
154             script_parameters["task.env"].update(self.environment)
155
156         if self.stdin:
157             script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
158
159         if self.stdout:
160             script_parameters["task.stdout"] = self.stdout
161
162         (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
163         if docker_req and kwargs.get("use_container") is not False:
164             runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
165         else:
166             runtime_constraints["docker_image"] = "arvados/jobs"
167
168         resources = self.builder.resources
169         if resources is not None:
170             runtime_constraints["min_cores_per_node"] = resources.get("cores", 1)
171             runtime_constraints["min_ram_mb_per_node"] = resources.get("ram")
172             runtime_constraints["min_scratch_mb_per_node"] = resources.get("tmpdirSize", 0) + resources.get("outdirSize", 0)
173
174         filters = [["repository", "=", "arvados"],
175                    ["script", "=", "crunchrunner"],
176                    ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
177         if not self.arvrunner.ignore_docker_for_reuse:
178             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
179
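        # With find_or_create=True (--enable-reuse, the default) the API server
        # may satisfy this request with an existing job matching these filters
        # instead of queueing a new one; --ignore-docker-for-reuse drops the
        # Docker image filter from the comparison.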
180         try:
181             response = self.arvrunner.api.jobs().create(
182                 body={
183                     "owner_uuid": self.arvrunner.project_uuid,
184                     "script": "crunchrunner",
185                     "repository": "arvados",
186                     "script_version": "master",
187                     "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
188                     "script_parameters": {"tasks": [script_parameters]},
189                     "runtime_constraints": runtime_constraints
190                 },
191                 filters=filters,
192                 find_or_create=kwargs.get("enable_reuse", True)
193             ).execute(num_retries=self.arvrunner.num_retries)
194
195             self.arvrunner.jobs[response["uuid"]] = self
196
197             self.update_pipeline_component(response)
198
199             logger.info("Job %s (%s) is %s", self.name, response["uuid"], response["state"])
200
201             if response["state"] in ("Complete", "Failed", "Cancelled"):
202                 self.done(response)
203         except Exception as e:
204             logger.error("Got error %s", e)
205             self.output_callback({}, "permanentFail")
206
207     def update_pipeline_component(self, record):
208         if self.arvrunner.pipeline:
209             self.arvrunner.pipeline["components"][self.name] = {"job": record}
210             self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(uuid=self.arvrunner.pipeline["uuid"],
211                                                                                  body={
212                                                                                     "components": self.arvrunner.pipeline["components"]
213                                                                                  }).execute(num_retries=self.arvrunner.num_retries)
214         if self.arvrunner.uuid:
215             try:
216                 job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
217                 if job:
218                     components = job["components"]
219                     components[self.name] = record["uuid"]
220                     self.arvrunner.api.jobs().update(uuid=self.arvrunner.uuid,
221                         body={
222                             "components": components
223                         }).execute(num_retries=self.arvrunner.num_retries)
224             except Exception as e:
225                 logger.info("Error adding to components: %s", e)
226
227     def done(self, record):
228         try:
229             self.update_pipeline_component(record)
230         except:
231             pass  # Failure to update the pipeline component record is non-fatal here.
232
233         try:
234             if record["state"] == "Complete":
235                 processStatus = "success"
236             else:
237                 processStatus = "permanentFail"
238
239             try:
240                 outputs = {}
241                 if record["output"]:
242                     logc = arvados.collection.Collection(record["log"])
243                     log = logc.open(logc.keys()[0])
244                     tmpdir = None
245                     outdir = None
246                     keepdir = None
247                     for l in log:
248                         # Determine the tmpdir, outdir and keepdir paths from
249                         # the job run.  Unfortunately, we can't take the first
250                         # values we find (which are expected to be near the
251                         # top) and stop scanning because if the node fails and
252                         # the job restarts on a different node, these values
253                         # will differ between runs, and we need to know about the
254                         # final run that actually produced output.
255
256                         g = tmpdirre.match(l)
257                         if g:
258                             tmpdir = g.group(1)
259                         g = outdirre.match(l)
260                         if g:
261                             outdir = g.group(1)
262                         g = keepre.match(l)
263                         if g:
264                             keepdir = g.group(1)
265
266                     colname = "Output %s of %s" % (record["output"][0:7], self.name)
267
268                     # check if collection already exists with same owner, name and content
269                     collection_exists = self.arvrunner.api.collections().list(
270                         filters=[["owner_uuid", "=", self.arvrunner.project_uuid],
271                                  ['portable_data_hash', '=', record["output"]],
272                                  ["name", "=", colname]]
273                     ).execute(num_retries=self.arvrunner.num_retries)
274
275                     if not collection_exists["items"]:
276                         # Create a collection located in the same project as the
277                         # pipeline with the contents of the output.
278                         # First, get output record.
279                         collections = self.arvrunner.api.collections().list(
280                             limit=1,
281                             filters=[['portable_data_hash', '=', record["output"]]],
282                             select=["manifest_text"]
283                         ).execute(num_retries=self.arvrunner.num_retries)
284
285                         if not collections["items"]:
286                             raise WorkflowException(
287                                 "Job output '%s' cannot be found on API server" % (
288                                     record["output"]))
289
290                         # Create new collection in the parent project
291                         # with the output contents.
292                         self.arvrunner.api.collections().create(body={
293                             "owner_uuid": self.arvrunner.project_uuid,
294                             "name": colname,
295                             "portable_data_hash": record["output"],
296                             "manifest_text": collections["items"][0]["manifest_text"]
297                         }, ensure_unique_name=True).execute(
298                             num_retries=self.arvrunner.num_retries)
299
300                     self.builder.outdir = outdir
301                     self.builder.pathmapper.keepdir = keepdir
302                     outputs = self.collect_outputs("keep:" + record["output"])
303             except WorkflowException as e:
304                 logger.error("Error while collecting job outputs:\n%s", e, exc_info=(e if self.arvrunner.debug else False))
305                 processStatus = "permanentFail"
306             except Exception as e:
307                 logger.exception("Got unknown exception while collecting job outputs:")
308                 processStatus = "permanentFail"
309
310             self.output_callback(outputs, processStatus)
311         finally:
312             del self.arvrunner.jobs[record["uuid"]]
313
314
315 class RunnerJob(object):
316     """Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
317
318     def __init__(self, runner, tool, job_order, enable_reuse):
319         self.arvrunner = runner
320         self.tool = tool
321         self.job_order = job_order
322         self.running = False
323         self.enable_reuse = enable_reuse
324
325     def update_pipeline_component(self, record):
326         pass
327
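    # Walk the workflow (including nested steps) and make sure every referenced
    # Docker image is available in Arvados before the runner job is submitted.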
328     def upload_docker(self, tool):
329         if isinstance(tool, CommandLineTool):
330             (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
331             if docker_req:
332                 arv_docker_get_image(self.arvrunner.api, docker_req, True, self.arvrunner.project_uuid)
333         elif isinstance(tool, cwltool.workflow.Workflow):
334             for s in tool.steps:
335                 self.upload_docker(s.embedded_tool)
336
337     def arvados_job_spec(self, dry_run=False, pull_image=True, **kwargs):
338         """Create an Arvados job specification for this workflow.
339
340         The returned dict can be used to create a job (i.e., passed as
341         the +body+ argument to jobs().create()), or as a component in
342         a pipeline template or pipeline instance.
343         """
344         self.upload_docker(self.tool)
345
346         workflowfiles = set()
347         jobfiles = set()
348         workflowfiles.add(self.tool.tool["id"])
349
350         self.name = os.path.basename(self.tool.tool["id"])
351
352         def visitFiles(files, path):
353             files.add(path)
354             return path
355
356         document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
357         def loadref(b, u):
358             return document_loader.fetch(urlparse.urljoin(b, u))
359
360         sc = scandeps(uri, workflowobj,
361                       set(("$import", "run")),
362                       set(("$include", "$schemas", "path")),
363                       loadref)
364         adjustFiles(sc, partial(visitFiles, workflowfiles))
365         adjustFiles(self.job_order, partial(visitFiles, jobfiles))
366
367         workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
368                                        "%s",
369                                        "%s/%s",
370                                        name=self.name,
371                                        **kwargs)
372
373         jobmapper = ArvPathMapper(self.arvrunner, jobfiles, "",
374                                   "%s",
375                                   "%s/%s",
376                                   name=os.path.basename(self.job_order.get("id", "#")),
377                                   **kwargs)
378
379         adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
380
381         if "id" in self.job_order:
382             del self.job_order["id"]
383
384         self.job_order["cwl:tool"] = workflowmapper.mapper(self.tool.tool["id"])[1]
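        # The resulting job runs crunch_scripts/cwl-runner from the "arvados"
        # repository inside the arvados/jobs image; "cwl:tool" points at the
        # uploaded workflow in Keep and the remaining script_parameters carry
        # the job order.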
385         return {
386             "script": "cwl-runner",
387             "script_version": "master",
388             "repository": "arvados",
389             "script_parameters": self.job_order,
390             "runtime_constraints": {
391                 "docker_image": "arvados/jobs"
392             }
393         }
394
395     def run(self, *args, **kwargs):
396         job_spec = self.arvados_job_spec(*args, **kwargs)
397         job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
398
399         response = self.arvrunner.api.jobs().create(
400             body=job_spec,
401             find_or_create=self.enable_reuse
402         ).execute(num_retries=self.arvrunner.num_retries)
403
404         self.uuid = response["uuid"]
405         self.arvrunner.jobs[self.uuid] = self
406
407         logger.info("Submitted job %s", response["uuid"])
408
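        # When submitting, also create a thin pipeline instance whose single
        # "cwl-runner" component is this job, so the run can be followed like
        # any other pipeline (for example in Workbench).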
409         if kwargs.get("submit"):
410             self.pipeline = self.arvrunner.api.pipeline_instances().create(
411                 body={
412                     "owner_uuid": self.arvrunner.project_uuid,
413                     "name": shortname(self.tool.tool["id"]),
414                     "components": {"cwl-runner": {"job": {"uuid": self.uuid, "state": response["state"]} } },
415                     "state": "RunningOnClient"}).execute(num_retries=self.arvrunner.num_retries)
416
417         if response["state"] in ("Complete", "Failed", "Cancelled"):
418             self.done(response)
419
420     def done(self, record):
421         if record["state"] == "Complete":
422             processStatus = "success"
423         else:
424             processStatus = "permanentFail"
425
426         outputs = None
427         try:
428             try:
429                 outc = arvados.collection.Collection(record["output"])
430                 with outc.open("cwl.output.json") as f:
431                     outputs = json.load(f)
432                 def keepify(path):
433                     # Leave existing keep: references alone; anchor other paths inside the output collection.
434                     return path if path.startswith("keep:") else "keep:%s/%s" % (record["output"], path)
435                 adjustFiles(outputs, keepify)
436             except Exception as e:
437                 logger.error("While getting final output object: %s", e)
438             self.arvrunner.output_callback(outputs, processStatus)
439         finally:
440             del self.arvrunner.jobs[record["uuid"]]
441
442
443 class RunnerTemplate(object):
444     """An Arvados pipeline template that invokes a CWL workflow."""
445
446     type_to_dataclass = {
447         'boolean': 'boolean',
448         'File': 'File',
449         'float': 'number',
450         'int': 'number',
451         'string': 'text',
452     }
453
454     def __init__(self, runner, tool, job_order, enable_reuse):
455         self.runner = runner
456         self.tool = tool
457         self.job = RunnerJob(
458             runner=runner,
459             tool=tool,
460             job_order=job_order,
461             enable_reuse=enable_reuse)
462
463     def pipeline_component_spec(self):
464         """Return a component that Workbench and a-r-p-i will understand.
465
466         Specifically, translate CWL input specs to Arvados pipeline
467         format, like {"dataclass":"File","value":"xyz"}.
468         """
469         spec = self.job.arvados_job_spec()
470
471         # Most of the component spec is exactly the same as the job
472         # spec (script, script_version, etc.).
473         # spec['script_parameters'] isn't right, though. A component
474         # spec's script_parameters hash is a translation of
475         # self.tool.tool['inputs'] with defaults/overrides taken from
476         # the job order. So we move the job parameters out of the way
477         # and build a new spec['script_parameters'].
478         job_params = spec['script_parameters']
479         spec['script_parameters'] = {}
480
481         for param in self.tool.tool['inputs']:
482             param = copy.deepcopy(param)
483
484             # Data type and "required" flag...
485             types = param['type']
486             if not isinstance(types, list):
487                 types = [types]
488             param['required'] = 'null' not in types
489             non_null_types = set(types) - set(['null'])
490             if len(non_null_types) == 1:
491                 the_type = [c for c in non_null_types][0]
492                 dataclass = self.type_to_dataclass.get(the_type)
493                 if dataclass:
494                     param['dataclass'] = dataclass
495             # Note: If we didn't figure out a single appropriate
496             # dataclass, we just left that attribute out.  We leave
497             # the "type" attribute there in any case, which might help
498             # downstream.
499
500             # Title and description...
501             title = param.pop('label', '')
502             descr = param.pop('description', '').rstrip('\n')
503             if title:
504                 param['title'] = title
505             if descr:
506                 param['description'] = descr
507
508             # Fill in the value from the current job order, if any.
509             param_id = shortname(param.pop('id'))
510             value = job_params.get(param_id)
511             if value is None:
512                 pass
513             elif not isinstance(value, dict):
514                 param['value'] = value
515             elif param.get('dataclass') == 'File' and value.get('path'):
516                 param['value'] = value['path']
517
518             spec['script_parameters'][param_id] = param
519         spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
520         return spec
521
522     def save(self):
523         job_spec = self.pipeline_component_spec()
524         response = self.runner.api.pipeline_templates().create(body={
525             "components": {
526                 self.job.name: job_spec,
527             },
528             "name": self.job.name,
529             "owner_uuid": self.runner.project_uuid,
530         }, ensure_unique_name=True).execute(num_retries=self.runner.num_retries)
531         self.uuid = response["uuid"]
532         logger.info("Created template %s", self.uuid)
533
534
535 class ArvPathMapper(cwltool.pathmapper.PathMapper):
536     """Convert container-local paths to and from Keep collection ids."""
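    # Referenced local files are uploaded to Keep (via arvados.commands.run)
    # and mapped to locations such as the hypothetical
    # "keep:99999999999999999999999999999999+99/input.txt", which a crunch
    # task sees as "$(task.keep)/99999999999999999999999999999999+99/input.txt".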
537
538     def __init__(self, arvrunner, referenced_files, input_basedir,
539                  collection_pattern, file_pattern, name=None, **kwargs):
540         self._pathmap = arvrunner.get_uploaded()
541         uploadfiles = set()
542
543         pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
544
545         for src in referenced_files:
546             if isinstance(src, basestring) and pdh_path.match(src):
547                 self._pathmap[src] = (src, collection_pattern % src[5:])
548             if "#" in src:
549                 src = src[:src.index("#")]
550             if src not in self._pathmap:
551                 ab = cwltool.pathmapper.abspath(src, input_basedir)
552                 st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
553                 if kwargs.get("conformance_test"):
554                     self._pathmap[src] = (src, ab)
555                 elif isinstance(st, arvados.commands.run.UploadFile):
556                     uploadfiles.add((src, ab, st))
557                 elif isinstance(st, arvados.commands.run.ArvFile):
558                     self._pathmap[src] = (ab, st.fn)
559                 else:
560                     raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
561
562         if uploadfiles:
563             arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
564                                              arvrunner.api,
565                                              dry_run=kwargs.get("dry_run"),
566                                              num_retries=3,
567                                              fnPattern=file_pattern,
568                                              name=name,
569                                              project=arvrunner.project_uuid)
570
571         for src, ab, st in uploadfiles:
572             arvrunner.add_uploaded(src, (ab, st.fn))
573             self._pathmap[src] = (ab, st.fn)
574
575         self.keepdir = None
576
577     def reversemap(self, target):
578         if target.startswith("keep:"):
579             return (target, target)
580         elif self.keepdir and target.startswith(self.keepdir):
581             return (target, "keep:" + target[len(self.keepdir)+1:])
582         else:
583             return super(ArvPathMapper, self).reversemap(target)
584
585
586 class ArvadosCommandTool(CommandLineTool):
587     """Wrap cwltool CommandLineTool to override selected methods."""
588
589     def __init__(self, arvrunner, toolpath_object, **kwargs):
590         super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
591         self.arvrunner = arvrunner
592
593     def makeJobRunner(self):
594         return ArvadosJob(self.arvrunner)
595
596     def makePathMapper(self, reffiles, **kwargs):
597         return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
598                              "$(task.keep)/%s",
599                              "$(task.keep)/%s/%s",
600                              **kwargs)
601
602
603 class ArvCwlRunner(object):
604     """Execute a CWL tool or workflow, submit crunch jobs, wait for them to
605     complete, and report output."""
606
607     def __init__(self, api_client):
608         self.api = api_client
609         self.jobs = {}
610         self.lock = threading.Lock()
611         self.cond = threading.Condition(self.lock)
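        # self.cond guards self.jobs: arvExecutor holds it while submitting
        # jobs and waits on it, while the websocket event callback
        # (on_message) notifies it as jobs change state.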
612         self.final_output = None
613         self.uploaded = {}
614         self.num_retries = 4
615         self.uuid = None
616
617     def arvMakeTool(self, toolpath_object, **kwargs):
618         if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
619             return ArvadosCommandTool(self, toolpath_object, **kwargs)
620         else:
621             return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
622
623     def output_callback(self, out, processStatus):
624         if processStatus == "success":
625             logger.info("Overall job status is %s", processStatus)
626             if self.pipeline:
627                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
628                                                      body={"state": "Complete"}).execute(num_retries=self.num_retries)
629
630         else:
631             logger.warn("Overall job status is %s", processStatus)
632             if self.pipeline:
633                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
634                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
635         self.final_output = out
636
637     def on_message(self, event):
638         if "object_uuid" in event:
639             if event["object_uuid"] in self.jobs and event["event_type"] == "update":
640                 if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
641                     uuid = event["object_uuid"]
642                     with self.lock:
643                         j = self.jobs[uuid]
644                         logger.info("Job %s (%s) is Running", j.name, uuid)
645                         j.running = True
646                         j.update_pipeline_component(event["properties"]["new_attributes"])
647                 elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
648                     uuid = event["object_uuid"]
649                     try:
650                         self.cond.acquire()
651                         j = self.jobs[uuid]
652                         logger.info("Job %s (%s) is %s", j.name, uuid, event["properties"]["new_attributes"]["state"])
653                         j.done(event["properties"]["new_attributes"])
654                         self.cond.notify()
655                     finally:
656                         self.cond.release()
657
658     def get_uploaded(self):
659         return self.uploaded.copy()
660
661     def add_uploaded(self, src, pair):
662         self.uploaded[src] = pair
663
664     def arvExecutor(self, tool, job_order, **kwargs):
665         self.debug = kwargs.get("debug")
666
667         if kwargs.get("quiet"):
668             logger.setLevel(logging.WARN)
669             logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
670
671         useruuid = self.api.users().current().execute()["uuid"]
672         self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
673         self.pipeline = None
674
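        # Three modes follow: --create-template saves a pipeline template and
        # exits, --submit wraps the workflow in a RunnerJob that executes on
        # Arvados, and otherwise (--local) the workflow is orchestrated from
        # this process.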
675         if kwargs.get("create_template"):
676             tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
677             tmpl.save()
678             # cwltool.main will write our return value to stdout.
679             return tmpl.uuid
680
681         if kwargs.get("submit"):
682             runnerjob = RunnerJob(self, tool, job_order, kwargs.get("enable_reuse"))
683
684         if not kwargs.get("submit") and "cwl_runner_job" not in kwargs:
685             # Create pipeline for local run
686             self.pipeline = self.api.pipeline_instances().create(
687                 body={
688                     "owner_uuid": self.project_uuid,
689                     "name": shortname(tool.tool["id"]),
690                     "components": {},
691                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
692             logger.info("Pipeline instance %s", self.pipeline["uuid"])
693
694         if kwargs.get("submit") and not kwargs.get("wait"):
695             runnerjob.run()
696             return runnerjob.uuid
697
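        # Listen for job state changes on the websocket event bus.  on_message
        # runs on the event client's thread, which is why it synchronizes with
        # this method through self.cond.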
698         events = arvados.events.subscribe(arvados.api('v1'), [["object_uuid", "is_a", "arvados#job"]], self.on_message)
699
700         self.debug = kwargs.get("debug")
701         self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
702         self.fs_access = CollectionFsAccess(kwargs["basedir"])
703
704         kwargs["fs_access"] = self.fs_access
705         kwargs["enable_reuse"] = kwargs.get("enable_reuse")
706
707         kwargs["outdir"] = "$(task.outdir)"
708         kwargs["tmpdir"] = "$(task.tmpdir)"
709
710         if kwargs.get("conformance_test"):
711             return cwltool.main.single_job_executor(tool, job_order, **kwargs)
712         else:
713             if kwargs.get("submit"):
714                 jobiter = iter((runnerjob,))
715             else:
716                 if "cwl_runner_job" in kwargs:
717                     self.uuid = kwargs.get("cwl_runner_job").get('uuid')
718                 jobiter = tool.job(job_order,
719                                    self.output_callback,
720                                    docker_outdir="$(task.outdir)",
721                                    **kwargs)
722
723             try:
724                 self.cond.acquire()
725                 # Will continue to hold the lock for the duration of this code
726                 # except when in cond.wait(), at which point on_message can update
727                 # job state and process output callbacks.
728
729                 for runnable in jobiter:
730                     if runnable:
731                         runnable.run(**kwargs)
732                     else:
733                         if self.jobs:
734                             self.cond.wait(1)
735                         else:
736                             logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
737                             break
738
739                 while self.jobs:
740                     self.cond.wait(1)
741
742                 events.close()
743             except:
744                 if sys.exc_info()[0] is KeyboardInterrupt:
745                     logger.error("Interrupted, marking pipeline as failed")
746                 else:
747                     logger.error("Caught unhandled exception, marking pipeline as failed.  Error was: %s", sys.exc_info()[0], exc_info=(sys.exc_info()[1] if self.debug else False))
748                 if self.pipeline:
749                     self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
750                                                          body={"state": "Failed"}).execute(num_retries=self.num_retries)
751             finally:
752                 self.cond.release()
753
754             if self.final_output is None:
755                 raise cwltool.workflow.WorkflowException("Workflow did not return a result.")
756
757             return self.final_output
758
759 def versionstring():
760     """Return a version string for key packages, used for provenance and debugging."""
761
762     arvcwlpkg = pkg_resources.require("arvados-cwl-runner")
763     arvpkg = pkg_resources.require("arvados-python-client")
764     cwlpkg = pkg_resources.require("cwltool")
765
766     return "%s %s, %s %s, %s %s" % (sys.argv[0], arvcwlpkg[0].version,
767                                     "arvados-python-client", arvpkg[0].version,
768                                     "cwltool", cwlpkg[0].version)
769
770 def arg_parser():  # type: () -> argparse.ArgumentParser
771     parser = argparse.ArgumentParser(description='Arvados executor for Common Workflow Language')
772
773     parser.add_argument("--conformance-test", action="store_true")
774     parser.add_argument("--basedir", type=str,
775                         help="Base directory used to resolve relative references in the input; defaults to the directory of the input object file, or the current directory if inputs are piped or given on the command line.")
776     parser.add_argument("--outdir", type=str, default=os.path.abspath('.'),
777                         help="Output directory, default current directory")
778
779     parser.add_argument("--eval-timeout",
780                         help="Time to wait for a JavaScript expression to evaluate before giving an error, default 20s.",
781                         type=float,
782                         default=20)
783     parser.add_argument("--version", action="store_true", help="Print version and exit")
784
785     exgroup = parser.add_mutually_exclusive_group()
786     exgroup.add_argument("--verbose", action="store_true", help="Default logging")
787     exgroup.add_argument("--quiet", action="store_true", help="Only print warnings and errors.")
788     exgroup.add_argument("--debug", action="store_true", help="Print even more logging")
789
790     parser.add_argument("--tool-help", action="store_true", help="Print command line help for tool")
791
792     exgroup = parser.add_mutually_exclusive_group()
793     exgroup.add_argument("--enable-reuse", action="store_true",
794                         default=True, dest="enable_reuse",
795                         help="Enable job reuse (default); equivalent past jobs may be reused instead of running new ones.")
796     exgroup.add_argument("--disable-reuse", action="store_false",
797                         default=True, dest="enable_reuse",
798                         help="Disable job reuse; always run new jobs.")
799
800     parser.add_argument("--project-uuid", type=str, help="Project that will own the workflow jobs. If not provided, jobs go to the user's Home project.")
801     parser.add_argument("--ignore-docker-for-reuse", action="store_true",
802                         help="Ignore Docker image version when deciding whether to reuse past jobs.",
803                         default=False)
804
805     exgroup = parser.add_mutually_exclusive_group()
806     exgroup.add_argument("--submit", action="store_true", help="Submit workflow to run on Arvados.",
807                         default=True, dest="submit")
808     exgroup.add_argument("--local", action="store_false", help="Run the workflow orchestration on the local host (individual jobs are still submitted to Arvados).",
809                         default=True, dest="submit")
810     exgroup.add_argument("--create-template", action="store_true", help="Create an Arvados pipeline template.")
811
812     exgroup = parser.add_mutually_exclusive_group()
813     exgroup.add_argument("--wait", action="store_true", help="After submitting workflow runner job, wait for completion.",
814                         default=True, dest="wait")
815     exgroup.add_argument("--no-wait", action="store_false", help="Submit workflow runner job and exit.",
816                         default=True, dest="wait")
817
818     parser.add_argument("workflow", type=str, nargs="?", default=None)
819     parser.add_argument("job_order", nargs=argparse.REMAINDER)
820
821     return parser
822
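# Typical invocations (hypothetical project UUID and filenames):
#   arvados-cwl-runner --project-uuid zzzzz-j7d0g-xxxxxxxxxxxxxxx workflow.cwl job.yml
#   arvados-cwl-runner --create-template workflow.cwl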
823 def main(args, stdout, stderr, api_client=None):
824     parser = arg_parser()
825
826     job_order_object = None
827     arvargs = parser.parse_args(args)
828     if arvargs.create_template and not arvargs.job_order:
829         job_order_object = ({}, "")
830
831     try:
832         if api_client is None:
833             api_client=arvados.api('v1', model=OrderedJsonModel())
834         runner = ArvCwlRunner(api_client)
835     except Exception as e:
836         logger.error(e)
837         return 1
838
839     return cwltool.main.main(args=arvargs,
840                              stdout=stdout,
841                              stderr=stderr,
842                              executor=runner.arvExecutor,
843                              makeTool=runner.arvMakeTool,
844                              versionfunc=versionstring,
845                              job_order_object=job_order_object)