Merge branch '10200-cwl-crunch-script' closes #10200
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Crunch script integration for running arvados-cwl-runner (importing
2 # arvados_cwl module) inside a crunch job.
3 #
4 # This gets the job record, transforms the script parameters into a valid CWL
5 # input object, then executes the CWL runner to run the underlying workflow or
6 # tool.  When the workflow completes, record the output object in an output
7 # collection for this runner job.
8
9 import arvados
10 import arvados_cwl
11 import arvados.collection
12 import arvados.util
13 import cwltool.main
14 import logging
15 import os
16 import json
17 import argparse
18 import re
19 import functools
20
21 from arvados.api import OrderedJsonModel
22 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
23 from cwltool.load_tool import load_tool
24
25 logger = logging.getLogger('arvados.cwl-runner')
26
def run():
    """Run the CWL workflow or tool described by the current crunch job.

    Reads the current job's ``script_parameters``, rewrites them into a CWL
    input object (Keep references become ``keep:`` locations), loads the tool
    named by ``cwl:tool`` and executes it with ``arvados_cwl.ArvCwlRunner``.

    The outcome is recorded on the current job task: on success the task
    output is the portable data hash of the runner's final output collection
    (or None when the runner produced none); if any exception is raised it is
    logged and the task is marked unsuccessful with no output.  Exceptions
    are not propagated to the caller.
    """
    # Print package versions
    logger.info(arvados_cwl.versionstring())

    # Created outside the try block because the failure handler below also
    # needs it to update the task record.
    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    try:
        job_order_object = arvados.current_job()['script_parameters']

        # "<32 hex digits>+<size>" optionally followed by "/<path>".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Prefix *v* with "keep:" if it matches pdh_path; else return it unchanged."""
            if pdh_path.match(v):
                return "keep:%s" % v
            return v

        def keeppathObj(v):
            """Rewrite the "location" of a File/Directory object in place.

            Objects without a "location" key are left untouched instead of
            raising KeyError; this runs before normalizeFilesDirs(), so such
            objects can legitimately reach this point.
            """
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # The tool path is given relative to the directory named by
        # $TASK_KEEPMOUNT; turn it into an absolute file:// reference.
        job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])

        # Top-level string parameters that look like Keep locators become
        # CWL File objects.  Only existing keys are reassigned, so the
        # dictionary does not change size during iteration.
        for k, v in job_order_object.items():
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)
        # Apply cwltool's getListing to each Directory object, using a
        # CollectionFsAccess for filesystem access.
        adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))

        # "arv:output_name" is an option for the runner, not a workflow
        # input, so remove it from the input object before loading the tool.
        output_name = job_order_object.pop("arv:output_name", None)

        # The runner gets its own API client built with OrderedJsonModel.
        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                          output_name=output_name)

        t = load_tool(job_order_object, runner.arv_make_tool)

        # Options passed to the executor as keyword arguments.
        args = argparse.Namespace(
            project_uuid=arvados.current_job()["owner_uuid"],
            enable_reuse=True,
            submit=False,
            debug=True,
            quiet=False,
            ignore_docker_for_reuse=False,
            basedir=os.getcwd(),
            cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]},
        )
        # The returned output object is not used here; the result is read
        # from runner.final_output_collection below.
        runner.arv_executor(t, job_order_object, **vars(args))

        if runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None

        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': outputCollection,
                                   'success': True,
                                   'progress': 1.0
                               }).execute()
    except Exception:
        # Top-level boundary: log the traceback through the module logger
        # and mark the task as failed.
        logger.exception("Unhandled exception")
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': None,
                                   'success': False,
                                   'progress': 1.0
                               }).execute()