Add 'sdk/java-v2/' from commit '55f103e336ca9fb8bf1720d2ef4ee8dd4e221118'
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, record the output object in an output
11 # collection for this runner job.
12
13 from past.builtins import basestring
14 from future.utils import viewitems
15
16 import arvados
17 import arvados_cwl
18 import arvados.collection
19 import arvados.util
20 import cwltool.main
21 import logging
22 import os
23 import json
24 import argparse
25 import re
26 import functools
27
28 from arvados.api import OrderedJsonModel
29 from cwltool.process import shortname
30 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
31 from cwltool.load_tool import load_tool
32 from cwltool.errors import WorkflowException
33 from arvados_cwl.context import ArvRuntimeContext
34
35 from .fsaccess import CollectionFetcher, CollectionFsAccess
36
37 logger = logging.getLogger('arvados.cwl-runner')
38
def run():
    """Run a CWL workflow or tool as the current crunch job.

    Reads the current job's ``script_parameters`` and turns them into a CWL
    input object: top-level string values matching
    ``arvados.util.keep_locator_pattern`` become ``File`` objects, and
    File/Directory ``location`` values that look like a portable data hash
    (optionally followed by a path) get a ``keep:`` prefix.  The ``cwl:tool``
    entry and the ``arv:*`` runner options are removed from the input object,
    the tool is loaded, and execution is handed to ``ArvCwlExecutor``.

    Any exception raised along the way is logged (it is not re-raised) and
    the current job task is updated with ``success: False`` and the portable
    data hash of the runner's final output collection, or ``None`` when no
    such collection is available.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    # Stays None until the executor is constructed, so the error handler can
    # tell whether there is a runner to ask for partial output.
    runner = None
    try:
        job_order_object = arvados.current_job()['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # 32 hex digits, "+", a decimal number, then an optional "/path".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Return v with a "keep:" prefix if it matches pdh_path, else v unchanged."""
            if pdh_path.match(v):
                return "keep:%s" % v
            return v

        def keeppathObj(v):
            """Rewrite the "location" field of v in place, when it has one."""
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Only values of existing keys are replaced here, so the mapping's
        # size does not change while it is being iterated.
        for k, v in viewitems(job_order_object):
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Runner options arrive in the script parameters under "arv:" keys.
        # Pop them (with their defaults) so they are not left in the input
        # object that is passed on to the executor.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")
        debug = job_order_object.pop("arv:debug", False)

        arvargs = argparse.Namespace()
        arvargs.work_api = "jobs"
        arvargs.output_name = output_name
        arvargs.output_tags = output_tags
        arvargs.thread_count = 1
        arvargs.collection_cache_size = None

        runner = arvados_cwl.ArvCwlExecutor(
            api_client=arvados.safeapi.ThreadSafeApiCache(
                api_params={"model": OrderedJsonModel()},
                keep_params={"num_retries": 4}),
            arvargs=arvargs)

        make_fs_access = functools.partial(CollectionFsAccess,
                                           collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.loadingContext)

        if debug:
            logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados').setLevel(logging.DEBUG)
            logging.getLogger("cwltool").setLevel(logging.DEBUG)

        args = ArvRuntimeContext(vars(arvargs))
        args.project_uuid = arvados.current_job()["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = debug
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0
        args.priority = arvados_cwl.DEFAULT_PRIORITY
        args.do_validate = True
        args.disable_js_validation = False
        args.tmp_outdir_prefix = "tmp"

        runner.arv_executor(t, job_order_object, args, logger=logger)
    except Exception as e:
        # Log through this module's logger rather than the root logger so
        # the messages are handled like the rest of this function's output
        # (the bare logging.info()/logging.exception() calls target the root
        # logger, whose default WARNING level discards INFO records).
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")

        # Report whatever output the runner produced before failing, if any.
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None

        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': outputCollection,
                                   'success': False,
                                   'progress': 1.0
                               }).execute()