8654: Fix keeppath and add ensure_unique_name to upload.
[arvados.git] / crunch_scripts / cwl-runner
#!/usr/bin/env python
"""Run a CWL workflow from an Arvados job and publish its output.

The job's ``script_parameters`` are used as the CWL job order.  Keep
locators in the job order are rewritten to ``file://`` URLs under the
task's Keep mount (``TASK_KEEPMOUNT``).  The tool is loaded and executed
with ``arvados_cwl.ArvCwlRunner``.  Every collection referenced by the
workflow output is then merged into a single new collection, which also
receives the rewritten output object as ``cwl.output.json``.

The current task is updated with that collection (saved without creating
a collection record), ``success=True`` and ``progress=1.0``.  If any
exception escapes, it is logged and the task is marked as failed with no
output instead.
"""

import arvados
import arvados_cwl
import arvados.collection
import arvados.util
from cwltool.process import shortname
import cwltool.main
import logging
import os
import json
import argparse
from arvados.api import OrderedJsonModel
from cwltool.process import adjustFiles

api = arvados.api("v1")

try:
    job = arvados.current_job()
    job_order_object = job['script_parameters']

    # Single-argument print: same output on Python 2, and also valid on Python 3.
    print(job_order_object)

    def keeppath(v):
        """Map a Keep locator to a file:// URL under the task's Keep mount.

        adjustFiles() replaces each file path with this function's return
        value, so anything that is not a Keep locator must be returned
        unchanged. Returning None here would erase non-Keep paths from the
        job order.
        """
        if arvados.util.keep_locator_pattern.match(v):
            return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
        return v

    adjustFiles(job_order_object, keeppath)

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

    # NOTE(review): the positional flags are passed straight through to
    # cwltool.main.load_tool; confirm their meaning against the cwltool
    # version in use.
    t = cwltool.main.load_tool(job_order_object, False, True, runner.arvMakeTool, True)

    # Stand-in for the options object the command-line front end would build.
    runner_args = argparse.Namespace()
    runner_args.project_uuid = job["owner_uuid"]
    runner_args.enable_reuse = True
    outputObj = runner.arvExecutor(t, job_order_object, "", runner_args)

    # Collection locator -> set of paths inside it referenced by the output.
    files = {}

    def capture(path):
        """Record which collection an output path belongs to; return path unchanged."""
        sp = path.split("/")
        # Drop a 5-character scheme prefix from the first component
        # (presumably "keep:" -- TODO confirm against arvExecutor output).
        col = sp[0][5:]
        files.setdefault(col, set()).add("/".join(sp[1:]))
        return path

    adjustFiles(outputObj, capture)

    final = arvados.collection.Collection()

    # Every file in each referenced collection is copied (overwriting on
    # name clashes), not just the paths recorded by capture().
    for k in files:
        with arvados.collection.Collection(k) as c:
            for f in c:
                final.copy(f, f, c, True)

    def makeRelative(path):
        """Drop the leading collection component so the path points into `final`."""
        return "/".join(path.split("/")[1:])

    adjustFiles(outputObj, makeRelative)

    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': final.save_new(create_collection_record=False),
                               'success': True,
                               'progress': 1.0
                           }).execute()
except Exception:
    logging.exception("Unhandled exception")
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': None,
                               'success': False,
                               'progress': 1.0
                           }).execute()