#!/usr/bin/env python

# Provenance: [arvados.git] / crunch_scripts / cwl-runner
# (merge of branch '10081-update-cwl-runner', refs #10081)

# Crunch script integration for running arvados-cwl-runner (importing
# arvados_cwl module) inside a crunch job.
#
# This gets the job record, transforms the script parameters into a valid CWL
# input object, then executes the CWL runner to run the underlying workflow or
# tool.  When the workflow completes, record the output object in an output
# collection for this runner job.
#
# NOTE: this script targets Python 2 (it relies on `basestring`).

import arvados
import arvados_cwl
import arvados.collection
import arvados.util
from cwltool.process import shortname
import cwltool.main
import logging
import os
import json
import argparse
import re
from arvados.api import OrderedJsonModel
from cwltool.process import adjustFileObjs, adjustDirObjs
from cwltool.load_tool import load_tool

# Print package versions
logging.info(cwltool.main.versionstring())

api = arvados.api("v1")


def report_task_result(output, success):
    """Record the final state of the current job task.

    Updates the current task record with the given `output` value (the value
    returned by saving the output collection, or None on failure) and
    `success` flag, and marks the task's progress as complete (1.0).
    """
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': output,
                               'success': success,
                               'progress': 1.0
                           }).execute()


try:
    # Fetch the job record once and reuse it below.
    job = arvados.current_job()
    job_order_object = job['script_parameters']

    # Matches "<32 hex digits>+<size>" optionally followed by "/<path>".
    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

    def keeppath(v):
        """Return `v` prefixed with "keep:" if it matches pdh_path, else `v` unchanged."""
        return "keep:%s" % v if pdh_path.match(v) else v

    def keeppathObj(v):
        """Rewrite the "location" of a File/Directory object in place via keeppath()."""
        v["location"] = keeppath(v["location"])

    # The tool document is addressed relative to the task's Keep mount.
    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])

    # Top-level string parameters that look like Keep locators become CWL File
    # objects.  Only existing keys are reassigned, so the mapping's size does
    # not change while it is being walked.
    for k, v in job_order_object.items():
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = {
                "class": "File",
                "location": "keep:%s" % v
            }

    # Normalize the locations of File and Directory objects in the input object.
    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

    t = load_tool(job_order_object, runner.arv_make_tool)

    # Keyword options handed to the executor.
    executor_options = {
        "project_uuid": job["owner_uuid"],
        "enable_reuse": True,
        "submit": False,
        "debug": True,
        "quiet": False,
        "ignore_docker_for_reuse": False,
        "basedir": os.getcwd(),
        "cwl_runner_job": {"uuid": job["uuid"], "state": job["state"]},
    }
    outputObj = runner.arv_executor(t, job_order_object, **executor_options)

    # Collections referenced by the output File objects, in first-seen order
    # so that the copy order below is deterministic.
    output_collections = []

    def capture(fileobj):
        """Remember which collection an output File object lives in."""
        # The first path segment minus its first five characters is used as
        # the collection identifier (presumably stripping a "keep:" prefix --
        # NOTE(review): confirm all output locations carry that prefix).
        col = fileobj["location"].split("/")[0][5:]
        if col not in output_collections:
            output_collections.append(col)

    adjustFileObjs(outputObj, capture)

    # Gather the contents of every referenced collection into a single new
    # collection.  The trailing True is passed positionally, exactly as before
    # (presumably the overwrite flag -- confirm against the SDK).
    final = arvados.collection.Collection()
    for col in output_collections:
        with arvados.collection.Collection(col) as c:
            for f in c:
                final.copy(f, f, c, True)

    def makeRelative(fileobj):
        """Drop the first path segment from a File object's location, in place."""
        fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])

    adjustFileObjs(outputObj, makeRelative)

    # Store the CWL output object alongside the gathered files.
    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    report_task_result(final.save_new(create_collection_record=False), True)
except Exception:
    # Top-level boundary: log the traceback and mark the task as failed
    # rather than letting the exception escape.
    logging.exception("Unhandled exception")
    report_task_result(None, False)