Crunch script doesn't fail on file/directory literals that don't have a 'location'...
[arvados.git] / sdk / cwl / arvados_cwl / crunch_script.py
1 # Crunch script integration for running arvados-cwl-runner (importing
2 # arvados_cwl module) inside a crunch job.
3 #
4 # This gets the job record, transforms the script parameters into a valid CWL
5 # input object, then executes the CWL runner to run the underlying workflow or
6 # tool.  When the workflow completes, record the output object in an output
7 # collection for this runner job.
8
9 import arvados
10 import arvados_cwl
11 import arvados.collection
12 import arvados.util
13 import cwltool.main
14 import logging
15 import os
16 import json
17 import argparse
18 import re
19 import functools
20
21 from arvados.api import OrderedJsonModel
22 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, normalizeFilesDirs
23 from cwltool.load_tool import load_tool
24 from cwltool.errors import WorkflowException
25
26 from .fsaccess import CollectionFetcher, CollectionFsAccess
27
28 logger = logging.getLogger('arvados.cwl-runner')
29
def run():
    """Crunch job entry point for arvados-cwl-runner.

    Reads the current job record, converts its script_parameters into a
    valid CWL input object, then runs the CWL executor on the referenced
    workflow or tool.  On failure, records any partial output collection
    and marks the current task unsuccessful.
    """
    # Timestamps are added by crunch-job, so don't print redundant timestamps.
    arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))

    # Print package versions
    logger.info(arvados_cwl.versionstring())

    api = arvados.api("v1")

    arvados_cwl.add_arv_hints()

    runner = None
    try:
        job_order_object = arvados.current_job()['script_parameters']
        # "cwl:tool" names the workflow/tool file relative to the Keep mount.
        toolpath = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object.pop("cwl:tool"))

        # Matches a bare portable data hash, optionally followed by a path
        # inside the collection, e.g. "d41d8cd98f00b204e9800998ecf8427e+0/a/b".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

        def keeppath(v):
            # Rewrite a bare PDH (+ optional subpath) as a keep: URI;
            # leave anything else untouched.
            if pdh_path.match(v):
                return "keep:%s" % v
            else:
                return v

        def keeppathObj(v):
            # File/Directory literals may legitimately lack "location"
            # (e.g. literals carrying only "contents"), so only rewrite
            # the location when it is present.
            if "location" in v:
                v["location"] = keeppath(v["location"])

        # Promote bare keep locator strings to CWL File objects.
        for k, v in job_order_object.items():
            if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
                job_order_object[k] = {
                    "class": "File",
                    "location": "keep:%s" % v
                }

        adjustFileObjs(job_order_object, keeppathObj)
        adjustDirObjs(job_order_object, keeppathObj)
        normalizeFilesDirs(job_order_object)

        # Runner-level options ride along in reserved "arv:*" keys of the
        # script parameters; pop them so they aren't treated as CWL inputs.
        output_name = job_order_object.pop("arv:output_name", None)
        output_tags = job_order_object.pop("arv:output_tags", None)
        enable_reuse = job_order_object.pop("arv:enable_reuse", True)
        on_error = job_order_object.pop("arv:on_error", "continue")

        runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                          output_name=output_name, output_tags=output_tags)

        make_fs_access = functools.partial(CollectionFsAccess,
                                 collection_cache=runner.collection_cache)

        t = load_tool(toolpath, runner.arv_make_tool,
                      fetcher_constructor=functools.partial(CollectionFetcher,
                                                  api_client=runner.api,
                                                  fs_access=make_fs_access(""),
                                                  num_retries=runner.num_retries))

        args = argparse.Namespace()
        args.project_uuid = arvados.current_job()["owner_uuid"]
        args.enable_reuse = enable_reuse
        args.on_error = on_error
        args.submit = False
        args.debug = False
        args.quiet = False
        args.ignore_docker_for_reuse = False
        args.basedir = os.getcwd()
        args.name = None
        args.cwl_runner_job = {"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
        args.make_fs_access = make_fs_access

        runner.arv_executor(t, job_order_object, **vars(args))
    except Exception as e:
        # Use the module logger (not the root logger) so messages go through
        # the arvados log handler configured above; WorkflowException is an
        # expected failure mode and gets a terse message without a traceback.
        if isinstance(e, WorkflowException):
            logger.info("Workflow error %s", e)
        else:
            logger.exception("Unhandled exception")
        if runner and runner.final_output_collection:
            outputCollection = runner.final_output_collection.portable_data_hash()
        else:
            outputCollection = None
        # Mark this task failed, recording whatever partial output exists.
        api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                                             body={
                                                 'output': outputCollection,
                                                 'success': False,
                                                 'progress':1.0
                                             }).execute()