6264: Uploads files and Docker images, can almost run jobs.
[arvados.git] / sdk / python / arvados / commands / cwl_runner.py
1 #!/usr/bin/env python
2
3 import argparse
4 import arvados
5 import arvados.events
6 import arvados.commands.keepdocker
7 import arvados.commands.run
8 import cwltool.draft2tool
9 import cwltool.workflow
10 import cwltool.main
11 import threading
12 import cwltool.docker
13 import fnmatch
14 import logging
15 import re
16 import os
17 from cwltool.process import get_feature
18
# Module-wide logger for the Arvados CWL runner.  Its level is INFO so that
# INFO-level messages (such as the overall job status) are not filtered out
# by this logger itself.
logger = logging.getLogger(name='arvados.cwl-runner')
logger.setLevel(level=logging.INFO)
21
def arv_docker_get_image(api_client, dockerRequirement, pull_image):
    """Make sure the Docker image named by a DockerRequirement is in Arvados.

    If the requirement has ``dockerPull`` but no ``dockerImageId``, the
    ``dockerPull`` value is copied into ``dockerImageId`` (the dict is
    modified in place).  The image is then looked up in Arvados; if it is not
    found there, it is obtained locally with ``cwltool.docker.get_image`` and
    uploaded with ``arvados.commands.keepdocker.main``.

    :param api_client: Arvados API client used for the image lookup.
    :param dockerRequirement: the DockerRequirement mapping from the tool.
    :param pull_image: forwarded unchanged to ``cwltool.docker.get_image``.
    :returns: the ``dockerImageId`` string.
    :raises KeyError: if neither ``dockerImageId`` nor ``dockerPull`` is set.
    """
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    image = dockerRequirement["dockerImageId"]

    # The tag is whatever follows the *last* colon, unless that colon belongs
    # to a registry "host:port/" prefix (then the would-be tag contains "/").
    # Splitting on every colon and keeping the first two pieces mis-parsed
    # names such as "registry.example.com:5000/repo:tag".
    image_name, sep, image_tag = image.rpartition(":")
    if not sep or "/" in image_tag:
        image_name, image_tag = image, None

    # NOTE(review): the positional 3 is presumably the retry count — confirm
    # against keepdocker.list_images_in_arv.
    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Obtain the image locally (the return value is not needed), then
        # upload it to Arvados via the arv-keepdocker entry point.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]
42
class CollectionFsAccess(object):
    """File-system access object backed by Arvados Keep collections.

    Paths have the form ``[keep/]<collection locator>/<path inside it>``.
    The first remaining segment is opened as a ``CollectionReader`` and
    cached per locator.  An instance is passed as ``fs_access`` when a job's
    outputs are collected.
    """

    def __init__(self):
        # Maps collection locator -> collection reader, so each collection is
        # loaded at most once per instance.
        self.collections = {}

    def get_collection(self, path):
        """Split *path* and return ``(collection, path_inside_collection)``.

        A leading ``keep`` segment is dropped; the next segment is treated as
        the collection locator.
        """
        p = path.split("/")
        if p[0] == "keep":
            del p[0]
        if p[0] not in self.collections:
            self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
        return (self.collections[p[0]], "/".join(p[1:]))

    def _match(self, collection, patternsegments, parent):
        """Recursively expand glob *patternsegments* against *collection*.

        Each entry name is compared with ``fnmatch`` against the first
        segment.  Matches are joined onto *parent*, descending into the
        matched entry while more segments remain.
        """
        ret = []
        for i in collection:
            if fnmatch.fnmatch(i, patternsegments[0]):
                cur = os.path.join(parent, i)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[i], patternsegments[1:], cur))
        return ret

    def glob(self, pattern):
        """Return the paths matching glob *pattern*, prefixed by the locator."""
        # Was self.get_path(), which does not exist (AttributeError).
        collection, rest = self.get_collection(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, collection.manifest_locator())

    def open(self, fn, mode):
        """Open file *fn* inside its collection with the given *mode*."""
        collection, rest = self.get_collection(fn)
        # Was c.open(...): 'c' was never defined (NameError).
        return collection.open(rest, mode)

    def exists(self, fn):
        """Return whether *fn* exists inside its collection."""
        collection, rest = self.get_collection(fn)
        return collection.exists(rest)
