13306: Changes to arvados-cwl-runner code after running futurize --stage2
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 from past.builtins import basestring
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Crunch script integration for running arvados-cwl-runner (importing
7 # arvados_cwl module) inside a crunch job.
8 #
9 # This gets the job record, transforms the script parameters into a valid CWL
10 # input object, then executes the CWL runner to run the underlying workflow or
11 # tool.  When the workflow completes, record the output object in an output
12 # collection for this runner job.
13
14 import arvados
15 import arvados_cwl
16 import arvados.collection
17 import arvados.util
18 import cwltool.main
19 import logging
20 import os
21 import json
22 import argparse
23 import re
24 import functools
25
26 from arvados.api import OrderedJsonModel
27 from cwltool.process import shortname
28 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
29 from cwltool.load_tool import load_tool
30 from cwltool.errors import WorkflowException
31 from arvados_cwl.context import ArvRuntimeContext
32
33 from .fsaccess import CollectionFetcher, CollectionFsAccess
34
35 logger = logging.getLogger('arvados.cwl-runner')
36
def run():
    """Run a CWL workflow or tool as the body of the current Crunch job.

    Steps:
      1. Read the current job's ``script_parameters`` as the CWL input object.
      2. Pop the ``cwl:tool`` entry, which names the tool file under
         ``$TASK_KEEPMOUNT``.
      3. Rewrite Keep locators and portable-data-hash paths into ``keep:``
         references.
      4. Strip the ``arv:*`` runner options out of the input object.
      5. Hand the loaded tool and input object to
         ``ArvCwlExecutor.arv_executor``.

    Exceptions are not propagated. Any exception raised while preparing or
    running the workflow is logged, and the current job task is updated with
    ``success: False`` and ``progress: 1.0``. If the executor had already
    produced a final output collection, its portable data hash is recorded as
    the task output.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    runner = None
    try:
        job = arvados.current_job()
        job_order_object = job['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # A portable data hash, optionally followed by a path inside that collection.
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            """Prefix *v* with ``keep:`` if it looks like a PDH path; otherwise return it unchanged."""
            if pdh_path.match(v):
                return "keep:%s" % v
            else:
                return v

        def keeppathObj(v):
            """Rewrite the ``location`` of a File/Directory object in place, if present."""
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Top-level parameters given as bare Keep locator strings are promoted
        # to CWL File objects. Iterate over a copy because values are replaced.
        for k, v in list(job_order_object.items()):
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Runner options travel alongside the workflow inputs. Remove them so
        # they are not passed to the workflow as CWL inputs.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")
        debug = job_order_object.pop("arv:debug", False)

        arvargs = argparse.Namespace()
        arvargs.work_api = "jobs"
        arvargs.output_name = output_name
        arvargs.output_tags = output_tags
        arvargs.thread_count = 1
        arvargs.collection_cache_size = None

        runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
            api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
                                          arvargs=arvargs)

        make_fs_access = functools.partial(CollectionFsAccess,
                                           collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.loadingContext)

        if debug:
            logger.setLevel(logging.DEBUG)
            logging.getLogger('arvados').setLevel(logging.DEBUG)
            logging.getLogger("cwltool").setLevel(logging.DEBUG)

        args = ArvRuntimeContext(vars(arvargs))
        args.project_uuid = job["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = debug
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": job["uuid"], "state": job["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0
        args.priority = arvados_cwl.DEFAULT_PRIORITY
        args.do_validate = True
        args.disable_js_validation = False
        args.tmp_outdir_prefix = "tmp"

        runner.arv_executor(t, job_order_object, args, logger=logger)
    except Exception as e:
        # Log via the module logger (not the root logger) so the message goes
        # through the same "arvados.cwl-runner" logger and formatter as the
        # rest of this run.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                               body={
                                   'output': outputCollection,
                                   'success': False,
                                   'progress': 1.0
                               }).execute()