# 9701: Merge branch '9463-change-arvput-use-collection-class' into 9701-collection...
# [arvados.git] / crunch_scripts / cwl-runner
#!/usr/bin/env python

# Crunch script integration for running arvados-cwl-runner (importing
# arvados_cwl module) inside a crunch job.
#
# This gets the job record, transforms the script parameters into a valid CWL
# input object, then executes the CWL runner to run the underlying workflow or
# tool.  When the workflow completes, record the output object in an output
# collection for this runner job.

import arvados
import arvados_cwl
import arvados.collection
import arvados.util
import cwltool.main
import logging
import os
import json
import argparse
import re
import functools

from arvados.api import OrderedJsonModel
from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
from cwltool.load_tool import load_tool

# Print package versions.
# NOTE(review): this logs to the root logger, whose default level is WARNING;
# unless logging is configured for INFO elsewhere this line may be filtered
# out -- confirm it actually appears in job logs.
logging.info(cwltool.main.versionstring())

api = arvados.api("v1")

# URI scheme prefix used for Keep references in CWL "location" fields.
KEEP_PREFIX = "keep:"

try:
    job_order_object = arvados.current_job()['script_parameters']

    # A bare Keep portable data hash ("<32 hex>+<size>"), optionally followed
    # by a path inside that collection.
    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

    def keeppath(v):
        """Return v prefixed with "keep:" if it is a bare PDH (+path), else v."""
        if pdh_path.match(v):
            return "keep:%s" % v
        else:
            return v

    def keeppathObj(v):
        """Rewrite a File/Directory object's "location" into a keep: URI.

        Literal File objects (with "contents") and literal Directory objects
        (with "listing") may not have a "location" yet -- normalizeFilesDirs()
        below is what assigns one -- so they are skipped instead of raising
        KeyError.
        """
        if "location" in v:
            v["location"] = keeppath(v["location"])

    # The tool path in the script parameters is relative to the Keep mount.
    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])

    # Top-level parameters given as bare Keep locators become File objects.
    for k, v in job_order_object.items():
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = {
                "class": "File",
                "location": "keep:%s" % v
            }

    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)
    normalizeFilesDirs(job_order_object)
    # Populate Directory listings by reading the referenced collections.
    adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))

    t = load_tool(job_order_object, runner.arv_make_tool)

    args = argparse.Namespace()
    args.project_uuid = arvados.current_job()["owner_uuid"]
    args.enable_reuse = True
    args.submit = False
    args.debug = True
    args.quiet = False
    args.ignore_docker_for_reuse = False
    args.basedir = os.getcwd()
    args.cwl_runner_job = {"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
    outputObj = runner.arv_executor(t, job_order_object, **vars(args))

    # PDHs of the Keep collections referenced by output File objects.
    output_collections = set()

    def capture(fileobj):
        """Record the collection PDH of an output File located in Keep."""
        location = fileobj["location"]
        # NOTE(review): non-keep: locations are not expected here; they are
        # skipped rather than turned into a bogus collection identifier.
        if location.startswith(KEEP_PREFIX):
            output_collections.add(location.split("/", 1)[0][len(KEEP_PREFIX):])

    adjustFileObjs(outputObj, capture)

    final = arvados.collection.Collection()

    # Merge every entry of each referenced collection (not only the captured
    # file paths) into one output collection. Iterate in sorted order so that,
    # when two collections share an entry name, which copy wins
    # (overwrite=True) is deterministic.
    for pdh in sorted(output_collections):
        with arvados.collection.Collection(pdh) as c:
            for f in c:
                final.copy(f, f, c, True)

    def makeRelative(fileobj):
        """Strip the "keep:<pdh>/" prefix so locations are relative to `final`."""
        location = fileobj["location"]
        if location.startswith(KEEP_PREFIX):
            # Equivalent to "/".join(location.split("/")[1:]).
            fileobj["location"] = location.partition("/")[2]

    adjustFileObjs(outputObj, makeRelative)

    with final.open("cwl.output.json", "w") as f:
        json.dump(outputObj, f, indent=4)

    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': final.save_new(create_collection_record=False),
                               'success': True,
                               'progress': 1.0
                           }).execute()
except Exception:
    # Top-level boundary: log the traceback and mark the task as failed.
    logging.exception("Unhandled exception")
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': None,
                               'success': False,
                               'progress': 1.0
                           }).execute()