Merge branch '11788-arvput-dir-references'
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Crunch script integration for running arvados-cwl-runner (importing
2 # arvados_cwl module) inside a crunch job.
3 #
4 # This gets the job record, transforms the script parameters into a valid CWL
5 # input object, then executes the CWL runner to run the underlying workflow or
6 # tool.  When the workflow completes, record the output object in an output
7 # collection for this runner job.
8
9 import arvados
10 import arvados_cwl
11 import arvados.collection
12 import arvados.util
13 import cwltool.main
14 import logging
15 import os
16 import json
17 import argparse
18 import re
19 import functools
20
21 from arvados.api import OrderedJsonModel
22 from cwltool.process import shortname
23 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, normalizeFilesDirs
24 from cwltool.load_tool import load_tool
25 from cwltool.errors import WorkflowException
26
27 from .fsaccess import CollectionFetcher, CollectionFsAccess
28
29 logger = logging.getLogger('arvados.cwl-runner')
30
31 def run():
32     # Timestamps are added by crunch-job, so don't print redundant timestamps.
33     arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
34
35     # Print package versions
36     logger.info(arvados_cwl.versionstring())
37
38     api = arvados.api("v1")
39
40     arvados_cwl.add_arv_hints()
41
42     runner = None
43     try:
44         job_order_object = arvados.current_job()['script_parameters']
45         toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))
46
47         pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')
48
49         def keeppath(v):
50             if pdh_path.match(v):
51                 return "keep:%s" % v
52             else:
53                 return v
54
55         def keeppathObj(v):
56             if "location" in v:
57                 v["location"] = keeppath(v["location"])
58
59         for k,v in job_order_object.items():
60             if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
61                 job_order_object[k] = {
62                     "class": "File",
63                     "location": "keep:%s" % v
64                 }
65
66         adjustFileObjs(job_order_object, keeppathObj)
67         adjustDirObjs(job_order_object, keeppathObj)
68         normalizeFilesDirs(job_order_object)
69
70         output_name = None
71         output_tags = None
72         enable_reuse = True
73         on_error = "continue"
74         if "arv:output_name" in job_order_object:
75             output_name = job_order_object["arv:output_name"]
76             del job_order_object["arv:output_name"]
77
78         if "arv:output_tags" in job_order_object:
79             output_tags = job_order_object["arv:output_tags"]
80             del job_order_object["arv:output_tags"]
81
82         if "arv:enable_reuse" in job_order_object:
83             enable_reuse = job_order_object["arv:enable_reuse"]
84             del job_order_object["arv:enable_reuse"]
85
86         if "arv:on_error" in job_order_object:
87             on_error = job_order_object["arv:on_error"]
88             del job_order_object["arv:on_error"]
89
90         runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
91                                           output_name=output_name, output_tags=output_tags)
92
93         make_fs_access = functools.partial(CollectionFsAccess,
94                                  collection_cache=runner.collection_cache)
95
96         t = load_tool(toolpath, runner.arv_make_tool,
97                       fetcher_constructor=functools.partial(CollectionFetcher,
98                                                   api_client=runner.api,
99                                                   fs_access=make_fs_access(""),
100                                                   num_retries=runner.num_retries))
101
102         args = argparse.Namespace()
103         args.project_uuid = arvados.current_job()["owner_uuid"]
104         args.enable_reuse = enable_reuse
105         args.on_error = on_error
106         args.submit = False
107         args.debug = False
108         args.quiet = False
109         args.ignore_docker_for_reuse = False
110         args.basedir = os.getcwd()
111         args.name = None
112         args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
113         args.make_fs_access = make_fs_access
114         args.trash_intermediate = False
115         args.intermediate_output_ttl = 0
116
117         runner.arv_executor(t, job_order_object, **vars(args))
118     except Exception as e:
119         if isinstance(e, WorkflowException):
120             logging.info("Workflow error %s", e)
121         else:
122             logging.exception("Unhandled exception")
123         if runner and runner.final_output_collection:
124             outputCollection = runner.final_output_collection.portable_data_hash()
125         else:
126             outputCollection = None
127         api.job_tasks().update(uuid=arvados.current_task()['uuid'],
128                                              body={
129                                                  'output': outputCollection,
130                                                  'success': False,
131                                                  'progress':1.0
132                                              }).execute()