78
79
class ArvadosJob(object):
    """Runs one CWL CommandLineTool invocation as an Arvados job.

    ``command_line``, ``stdin``, ``stdout``, ``output_callback`` and
    ``collect_outputs`` are read here but never set by this class; presumably
    cwltool assigns them before ``run()`` is called — confirm against
    cwltool's CommandLineTool.job().
    """

    def __init__(self, runner):
        # The ArvCwlRunner that provides the API client and job registry.
        self.arvrunner = runner

    def run(self, dry_run=False, pull_image=True, **kwargs):
        """Submit the command as a ``run-command`` job and register it.

        :param dry_run: accepted for interface compatibility; not used here.
        :param pull_image: forwarded to ``arv_docker_get_image``.
        :param kwargs: ``use_container=False`` suppresses the Docker image
            runtime constraint.
        """
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.stdin:
            script_parameters["task.stdin"] = self.stdin

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        docker_req, _ = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)
            runtime_constraints["arvados_sdk_version"] = "master"

        response = self.arvrunner.api.jobs().create(body={
            "script": "run-command",
            "repository": "arvados",
            "script_version": "master",
            "script_parameters": script_parameters,
            "runtime_constraints": runtime_constraints
        }).execute()

        # Register under the job UUID so the runner's event handler can route
        # this job's state-change events back to done().
        self.arvrunner.jobs[response["uuid"]] = self

    def done(self, record):
        """Collect a finished job's outputs and report them to cwltool.

        :param record: job attributes from the update event; ``state`` and
            ``output`` are read.
        """
        if record["state"] == "Complete":
            processStatus = "success"
        else:
            processStatus = "permanentFail"

        outputs = {}
        try:
            # A failed or cancelled job may have no output collection;
            # collecting from a missing locator would raise.
            if record.get("output"):
                outputs = self.collect_outputs(record["output"], fs_access=CollectionFsAccess())
        except Exception:
            # Log and fall through: output_callback must still be called, or
            # the workflow never learns that this step finished.
            logger.exception("Got exception while collecting job outputs")
            processStatus = "permanentFail"

        self.output_callback(outputs, processStatus)
