Merge branch 'master' into 8654-arv-jobs-cwl-runner
[arvados.git] / crunch_scripts / cwl-runner
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados_cwl
5 import arvados.collection
6 import arvados.util
7 from cwltool.process import shortname
8 import cwltool.main
9 import logging
10 import os
11 import json
12 import argparse
13 from arvados.api import OrderedJsonModel
14 from cwltool.process import adjustFiles
15
# API client used at the end of the run to record this task's outcome.
api = arvados.api("v1")

try:
    # The current job's script_parameters are used directly as the job order.
    job_order_object = arvados.current_job()['script_parameters']

    print(job_order_object)

    # Rewrite every value that matches keep_locator_pattern into a file:// URL
    # rooted at the directory named by the TASK_KEEPMOUNT environment variable.
    for k, v in job_order_object.items():
        # Only strings can be matched against the pattern. Without this guard
        # a non-string parameter (number, boolean, list, dict) makes match()
        # raise TypeError, which would fail the whole task.
        # (basestring: this script targets Python 2.)
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)

    # The runner gets its own API client built with OrderedJsonModel.
    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

    # NOTE(review): the meaning of the positional flags (False, True, ..., True)
    # is defined by cwltool.main.load_tool -- confirm against the pinned
    # cwltool version before changing them.
    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)

    # Minimal stand-in for parsed command line options handed to arvExecutor.
    np = argparse.Namespace()
    np.project_uuid = arvados.current_job()["owner_uuid"]
    np.enable_reuse = True
    outputObj = runner.arvExecutor(t, job_order_object, "", np)

    # Maps a collection identifier to the set of paths seen inside it.
    files = {}

    def capture(path):
        """Record which collection and inner path `path` refers to.

        The first "/"-separated component, minus its first five characters
        (presumably a "keep:" prefix -- TODO confirm), is used as the
        collection identifier; the remaining components form the path inside
        it. Returns `path` unchanged.
        """
        sp = path.split("/")
        col = sp[0][5:]
        files.setdefault(col, set()).add("/".join(sp[1:]))
        return path

    # First pass: capture() returns each path unchanged, so this pass only
    # populates `files`.
    adjustFiles(outputObj, capture)

    final = arvados.collection.Collection()

    # Copy the contents of every referenced collection into the output
    # collection.
    # NOTE(review): the per-collection path sets stored in `files` are not
    # consulted here; every item yielded by iterating the source collection is
    # copied (final argument True presumably means overwrite -- confirm).
    for k in files:
        with arvados.collection.Collection(k) as c:
            for f in c:
                final.copy(f, f, c, True)

    def makeRelative(path):
        """Return `path` with its first "/"-separated component removed."""
        return "/".join(path.split("/")[1:])

    # Second pass: paths in outputObj are passed through makeRelative before
    # the object is serialized below (assumes adjustFiles rewrites outputObj
    # in place -- TODO confirm).
    adjustFiles(outputObj, makeRelative)

    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    # Report success, with the saved output collection as the task output.
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': final.save_new(create_collection_record=False),
                               'success': True,
                               'progress':1.0
                           }).execute()
except Exception as e:
    # Top-level boundary: log the traceback and mark the task as failed so the
    # failure is recorded on the task record.
    logging.exception("Unhandled exception")
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': None,
                               'success': False,
                               'progress':1.0
                           }).execute()