Merge branch '10165-cwl-outputs' closes #10165
[arvados.git] / crunch_scripts / cwl-runner
1 #!/usr/bin/env python
2
3 # Crunch script integration for running arvados-cwl-runner (importing
4 # arvados_cwl module) inside a crunch job.
5 #
6 # This gets the job record, transforms the script parameters into a valid CWL
7 # input object, then executes the CWL runner to run the underlying workflow or
8 # tool.  When the workflow completes, record the output object in an output
9 # collection for this runner job.
10
11 import arvados
12 import arvados_cwl
13 import arvados.collection
14 import arvados.util
15 import cwltool.main
16 import logging
17 import os
18 import json
19 import argparse
20 import re
21 import functools
22
23 from arvados.api import OrderedJsonModel
24 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
25 from cwltool.load_tool import load_tool
26
# Print package versions.  Log through the arvados-cwl-runner logger at INFO
# level: the root logger's default level is WARNING, so a bare logging.info()
# call would drop this message.  NOTE(review): the message is emitted by a
# handler on the parent "arvados" logger set up by the Arvados SDK — confirm.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
logger.info(cwltool.main.versionstring())

api = arvados.api("v1")
31
# A bare portable data hash, optionally followed by a path inside that
# collection, e.g. "<32 hex digits>+<size>/dir/file.txt".
pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')


def keeppath(v):
    """Return *v* with a "keep:" prefix if it is a bare PDH path, else *v*."""
    if pdh_path.match(v):
        return "keep:%s" % v
    else:
        return v


def keeppathObj(v):
    """Rewrite the "location" of a File/Directory object in place via keeppath()."""
    v["location"] = keeppath(v["location"])


def _final_output_pdh(runner):
    """Return the portable data hash of the runner's final output collection.

    Returns None when no runner has been created yet or it has no final
    output collection.
    """
    if runner is not None and runner.final_output_collection:
        return runner.final_output_collection.portable_data_hash()
    return None


def _finish_task(output, success):
    """Record output and success on the current crunch task, with progress 1.0."""
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': output,
                               'success': success,
                               'progress': 1.0,
                           }).execute()


# Bound before the try so the failure path can tell whether a runner (and
# possibly an already-saved output collection) exists.
runner = None
try:
    job = arvados.current_job()
    job_order_object = job['script_parameters']

    # "cwl:tool" is given relative to the task's Keep mount; turn it into an
    # absolute file:// URI for load_tool().
    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])

    # Top-level string parameters that are bare Keep locators become CWL File
    # objects.  Only values are replaced (no keys added or removed), so
    # assigning while iterating items() is safe.
    for k, v in job_order_object.items():
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = {
                "class": "File",
                "location": "keep:%s" % v
            }

    # Prefix bare PDH locations with "keep:" on every File and Directory
    # object, normalize them, then call getListing on each Directory through
    # a Keep-backed CollectionFsAccess.
    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)
    normalizeFilesDirs(job_order_object)
    adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))

    # "arv:output_name" configures the runner rather than the workflow, so
    # remove it from the input object and hand it to ArvCwlRunner instead.
    output_name = job_order_object.pop("arv:output_name", None)

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                      output_name=output_name)

    t = load_tool(job_order_object, runner.arv_make_tool)

    args = argparse.Namespace()
    args.project_uuid = job["owner_uuid"]
    args.enable_reuse = True
    args.submit = False
    args.debug = True
    args.quiet = False
    args.ignore_docker_for_reuse = False
    args.basedir = os.getcwd()
    args.cwl_runner_job = {"uuid": job["uuid"], "state": job["state"]}
    # Outputs are read back from runner.final_output_collection below.
    runner.arv_executor(t, job_order_object, **vars(args))

    _finish_task(_final_output_pdh(runner), True)
except Exception:
    logging.exception("Unhandled exception")
    # Keep any output collection the runner already saved, so partial results
    # stay reachable from the failed task.  This lookup must never stop the
    # task from being marked failed.
    try:
        outputCollection = _final_output_pdh(runner)
    except Exception:
        logging.exception("Could not determine output collection")
        outputCollection = None
    _finish_task(outputCollection, False)