6264: First pass complete, ready for testing.
[arvados.git] / sdk / python / arvados / commands / cwl_runner.py
1 #!/usr/bin/env python
2
import argparse
import fnmatch
import logging
import os
import re
import threading

import arvados
import arvados.events
import arvados.commands.keepdocker
import arvados.commands.run
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.workflow
from cwltool.process import get_feature
14
# Module-level logger shared by everything in this file; its threshold is
# set to INFO here.
logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)
17
def arv_docker_get_image(api_client, dockerRequirement, pull_image):
    """Make sure the Docker image for a DockerRequirement is known to Arvados.

    If the requirement has no "dockerImageId" but does have "dockerPull",
    the latter is copied into "dockerImageId" (the dict is modified in
    place).  The image id is split on ":" into a name and an optional tag,
    and Arvados is asked for matching images.  When none are found, the
    image is fetched through cwltool and then handed to arv-keepdocker.

    :param api_client: Arvados API client passed to keepdocker.
    :param dockerRequirement: dict describing the CWL DockerRequirement.
    :param pull_image: forwarded to cwltool.docker.get_image.
    :returns: the value of dockerRequirement["dockerImageId"].
    :raises KeyError: if neither "dockerImageId" nor "dockerPull" is present.
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    # NOTE(review): a plain split on ":" mis-parses ids that contain a
    # registry port ("host:5000/name:tag") -- confirm whether such ids can
    # reach this function.
    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    # The literal 3 is presumably the retry count -- TODO confirm against
    # keepdocker.list_images_in_arv.
    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Called for its side effect; the returned value is not needed here.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        # keepdocker.main() takes a command-line style argument list
        # (image name, then optional tag), not a single "name:tag" string.
        keepdocker_args = [image_name]
        if image_tag:
            keepdocker_args.append(image_tag)
        arvados.commands.keepdocker.main(keepdocker_args)

    return dockerRequirement["dockerImageId"]
35
class CollectionFsAccess(object):
    """File-system style access (glob/open/exists) to files inside collections.

    Paths have the form "<collection locator>/<path inside collection>",
    optionally prefixed with a "keep/" segment.  A reader object is created
    once per locator and cached in ``self.collections``.
    """

    def __init__(self):
        # Maps collection locator -> arvados.collection.CollectionReader.
        self.collections = {}

    def get_collection(self, path):
        """Split *path* into its collection and the path inside it.

        :returns: tuple ``(collection reader, remaining path)``.
        """
        p = path.split("/")
        if p[0] == "keep":
            del p[0]
        if p[0] not in self.collections:
            self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
        return (self.collections[p[0]], "/".join(p[1:]))

    def _match(self, collection, patternsegments, parent):
        """Recursively match fnmatch-style pattern segments against *collection*.

        :param collection: iterable of entry names that also supports
            ``collection[name]`` to descend into an entry.
        :param patternsegments: list of patterns, one per path level.
        :param parent: path prefix joined in front of every match.
        :returns: list of matching paths.
        """
        if not patternsegments:
            # Nothing left to match; avoids an IndexError below.
            return []

        ret = []
        for i in collection:
            if fnmatch.fnmatch(i, patternsegments[0]):
                cur = os.path.join(parent, i)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[i], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Return paths matching *pattern*, prefixed with the collection's
        manifest locator."""
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, collection.manifest_locator())

    def open(self, fn, mode):
        """Open the file *fn* inside its collection with the given *mode*."""
        collection, rest = self.get_collection(fn)
        return collection.open(rest, mode)

    def exists(self, fn):
        """Return whether *fn* exists inside its collection."""
        collection, rest = self.get_collection(fn)
        return collection.exists(rest)
71
72
class ArvadosJob(object):
    """Submit one CWL command line as an Arvados job and report its result.

    Attributes such as ``command_line``, ``stdin``, ``stdout``,
    ``collect_outputs`` and ``output_callback`` are read but not set in this
    class; they are presumably filled in by cwltool when it builds the job
    -- TODO confirm.
    """

    def __init__(self, runner):
        # The ArvCwlRunner that owns the API client and the table of
        # outstanding jobs.
        self.arvrunner = runner

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Create the Arvados job and register it with the runner.

        :param dry_run: accepted for interface compatibility; not used here.
        :param pull_image: forwarded to arv_docker_get_image.
        :param kwargs: only "use_container" is consulted; Docker is used
            unless it is explicitly False.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        # NOTE(review): the key names "stdin"/"stdout" are kept from the
        # original code; verify they match what the run-command script reads.
        if self.stdin:
            script_parameters["stdin"] = self.stdin

        if self.stdout:
            script_parameters["stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(
                self.arvrunner.api, docker_req, pull_image)
            runtime_constraints["arvados_sdk_version"] = "master"

        response = self.arvrunner.api.jobs().create(body={
            "script": "run-command",
            "repository": "arvados",
            "script_version": "master",
            "script_parameters": script_parameters,
            "runtime_constraints": runtime_constraints
        }).execute()

        # Lets the runner route job update events back to this object.
        self.arvrunner.jobs[response["uuid"]] = self

    def done(self, record):
        """Collect outputs for a finished job and invoke the output callback.

        :param record: job attributes; "state" and "output" are read.
        """
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = {}
        try:
            outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
        except Exception:
            # A job that did not complete may have no usable output
            # (assumption -- confirm).  Report the failure through the
            # callback instead of letting the exception escape, so the
            # workflow still learns that this step is finished.
            logger.exception("Got exception while collecting job outputs:")
            processStatus = "permanentFail"

        self.output_callback(outputs, processStatus)
