8654: Add runner job to pipeline instance.
[arvados.git] / crunch_scripts / cwl-runner
#!/usr/bin/env python
#
# Crunch script that runs a CWL workflow as an Arvados "runner" job.
#
# It reads the CWL tool reference and job order from this job's
# script_parameters, rewrites bare Keep locators to file:// URLs under the
# task's Keep mount, executes the workflow through arvados_cwl, merges the
# collections referenced by the workflow output into one new collection
# (with the output object saved as cwl.output.json), and records that
# collection as this task's output. Any exception marks the task as failed.

import arvados
import arvados_cwl
import arvados.collection
import arvados.util
from cwltool.process import shortname
import cwltool.main
import logging
import os
import json
import argparse
from arvados.api import OrderedJsonModel
from cwltool.process import adjustFiles

api = arvados.api("v1")

try:
    # Look the job record up once and reuse it below.
    this_job = arvados.current_job()
    job_order_object = this_job['script_parameters']

    # Echo the raw job order into the job log for debugging.
    print(job_order_object)

    def keeppath(v):
        """Rewrite a Keep locator as a file:// URL under TASK_KEEPMOUNT.

        Values that do not match arvados.util.keep_locator_pattern are
        returned unchanged, so the tool reference and File paths that are
        not bare Keep locators survive the rewrite instead of becoming None.
        """
        if arvados.util.keep_locator_pattern.match(v):
            return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
        return v

    job_order_object["cwl:tool"] = keeppath(job_order_object["cwl:tool"])

    # Apply the same rewrite to every File path in the job order.
    adjustFiles(job_order_object, keeppath)

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

    # NOTE(review): the positional flags follow cwltool.main.load_tool's
    # signature for the cwltool version in use -- TODO confirm their meaning
    # and consider passing them by keyword.
    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)

    exec_args = argparse.Namespace()
    exec_args.project_uuid = this_job["owner_uuid"]
    exec_args.enable_reuse = True
    outputObj = runner.arvExecutor(t, job_order_object, "", exec_args, cwl_runner_job=this_job)

    # Collection id (the text after "keep:", presumably a portable data
    # hash) -> set of paths inside that collection referenced by the output.
    files = {}

    def capture(path):
        """Record which collection `path` lives in; return `path` unchanged.

        Expects paths of the form "keep:<collection>/<path in collection>".
        Raises ValueError for anything else, since such a file could not be
        gathered into the merged output collection.
        """
        head, _, rest = path.partition("/")
        if not head.startswith("keep:"):
            raise ValueError("Output path is not in Keep: %s" % path)
        files.setdefault(head[len("keep:"):], set()).add(rest)
        return path

    adjustFiles(outputObj, capture)

    final = arvados.collection.Collection()

    # Copy every entry of each referenced collection (not only the captured
    # paths) into the merged output. The final True is the overwrite flag:
    # on a name collision, the collection copied last wins.
    for col_id in files:
        with arvados.collection.Collection(col_id) as c:
            for f in c:
                final.copy(f, f, c, True)

    def makeRelative(path):
        """Strip the leading "keep:<collection>" component from `path`."""
        return path.partition("/")[2]

    # Make output paths relative to the root of the merged collection.
    adjustFiles(outputObj, makeRelative)

    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': final.save_new(create_collection_record=False),
                               'success': True,
                               'progress': 1.0
                           }).execute()
except Exception:
    logging.exception("Unhandled exception")
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': None,
                               'success': False,
                               'progress': 1.0
                           }).execute()