Merge branch '9570-cwl-runner-fixes' closes #9570 (again)
[arvados.git] / crunch_scripts / cwl-runner
1 #!/usr/bin/env python
2
3 # Crunch script integration for running arvados-cwl-runner (importing
4 # arvados_cwl module) inside a crunch job.
5 #
6 # This gets the job record, transforms the script parameters into a valid CWL
7 # input object, then executes the CWL runner to run the underlying workflow or
8 # tool.  When the workflow completes, record the output object in an output
9 # collection for this runner job.
10
11 import arvados
12 import arvados_cwl
13 import arvados.collection
14 import arvados.util
15 from cwltool.process import shortname
16 import cwltool.main
17 import logging
18 import os
19 import json
20 import argparse
21 from arvados.api import OrderedJsonModel
22 from cwltool.process import adjustFileObjs
23 from cwltool.load_tool import load_tool
24
25 # Print package versions
26 logging.info(cwltool.main.versionstring())
27
28 api = arvados.api("v1")
29
30 try:
31     job_order_object = arvados.current_job()['script_parameters']
32
33     def keeppath(v):
34         if arvados.util.keep_locator_pattern.match(v):
35             return "keep:%s" % v
36         else:
37             return v
38
39     def keeppathObj(v):
40         v["location"] = keeppath(v["location"])
41
42     job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
43
44     for k,v in job_order_object.items():
45         if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
46             job_order_object[k] = {
47                 "class": "File",
48                 "location": "keep:%s" % v
49             }
50
51     adjustFileObjs(job_order_object, keeppathObj)
52
53     runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
54
55     t = load_tool(job_order_object, runner.arvMakeTool)
56
57     args = argparse.Namespace()
58     args.project_uuid = arvados.current_job()["owner_uuid"]
59     args.enable_reuse = True
60     args.submit = False
61     args.debug = True
62     args.quiet = False
63     args.ignore_docker_for_reuse = False
64     args.basedir = os.getcwd()
65     args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
66     outputObj = runner.arvExecutor(t, job_order_object, **vars(args))
67
68     files = {}
69     def capture(fileobj):
70         path = fileobj["location"]
71         sp = path.split("/")
72         col = sp[0][5:]
73         if col not in files:
74             files[col] = set()
75         files[col].add("/".join(sp[1:]))
76         fileobj["location"] = path
77
78     adjustFileObjs(outputObj, capture)
79
80     final = arvados.collection.Collection()
81
82     for k,v in files.iteritems():
83         with arvados.collection.Collection(k) as c:
84             for f in c:
85                 final.copy(f, f, c, True)
86
87     def makeRelative(fileobj):
88         fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
89
90     adjustFileObjs(outputObj, makeRelative)
91
92     with final.open("cwl.output.json", "w") as f:
93         json.dump(outputObj, f, indent=4)
94
95     api.job_tasks().update(uuid=arvados.current_task()['uuid'],
96                                          body={
97                                              'output': final.save_new(create_collection_record=False),
98                                              'success': True,
99                                              'progress':1.0
100                                          }).execute()
101 except Exception as e:
102     logging.exception("Unhandled exception")
103     api.job_tasks().update(uuid=arvados.current_task()['uuid'],
104                                          body={
105                                              'output': None,
106                                              'success': False,
107                                              'progress':1.0
108                                          }).execute()