120
121
class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map CWL input file paths to references an Arvados job can use.

    ``self._pathmap`` maps each source path to a pair:
    - files ``statfile`` reports as already in Keep map to ``(abs path, st.fn)``;
    - local files are uploaded first and then mapped the same way;
    - strings that begin with a portable data hash map to ``$(file ...)``;
    - in conformance-test mode every path maps to ``(src, abs path)``.
    """

    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        self._pathmap = {}
        to_upload = []

        # "<32 hex digits>+<size>/<anything>"
        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/(.*)')

        for src in referenced_files:
            ab = src if os.path.isabs(src) else os.path.join(basedir, src)
            if kwargs.get("conformance_test"):
                # Paths are used unchanged; skip the Keep/stat lookup.
                self._pathmap[src] = (src, ab)
                continue
            st = arvados.commands.run.statfile("", ab)
            if isinstance(st, arvados.commands.run.UploadFile):
                to_upload.append((src, ab, st))
            elif isinstance(st, arvados.commands.run.ArvFile):
                self._pathmap[src] = (ab, st.fn)
            elif isinstance(st, basestring) and pdh_path.match(st):
                self._pathmap[src] = (st, "$(file %s)" % st)
            else:
                # Format the message; passing st as a second exception
                # argument left a literal '%s' in the error text.
                raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % (st,))

        if to_upload:
            arvados.commands.run.uploadfiles([u[2] for u in to_upload], arvrunner.api, dry_run=kwargs.get("dry_run"), num_retries=3)

        # NOTE(review): presumably uploadfiles() rewrites each UploadFile's
        # .fn to its Keep reference — confirm in arvados.commands.run.
        for src, ab, st in to_upload:
            self._pathmap[src] = (ab, st.fn)
148
149
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool whose job runner and path mapper are Arvados-backed."""

    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, **kwargs)
        # Shared ArvCwlRunner handed to every job and path mapper we create.
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        """Return a new ArvadosJob bound to this tool's runner."""
        return ArvadosJob(runner=self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        """Return an ArvPathMapper for *reffiles* resolved against *input_basedir*."""
        return ArvPathMapper(arvrunner=self.arvrunner,
                             referenced_files=reffiles,
                             basedir=input_basedir,
                             **kwargs)
159         return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)
160
161
class ArvCwlRunner(object):
    """Execute a CWL workflow by submitting its steps as Arvados jobs.

    Job completion arrives through ``on_message`` (registered with
    ``arvados.events.subscribe``), presumably on the event client's own
    thread.  ``self.cond`` (built on ``self.lock``) guards ``self.jobs`` and
    the workflow state that job callbacks mutate.
    """

    def __init__(self, api_client):
        self.api = api_client
        # Submitted but not yet finished jobs, keyed by Arvados job UUID.
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        # Set by output_callback when the top-level process finishes.
        self.final_output = None

    def arvMakeTool(self, toolpath_object, **kwargs):
        """Build a tool object; CommandLineTools become ArvadosCommandTools."""
        if toolpath_object.get("class") == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        """Record the workflow's final output and log its overall status."""
        # 'self' was missing from the signature, so the bound call raised
        # TypeError; '_logger' was never defined.
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
        else:
            logger.warning("Overall job status is %s", processStatus)
        self.final_output = out

    def on_message(self, event):
        """Handle an Arvados event; finish tracked jobs in a final state."""
        if "object_uuid" not in event or event.get("event_type") != "update":
            return
        with self.cond:
            uuid = event["object_uuid"]
            if uuid not in self.jobs:
                return
            new_attributes = event["properties"]["new_attributes"]
            if new_attributes["state"] not in ("Complete", "Failed", "Cancelled"):
                return
            # Drop the job from the registry, otherwise arvExecutor's
            # "while self.jobs" loop never ends and deadlock detection
            # never triggers.
            job = self.jobs.pop(uuid)
            try:
                job.done(new_attributes)
            finally:
                # Wake arvExecutor even if done() raised.
                self.cond.notify()

    def arvExecutor(self, t, job_order, input_basedir, **kwargs):
        """Run process *t* on *job_order* and return its final output.

        With ``conformance_test`` set, delegates to cwltool's
        ``single_job_executor``.  Otherwise each runnable step is submitted
        as an Arvados job, and completion is signalled through on_message.

        :raises cwltool.workflow.WorkflowException: when nothing can run and
            no job is in flight, or when no final output was produced.
        """
        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(t, job_order, input_basedir, **kwargs)

        events = arvados.events.subscribe(arvados.api('v1'), [["object_kind", "=", "arvados#job"]], self.on_message)
        try:
            jobiter = t.job(job_order,
                            input_basedir,
                            self.output_callback,
                            **kwargs)

            # Hold the lock while advancing the workflow and submitting jobs;
            # it is released only inside cond.wait().  Testing self.jobs and
            # waiting under the same lock means a completion cannot slip in
            # between the test and the wait (lost wakeup).  It also keeps
            # job callbacks from mutating workflow state mid-iteration.
            with self.cond:
                for r in jobiter:
                    if r:
                        r.run(**kwargs)
                    elif self.jobs:
                        # Nothing runnable right now; wait for a job to finish.
                        self.cond.wait()
                    else:
                        raise cwltool.workflow.WorkflowException("Workflow deadlocked.")

                while self.jobs:
                    self.cond.wait()
        finally:
            events.close()

        if self.final_output is None:
            raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

        return self.final_output
232
233
def main(args, stdout, stderr, api_client=None):
    """Command-line entry point: run cwltool with the Arvados executor.

    :param args: cwltool command-line arguments; the caller's sequence is not
        modified.
    :param stdout: currently unused.
    :param stderr: currently unused.
    :param api_client: Arvados API client; when None a new
        ``arvados.api('v1')`` client is created.
    :returns: whatever ``cwltool.main.main`` returns.
    """
    # Honour a caller-supplied client instead of always building a new one.
    if api_client is None:
        api_client = arvados.api('v1')
    runner = ArvCwlRunner(api_client=api_client)
    # Build a new list rather than appending, so the caller's args are left
    # untouched; "--leave-outputs" is always passed to cwltool.
    cwl_args = list(args) + ["--leave-outputs"]
    return cwltool.main.main(cwl_args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool)