13558: add req_id as a tag to Rails.logger
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, record the output object in an output
11 # collection for this runner job.
12
13 import arvados
14 import arvados_cwl
15 import arvados.collection
16 import arvados.util
17 import cwltool.main
18 import logging
19 import os
20 import json
21 import argparse
22 import re
23 import functools
24
25 from arvados.api import OrderedJsonModel
26 from cwltool.process import shortname
27 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
28 from cwltool.load_tool import load_tool
29 from cwltool.errors import WorkflowException
30
31 from .fsaccess import CollectionFetcher, CollectionFsAccess
32
33 logger = logging.getLogger('arvados.cwl-runner')
34
35 def run():
36     # Timestamps are added by crunch-job, so don't print redundant timestamps.
37     arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
38
39     # Print package versions
40     logger.info(arvados_cwl.versionstring())
41
42     api = arvados.api("v1")
43
44     arvados_cwl.add_arv_hints()
45
46     runner = None
47     try:
48         job_order_object = arvados.current_job()['script_parameters']
49         toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
50
51         pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
52
53         def keeppath(v):
54             if pdh_path.match(v):
55                 return "keep:%s" % v
56             else:
57                 return v
58
59         def keeppathObj(v):
60             if "location" in v:
61                 v["location"] = keeppath(v["location"])
62
63         for k,v in job_order_object.items():
64             if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
65                 job_order_object[k] = {
66                     "class": "File",
67                     "location": "keep:%s" % v
68                 }
69
70         adjustFileObjs(job_order_object, keeppathObj)
71         adjustDirObjs(job_order_object, keeppathObj)
72         normalizeFilesDirs(job_order_object)
73
74         output_name = None
75         output_tags = None
76         enable_reuse = True
77         on_error = "continue"
78         debug = False
79
80         if "arv:output_name" in job_order_object:
81             output_name = job_order_object["arv:output_name"]
82             del job_order_object["arv:output_name"]
83
84         if "arv:output_tags" in job_order_object:
85             output_tags = job_order_object["arv:output_tags"]
86             del job_order_object["arv:output_tags"]
87
88         if "arv:enable_reuse" in job_order_object:
89             enable_reuse = job_order_object["arv:enable_reuse"]
90             del job_order_object["arv:enable_reuse"]
91
92         if "arv:on_error" in job_order_object:
93             on_error = job_order_object["arv:on_error"]
94             del job_order_object["arv:on_error"]
95
96         if "arv:debug" in job_order_object:
97             debug = job_order_object["arv:debug"]
98             del job_order_object["arv:debug"]
99
100         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
101             api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
102                                           output_name=output_name, output_tags=output_tags)
103
104         make_fs_access = functools.partial(CollectionFsAccess,
105                                  collection_cache=runner.collection_cache)
106
107         t = load_tool(toolpath, runner.arv_make_tool,
108                       fetcher_constructor=functools.partial(CollectionFetcher,
109                                                   api_client=runner.api,
110                                                   fs_access=make_fs_access(""),
111                                                   num_retries=runner.num_retries))
112
113         if debug:
114             logger.setLevel(logging.DEBUG)
115             logging.getLogger('arvados').setLevel(logging.DEBUG)
116             logging.getLogger("cwltool").setLevel(logging.DEBUG)
117
118         args = argparse.Namespace()
119         args.project_uuid = arvados.current_job()["owner_uuid"]
120         args.enable_reuse = enable_reuse
121         args.on_error = on_error
122         args.submit = False
123         args.debug = debug
124         args.quiet = False
125         args.ignore_docker_for_reuse = False
126         args.basedir = os.getcwd()
127         args.name = None
128         args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
129         args.make_fs_access = make_fs_access
130         args.trash_intermediate = False
131         args.intermediate_output_ttl = 0
132         args.priority = arvados_cwl.DEFAULT_PRIORITY
133         args.do_validate = True
134         args.disable_js_validation = False
135         args.tmp_outdir_prefix = "tmp"
136
137         runner.arv_executor(t, job_order_object, **vars(args))
138     except Exception as e:
139         if isinstance(e, WorkflowException):
140             logging.info("Workflow error %s", e)
141         else:
142             logging.exception("Unhandled exception")
143         if runner and runner.final_output_collection:
144             outputCollection = runner.final_output_collection.portable_data_hash()
145         else:
146             outputCollection = None
147         api.job_tasks().update(uuid=arvados.current_task()['uuid'],
148                                              body={
149                                                  'output': outputCollection,
150                                                  'success': False,
151                                                  'progress':1.0
152                                              }).execute()