Merge branch '8784-dir-listings'
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 # Crunch script integration for running arvados-cwl-runner (importing
6 # arvados_cwl module) inside a crunch job.
7 #
8 # This gets the job record, transforms the script parameters into a valid CWL
9 # input object, then executes the CWL runner to run the underlying workflow or
10 # tool.  When the workflow completes, record the output object in an output
11 # collection for this runner job.
12
13 import arvados
14 import arvados_cwl
15 import arvados.collection
16 import arvados.util
17 import cwltool.main
18 import logging
19 import os
20 import json
21 import argparse
22 import re
23 import functools
24
25 from arvados.api import OrderedJsonModel
26 from cwltool.process import shortname
27 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
28 from cwltool.load_tool import load_tool
29 from cwltool.errors import WorkflowException
30
31 from .fsaccess import CollectionFetcher, CollectionFsAccess
32
33 logger = logging.getLogger('arvados.cwl-runner')
34
def run():
    """Crunch-job entry point: execute a CWL workflow via arvados-cwl-runner.

    Reads the current job record, converts its script parameters into a
    valid CWL job order (rewriting Keep locators into ``keep:`` File
    references), loads the tool from the task's Keep mount, and runs it
    with ArvCwlRunner.  On any failure, records the partial output (if
    any) and failure state on the current job task.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    runner = None
    try:
        # arvados.current_job() is cached by the SDK; fetch once and reuse.
        job = arvados.current_job()
        job_order_object = job['script_parameters']
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # Matches a Keep portable data hash, optionally followed by a
        # path inside the collection (e.g. "<md5>+<size>/dir/file").
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            # Prefix bare PDH references with the "keep:" scheme; leave
            # everything else untouched.
            if pdh_path.match(v):
                return "keep:%s" % v
            else:
                return v

        def keeppathObj(v):
            # Rewrite the "location" of a CWL File/Directory object in place.
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Promote bare Keep locator strings in the job order to CWL File
        # objects so cwltool treats them as file inputs.
        for k, v in job_order_object.items():
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Pull runner-control options ("arv:*" keys) out of the job order
        # so they are not passed through to the workflow as inputs.
        output_name = None
        output_tags = None
        enable_reuse = True
        on_error = "continue"
        if "arv:output_name" in job_order_object:
            output_name = job_order_object["arv:output_name"]
            del job_order_object["arv:output_name"]

        if "arv:output_tags" in job_order_object:
            output_tags = job_order_object["arv:output_tags"]
            del job_order_object["arv:output_tags"]

        if "arv:enable_reuse" in job_order_object:
            enable_reuse = job_order_object["arv:enable_reuse"]
            del job_order_object["arv:enable_reuse"]

        if "arv:on_error" in job_order_object:
            on_error = job_order_object["arv:on_error"]
            del job_order_object["arv:on_error"]

        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                          output_name=output_name, output_tags=output_tags)

        make_fs_access = functools.partial(CollectionFsAccess,
                                 collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.arv_make_tool,
                      fetcher_constructor=functools.partial(CollectionFetcher,
                                                  api_client=runner.api,
                                                  fs_access=make_fs_access(""),
                                                  num_retries=runner.num_retries))

        # Build the argument namespace arv_executor expects, equivalent to
        # what the command-line parser would produce.
        args = argparse.Namespace()
        args.project_uuid = job["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = False
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": job["uuid"], "state": job["state"]}
        args.make_fs_access = make_fs_access
        args.trash_intermediate = False
        args.intermediate_output_ttl = 0

        runner.arv_executor(t, job_order_object, **vars(args))
    except Exception as e:
        if isinstance(e, WorkflowException):
            # Expected workflow-level failure: report without a traceback.
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        # Record whatever partial output exists, then mark the task failed.
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                             body={
                                                 'output': outputCollection,
                                                 'success': False,
                                                 'progress': 1.0
                                             }).execute()