14325: Move magic number to const.
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, record the output object in an output
11 # collection for this runner job.
12
13 import arvados
14 import arvados_cwl
15 import arvados.collection
16 import arvados.util
17 import cwltool.main
18 import logging
19 import os
20 import json
21 import argparse
22 import re
23 import functools
24
25 from arvados.api import OrderedJsonModel
26 from cwltool.process import shortname
27 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
28 from cwltool.load_tool import load_tool
29 from cwltool.errors import WorkflowException
30 from arvados_cwl.context import ArvRuntimeContext
31
32 from .fsaccess import CollectionFetcher, CollectionFsAccess
33
logger = logging.getLogger('arvados.cwl-runner')

# Number of retries for Keep requests made through the runner's API client.
KEEP_NUM_RETRIES = 4

# Matches a bare portable data hash ("<32 hex md5>+<size>"), optionally
# followed by a path inside that collection.
_PDH_PATH_RE = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')


def _keepify_job_order(job_order_object):
    """Rewrite Keep references in a crunch job order into CWL ``keep:`` form.

    Mutates ``job_order_object`` in place:

    * a top-level string value matching ``arvados.util.keep_locator_pattern``
      is replaced by ``{"class": "File", "location": "keep:<value>"}``;
    * any File/Directory ``location`` that is a bare portable data hash
      (optionally with a path) gets a ``keep:`` prefix;
    * the result is then passed through cwltool's ``normalizeFilesDirs``.
    """
    def keeppath(v):
        if _PDH_PATH_RE.match(v):
            return "keep:%s" % v
        return v

    def keeppathObj(v):
        if "location" in v:
            v["location"] = keeppath(v["location"])

    # Only values of keys that already exist are replaced, so the dict never
    # changes size while it is being iterated.
    for k, v in job_order_object.items():
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = {
                "class": "File",
                "location": "keep:%s" % v
            }

    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)
    normalizeFilesDirs(job_order_object)


def _report_task_failure(api, runner):
    """Mark the current crunch task as failed (success=False, progress=1.0).

    If ``runner`` exists and produced a final output collection, that
    collection's portable data hash is recorded as the task output;
    otherwise the output is set to None.
    """
    if runner and runner.final_output_collection:
        output_collection = runner.final_output_collection.portable_data_hash()
    else:
        output_collection = None
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': output_collection,
                               'success': False,
                               'progress': 1.0
                           }).execute()


def run():
    """Run the CWL tool/workflow named by the current crunch job.

    Reads the job's ``script_parameters``. The ``cwl:tool`` entry gives the
    tool path relative to ``$TASK_KEEPMOUNT``. Optional ``arv:output_name``,
    ``arv:output_tags``, ``arv:enable_reuse``, ``arv:on_error`` and
    ``arv:debug`` entries configure the runner. All remaining entries form
    the CWL input object.

    Any exception is logged rather than propagated. The current task is then
    marked as failed, with any partial final output collection attached.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    runner = None
    try:
        job = arvados.current_job()
        job_order_object = job['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # Runner options travel in script_parameters under "arv:" keys. Pop
        # them before keepifying so they are never treated as workflow inputs
        # (e.g. an output name that happens to look like a Keep locator).
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")
        debug = job_order_object.pop("arv:debug", False)

        # Raise log levels before anything else runs, so that executor
        # construction and tool loading are also logged at debug level.
        if debug:
            logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados').setLevel(logging.DEBUG)
            logging.getLogger("cwltool").setLevel(logging.DEBUG)

        _keepify_job_order(job_order_object)

        arvargs = argparse.Namespace()
        arvargs.work_api = "jobs"
        arvargs.output_name = output_name
        arvargs.output_tags = output_tags
        arvargs.thread_count = 1
        arvargs.collection_cache_size = None

        api_client = arvados.safeapi.ThreadSafeApiCache(
            api_params={"model": OrderedJsonModel()},
            keep_params={"num_retries": KEEP_NUM_RETRIES})
        runner = arvados_cwl.ArvCwlExecutor(api_client=api_client, arvargs=arvargs)

        make_fs_access = functools.partial(CollectionFsAccess,
                                           collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.loadingContext)

        args = ArvRuntimeContext(vars(arvargs))
        args.project_uuid = job["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = debug
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": job["uuid"], "state": job["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0
        args.priority = arvados_cwl.DEFAULT_PRIORITY
        args.do_validate = True
        args.disable_js_validation = False
        args.tmp_outdir_prefix = "tmp"

        runner.arv_executor(t, job_order_object, args, logger=logger)
    except Exception as e:
        # Use the module logger, not the root logger. This keeps these
        # messages on the same logger (and level settings) as the rest of
        # this runner instead of the unconfigured root.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        _report_task_failure(api, runner)