113
114
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map workflow input file references to paths usable by Arvados jobs.

    Each referenced file is classified with arvados.commands.run.statfile.
    Files that need uploading are collected and uploaded in one batch, and
    every source path ends up in ``self._pathmap`` as a 2-tuple.
    """

    def __init__(self, arvrunner, referenced_files, basedir):
        """Build the path map.

        :param arvrunner: runner object whose ``api`` is used for uploads.
        :param referenced_files: iterable of file paths used by the tool.
        :param basedir: directory that relative paths are resolved against.
        :raises cwltool.workflow.WorkflowException: if a path is none of
            the recognised kinds.
        """
        self._pathmap = {}
        uploadfiles = []

        # Matches "<32 hex digits>+<size>/<path>".
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')

        for src in referenced_files:
            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
            st = arvados.commands.run.statfile("", ab)
            if isinstance(st, arvados.commands.run.UploadFile):
                # Deferred: the final path is only known after the upload.
                uploadfiles.append((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (ab, st.fn)
            elif isinstance(st, basestring) and pdh_path.match(st):
                self._pathmap[src] = (st, "$(file %s)" % st)
            else:
                raise cwltool.workflow.WorkflowException(
                    "Input file path '%s' is invalid" % st)

        arvados.commands.run.uploadfiles([u[2] for u in uploadfiles], arvrunner.api)

        # ``st.fn`` is read only after uploadfiles() has run; presumably the
        # upload fills it in -- TODO confirm.
        for src, ab, st in uploadfiles:
            self._pathmap[src] = (ab, st.fn)
138
139
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool whose jobs and path mapping go through Arvados."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Return a new ArvadosJob bound to this tool's runner."""
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir):
        """Return an ArvPathMapper for the given referenced files."""
        # ArvPathMapper is a module-level class, not an attribute of this one.
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir)
150
151
class ArvCwlRunner(object):
    """Execute a CWL workflow by submitting its steps as Arvados jobs.

    ``self.jobs`` maps job UUID to the job object for every job that has
    been submitted and has not yet reached a final state.  It is only
    touched while ``self.cond`` (built on ``self.lock``) is held.
    """

    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Build an ArvadosCommandTool for CommandLineTool documents and
        defer to cwltool's default factory for everything else."""
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the final workflow output and log the overall status."""
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
        else:
            logger.warning("Overall job status is %s", processStatus)
        self.final_output = out

    def on_message(self, event):
        """Handle an event from the Arvados event stream.

        When a tracked job is updated to Complete, Failed or Cancelled, it
        is removed from ``self.jobs``, its ``done()`` is called with the new
        attributes, and any thread waiting on ``self.cond`` is woken.
        Everything else is ignored.
        """
        if "object_uuid" not in event:
            return
        uuid = event["object_uuid"]

        with self.cond:
            if uuid not in self.jobs or event["event_type"] != "update":
                return
            new_attributes = event["properties"]["new_attributes"]
            if new_attributes["state"] not in ("Complete", "Failed", "Cancelled"):
                return

            # Drop the job first so the executor's "while self.jobs" loop
            # can terminate even if done() raises.
            job = self.jobs.pop(uuid)
            try:
                job.done(new_attributes)
            finally:
                self.cond.notify()

    def arvExecutor(self, t, job_order, input_basedir, **kwargs):
        """Run tool or workflow *t* and return its final output.

        :raises cwltool.workflow.WorkflowException: if no step can run while
            no job is outstanding, or if the workflow yields no result.
        """
        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)

        # Subscribe only when jobs are actually going to be submitted.
        events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
        try:
            jobiter = t.job(job_order,
                            input_basedir,
                            self.output_callback,
                            **kwargs)

            # The condition is held for the whole loop, so on_message() can
            # only run while this thread is inside wait().  That keeps the
            # "self.jobs" checks consistent and rules out a wakeup being
            # missed between a check and the wait.  self.lock is not
            # re-entrant, so it must not be acquired again in here.
            with self.cond:
                for r in jobiter:
                    if r:
                        r.run(**kwargs)
                    elif self.jobs:
                        self.cond.wait()
                    else:
                        raise cwltool.workflow.WorkflowException("Workflow deadlocked.")

                while self.jobs:
                    self.cond.wait()
        finally:
            events.close()

        if self.final_output is None:
            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

        return self.final_output
222
223
def main(args, stdout, stderr, api_client=None):
    """Entry point: run cwltool with the Arvados executor and tool factory.

    :param args: command-line arguments for cwltool; not modified.
    :param stdout: accepted for interface compatibility; not used here.
    :param stderr: accepted for interface compatibility; not used here.
    :param api_client: Arvados API client to use; a new 'v1' client is
        created when omitted.
    :returns: whatever cwltool.main.main returns.
    """
    if api_client is None:
        api_client = arvados.api('v1')
    runner = ArvCwlRunner(api_client=api_client)
    # Build a new list so the caller's argument list is left untouched.
    cwl_args = list(args) + ["--leave-outputs"]
    return cwltool.main.main(cwl_args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)
226     args.append("--leave-outputs")
227     return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)