Merge branch '14087-federated-collection-by-pdh' refs #14087
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, it records the output object in an output
11 # collection for this runner job.
12
13 import arvados
14 import arvados_cwl
15 import arvados.collection
16 import arvados.util
17 import cwltool.main
18 import logging
19 import os
20 import json
21 import argparse
22 import re
23 import functools
24
25 from arvados.api import OrderedJsonModel
26 from cwltool.process import shortname
27 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
28 from cwltool.load_tool import load_tool
29 from cwltool.errors import WorkflowException
30 from arvados_cwl.context import ArvRuntimeContext
31
32 from .fsaccess import CollectionFetcher, CollectionFsAccess
33
34 logger = logging.getLogger('arvados.cwl-runner')
35
def run():
    """Run arvados-cwl-runner inside the current crunch job.

    Reads the current job's ``script_parameters``, pops the ``cwl:tool`` entry
    to locate the tool under ``$TASK_KEEPMOUNT``, rewrites Keep references in
    the remaining parameters into ``keep:`` locations, strips the ``arv:*``
    runner options, and hands the resulting job order to
    ``ArvCwlRunner.arv_executor``.

    Any exception raised along the way is logged, and the current job task is
    then updated with ``success: False`` and the portable data hash of the
    runner's final output collection (``None`` when there is no runner or no
    final output collection).
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    # Stays None until the runner is constructed, so the error handler can
    # tell whether there is any partial output to report.
    runner = None
    try:
        current_job = arvados.current_job()
        job_order_object = current_job['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # 32 hex digits, "+", a decimal number, and an optional "/path" suffix.
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Prefix *v* with ``keep:`` if it matches ``pdh_path``."""
            if pdh_path.match(v):
                return "keep:%s" % v
            return v

        def keeppathObj(v):
            """Rewrite the ``location`` of a File/Directory object in place."""
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Bare string parameters that match the Keep locator pattern become
        # CWL File objects.  Iterate over a snapshot because values are
        # reassigned inside the loop.
        for k, v in list(job_order_object.items()):
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Runner options travel in the script parameters under "arv:" keys.
        # Remove them so that only workflow inputs remain in the job order,
        # falling back to a default when an option is absent.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")
        debug = job_order_object.pop("arv:debug", False)

        arvargs = argparse.Namespace()
        arvargs.work_api = "jobs"
        arvargs.output_name = output_name
        arvargs.output_tags = output_tags
        arvargs.thread_count = 1

        runner = arvados_cwl.ArvCwlRunner(
            api_client=arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel()},
                keep_params={"num_retries": 4}),
            arvargs=arvargs)

        make_fs_access = functools.partial(CollectionFsAccess,
                                           collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.loadingContext)

        if debug:
            logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados').setLevel(logging.DEBUG)
            logging.getLogger("cwltool").setLevel(logging.DEBUG)

        args = ArvRuntimeContext(vars(arvargs))
        args.project_uuid = current_job["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = debug
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": current_job["uuid"], "state": current_job["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0
        args.priority = arvados_cwl.DEFAULT_PRIORITY
        args.do_validate = True
        args.disable_js_validation = False
        args.tmp_outdir_prefix = "tmp"

        runner.arv_executor(t, job_order_object, args, logger=logger)
    except Exception as e:
        # Report through the module logger rather than the root logger, so
        # failures are emitted the same way as the rest of this function's
        # output.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")

        # Report whatever output the runner holds, if any, and mark the task
        # as failed.
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        api.job_tasks().update(
            uuid=arvados.current_task()['uuid'],
            body={
                'output': outputCollection,
                'success': False,
                'progress': 1.0
            }).execute()