Merge branch '14087-federated-collection-by-pdh' refs #14087
[arvados.git] / crunch_scripts / cwl-runner
1 #!/usr/bin/env python
2 # Copyright (C) The Arvados Authors. All rights reserved.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5
6 # Crunch script integration for running arvados-cwl-runner inside a crunch job.
7
8 import arvados_cwl
9 import sys
10
try:
    # Use the crunch script defined in the arvados_cwl package.  This helps
    # prevent the crunch script from going out of sync with the rest of the
    # arvados_cwl package.
    import arvados_cwl.crunch_script
except ImportError:
    # The installed arvados_cwl package has no crunch_script module (or it
    # could not be imported); continue on to the legacy implementation in the
    # remainder of this file.
    pass
else:
    # Only the import itself is guarded.  An ImportError raised while the
    # packaged script is running must propagate instead of being mistaken for
    # "module not available" and silently triggering the legacy code path.
    arvados_cwl.crunch_script.run()
    sys.exit()
20
21 # When running against an older arvados-cwl-runner package without
22 # arvados_cwl.crunch_script, fall back to the old code.
23
24
25 # This gets the job record, transforms the script parameters into a valid CWL
26 # input object, then executes the CWL runner to run the underlying workflow or
27 # tool.  When the workflow completes, record the output object in an output
28 # collection for this runner job.
29
30 import arvados
31 import arvados.collection
32 import arvados.util
33 import cwltool.main
34 import logging
35 import os
36 import json
37 import argparse
38 import re
39 import functools
40
41 from arvados.api import OrderedJsonModel
42 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
43 from cwltool.load_tool import load_tool
44
# Print package versions
# NOTE(review): this goes through the root logger; unless something has
# configured the root logger to emit INFO records, the version string may be
# dropped -- confirm.
logging.info(cwltool.main.versionstring())

# API client used to report the task result (success and failure paths) and
# passed to CollectionFsAccess below.
api = arvados.api("v1")


def _report_task_result(output, success):
    """Update the current job task record with its final state.

    output  -- value stored in the task's 'output' field: the portable data
               hash of the output collection, or None.
    success -- value stored in the task's 'success' field.

    'progress' is always set to 1.0.
    """
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body={
                               'output': output,
                               'success': success,
                               'progress': 1.0
                           }).execute()


try:
    # The job's script_parameters serve as the CWL input object.  It is
    # modified in place by the steps below.
    job_order_object = arvados.current_job()['script_parameters']

    # Matches 32 hex digits, '+', a decimal number, and an optional
    # "/path" suffix (a portable data hash with an optional path).
    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

    def keeppath(v):
        """Return v with a "keep:" prefix if it matches pdh_path, else v unchanged."""
        if pdh_path.match(v):
            return "keep:%s" % v
        else:
            return v

    def keeppathObj(v):
        """Rewrite v["location"] in place using keeppath().

        Objects without a "location" key (e.g. literals that only carry
        "contents" or "listing") are left untouched instead of raising
        KeyError.
        """
        if "location" in v:
            v["location"] = keeppath(v["location"])

    # Turn the tool reference into a file:// URL under the directory named by
    # the TASK_KEEPMOUNT environment variable.
    job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])

    # Top-level string parameters that match the Keep locator pattern are
    # wrapped as CWL File objects with a "keep:" location.  Only existing keys
    # are reassigned, so the dict does not change size during the loop.
    for k,v in job_order_object.items():
        if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
            job_order_object[k] = {
                "class": "File",
                "location": "keep:%s" % v
            }

    # Apply the "keep:" rewrite through the cwltool helpers (presumably to
    # every File and Directory object in the input -- see cwltool.process),
    # then normalize the objects and fill in directory listings via
    # CollectionFsAccess.
    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)
    normalizeFilesDirs(job_order_object)
    adjustDirObjs(job_order_object, functools.partial(getListing, arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)))

    # "arv:output_name" is handed to the runner rather than left in the input
    # object, so remove it before the tool is loaded.
    output_name = None
    if "arv:output_name" in job_order_object:
        output_name = job_order_object["arv:output_name"]
        del job_order_object["arv:output_name"]

    runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
                                      output_name=output_name)

    t = load_tool(job_order_object, runner.arv_make_tool)

    # Options for the executor; the namespace is expanded into keyword
    # arguments below.
    args = argparse.Namespace()
    args.project_uuid = arvados.current_job()["owner_uuid"]
    args.enable_reuse = True
    args.submit = False
    args.debug = True
    args.quiet = False
    args.ignore_docker_for_reuse = False
    args.basedir = os.getcwd()
    args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
    outputObj = runner.arv_executor(t, job_order_object, **vars(args))

    # Report the portable data hash of the final output collection, or None
    # when the runner did not produce one.
    if runner.final_output_collection:
        outputCollection = runner.final_output_collection.portable_data_hash()
    else:
        outputCollection = None

    _report_task_result(outputCollection, True)
except Exception:
    # Top-level boundary: log the traceback and mark the task as failed.
    logging.exception("Unhandled exception")
    _report_task_result(None, False)