13140: Bump cwltool version
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, record the output object in an output
11 # collection for this runner job.
12
13 import arvados
14 import arvados_cwl
15 import arvados.collection
16 import arvados.util
17 import cwltool.main
18 import logging
19 import os
20 import json
21 import argparse
22 import re
23 import functools
24
25 from arvados.api import OrderedJsonModel
26 from cwltool.process import shortname
27 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
28 from cwltool.load_tool import load_tool
29 from cwltool.errors import WorkflowException
30
31 from .fsaccess import CollectionFetcher, CollectionFsAccess
32
33 logger = logging.getLogger('arvados.cwl-runner')
34
def run():
    """Run a CWL workflow or tool from inside a crunch job.

    Reads the current job's ``script_parameters``, turns them into a CWL
    input object, loads the tool named by the ``cwl:tool`` parameter from
    the Keep mount (``TASK_KEEPMOUNT``), and passes both to
    ``ArvCwlRunner.arv_executor``.

    The following ``arv:*`` script parameters are removed from the input
    object and used as runner settings (defaults in parentheses):
    ``arv:output_name`` (None), ``arv:output_tags`` (None),
    ``arv:enable_reuse`` (True), ``arv:on_error`` ("continue") and
    ``arv:debug`` (False).

    If anything raises, the error is logged and the current job task is
    updated with ``success: False`` and, when the runner has a final output
    collection, that collection's portable data hash as the output.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    # Stays None until the runner is constructed, so the error handler can
    # tell whether there is any output collection to report.
    runner = None
    try:
        job_order_object = arvados.current_job()['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # Matches a portable data hash, optionally followed by a path.
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Prefix v with "keep:" when it looks like a portable data hash path."""
            if pdh_path.match(v):
                return "keep:%s" % v
            else:
                return v

        def keeppathObj(v):
            """Rewrite the "location" of a File/Directory object in place."""
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Top-level string parameters that are bare Keep locators become
        # CWL File objects.
        for k,v in job_order_object.items():
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Runner settings travel in the script parameters under "arv:" keys;
        # pop them so they are not passed on as workflow inputs.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")
        debug = job_order_object.pop("arv:debug", False)

        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                          output_name=output_name, output_tags=output_tags)

        make_fs_access = functools.partial(CollectionFsAccess,
                                 collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.arv_make_tool,
                      fetcher_constructor=functools.partial(CollectionFetcher,
                                                  api_client=runner.api,
                                                  fs_access=make_fs_access(""),
                                                  num_retries=runner.num_retries))

        if debug:
            logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados').setLevel(logging.DEBUG)
            logging.getLogger("cwltool").setLevel(logging.DEBUG)

        # Build the keyword arguments for the executor as a namespace, then
        # expand it with vars() in the call below.
        args = argparse.Namespace()
        args.project_uuid = arvados.current_job()["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = debug
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0

        runner.arv_executor(t, job_order_object, **vars(args))
    except Exception as e:
        # Log through the module logger rather than the root logger: the
        # root logger defaults to WARNING, which would drop the INFO message,
        # and it does not use the formatter configured above.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        # Record the failure (and any partial output) on the current task.
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                             body={
                                                 'output': outputCollection,
                                                 'success': False,
                                                 'progress':1.0
                                             }).execute()