11349: Report wishlist size in status["nodes_wish"].
[arvados.git] / crunch_scripts / cwl-runner
1 #!/usr/bin/env python
2
3 # Crunch script integration for running arvados-cwl-runner inside a crunch job.
4
5 import arvados_cwl
6 import sys
7
# Prefer the crunch script defined in the arvados_cwl package.  This helps
# prevent this script from going out of sync with the rest of the
# arvados_cwl package.
#
# Only the import itself is guarded: an ImportError raised while
# arvados_cwl.crunch_script.run() is executing is a genuine failure and must
# propagate, not be mistaken for "module not available" and silently
# trigger the legacy code path after the job has already partly run.
try:
    import arvados_cwl.crunch_script
except ImportError:
    # Older arvados-cwl-runner package without arvados_cwl.crunch_script:
    # continue with the rest of this file.
    pass
else:
    arvados_cwl.crunch_script.run()
    sys.exit()
17
18 # When running against an older arvados-cwl-runner package without
19 # arvados_cwl.crunch_script, fall back to the old code.
20
21
22 # This gets the job record, transforms the script parameters into a valid CWL
23 # input object, then executes the CWL runner to run the underlying workflow or
24 # tool.  When the workflow completes, record the output object in an output
25 # collection for this runner job.
26
27 import arvados
28 import arvados.collection
29 import arvados.util
30 import cwltool.main
31 import logging
32 import os
33 import json
34 import argparse
35 import re
36 import functools
37
38 from arvados.api import OrderedJsonModel
39 from cwltool.process import shortname, adjustFileObjs, adjustDirObjs, getListing, normalizeFilesDirs
40 from cwltool.load_tool import load_tool
41
# Log the cwltool version string.
logging.info(cwltool.main.versionstring())

# API client used for the directory listings and for reporting the task result.
api = arvados.api("v1")

try:
    # The job's script parameters are used as the CWL input object.
    job_order_object = arvados.current_job()['script_parameters']

    pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+(/.+)?$')

    def keeppath(v):
        """Return v with a "keep:" prefix when it matches pdh_path, else v unchanged."""
        return ("keep:%s" % v) if pdh_path.match(v) else v

    def keeppathObj(v):
        """Rewrite v["location"] in place through keeppath()."""
        v["location"] = keeppath(v["location"])

    # Turn the tool reference into a file:// URL under the task's keep mount.
    keep_mount = os.environ['TASK_KEEPMOUNT']
    tool_ref = job_order_object["cwl:tool"]
    job_order_object["cwl:tool"] = "file://%s/%s" % (keep_mount, tool_ref)

    # Replace every top-level string value that matches the keep locator
    # pattern with a File object pointing at "keep:<value>".
    for param, value in job_order_object.items():
        if not isinstance(value, basestring):
            continue
        if not arvados.util.keep_locator_pattern.match(value):
            continue
        job_order_object[param] = {"class": "File", "location": "keep:%s" % value}

    adjustFileObjs(job_order_object, keeppathObj)
    adjustDirObjs(job_order_object, keeppathObj)
    normalizeFilesDirs(job_order_object)
    fs_access = arvados_cwl.fsaccess.CollectionFsAccess("", api_client=api)
    adjustDirObjs(job_order_object, functools.partial(getListing, fs_access))

    # "arv:output_name" is taken out of the input object and handed to the
    # runner instead.
    output_name = job_order_object.pop("arv:output_name", None)

    ordered_api = arvados.api('v1', model=OrderedJsonModel())
    runner = arvados_cwl.ArvCwlRunner(api_client=ordered_api,
                                      output_name=output_name)

    tool = load_tool(job_order_object, runner.arv_make_tool)

    # Keyword arguments passed to the executor.
    executor_options = {
        "project_uuid": arvados.current_job()["owner_uuid"],
        "enable_reuse": True,
        "submit": False,
        "debug": True,
        "quiet": False,
        "ignore_docker_for_reuse": False,
        "basedir": os.getcwd(),
        "cwl_runner_job": {"uuid": arvados.current_job()["uuid"],
                           "state": arvados.current_job()["state"]},
    }
    output_obj = runner.arv_executor(tool, job_order_object, **executor_options)

    # Record the portable data hash of the final output collection, if any.
    output_pdh = (runner.final_output_collection.portable_data_hash()
                  if runner.final_output_collection else None)

    success_record = {
        'output': output_pdh,
        'success': True,
        'progress':1.0
    }
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body=success_record).execute()
except Exception:
    # Log the traceback and mark the current task as failed with no output.
    logging.exception("Unhandled exception")
    failure_record = {
        'output': None,
        'success': False,
        'progress':1.0
    }
    api.job_tasks().update(uuid=arvados.current_task()['uuid'],
                           body=failure_record).execute()