Merge branch 'master' into 9998-unsigned_manifest
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Crunch script integration for running arvados-cwl-runner (importing
2 # arvados_cwl module) inside a crunch job.
3 #
4 # This gets the job record, transforms the script parameters into a valid CWL
5 # input object, then executes the CWL runner to run the underlying workflow or
6 # tool.  When the workflow completes, record the output object in an output
7 # collection for this runner job.
8
9 import arvados
10 import arvados_cwl
11 import arvados.collection
12 import arvados.util
13 import cwltool.main
14 import logging
15 import os
16 import json
17 import argparse
18 import re
19 import functools
20
21 from arvados.api import OrderedJsonModel
22 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
23 from cwltool.load_tool import load_tool
24 from cwltool.errors import WorkflowException
25
26 logger = logging.getLogger('arvados.cwl-runner')
27
def run():
    """Run arvados-cwl-runner inside the current crunch job.

    Reads the current job's ``script_parameters``, turns them into a CWL
    input object, extracts the ``cwl:tool`` path and the ``arv:*`` runner
    options, loads the tool and passes it to ``ArvCwlRunner.arv_executor``.

    Takes no arguments and returns None.  Requires the ``TASK_KEEPMOUNT``
    environment variable, which is used as the base of the tool path.

    Any ``Exception`` raised while preparing or executing the workflow is
    logged (not re-raised) and the current job task is updated with
    ``success: False`` and whatever final output collection the runner
    produced, if any.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    # Stays None until the runner is constructed, so the failure handler
    # below can tell whether there is any output to report.
    runner = None
    try:
        job_order_object = arvados.current_job()['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # Matches "<32 hex digits>+<size>" optionally followed by "/path".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Prefix *v* with "keep:" when it matches pdh_path; otherwise return it unchanged."""
            if pdh_path.match(v):
                return "keep:%s" % v
            return v

        def keeppathObj(v):
            """Rewrite the "location" of a File/Directory object in place.

            Objects without a "location" (e.g. literals that only carry
            "contents" or "listing") are left untouched instead of raising
            KeyError.
            """
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Bare string parameters that match the Keep locator pattern are
        # promoted to CWL File objects.  Only existing keys are reassigned,
        # so the dict does not change size while being iterated.
        for k, v in job_order_object.items():
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)
        adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))

        # Runner options travel alongside the workflow inputs; remove them
        # from the input object, falling back to the defaults when absent.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")

        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                          output_name=output_name, output_tags=output_tags)

        t = load_tool(toolpath, runner.arv_make_tool)

        # Build the keyword arguments for arv_executor as a Namespace and
        # expand it with vars() at the call site.
        args = argparse.Namespace()
        args.project_uuid = arvados.current_job()["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = False
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
        runner.arv_executor(t, job_order_object, **vars(args))
    except Exception as e:
        # Log through the module logger (not the root logger) so these
        # messages use the same logger as the rest of this function.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        # Mark the task as failed, recording any output that was produced.
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': outputCollection,
                                   'success': False,
                                   'progress': 1.0
                               }).execute()