7593: Don't upload the same files more than once. Fix handling "./" in glob paths.
[arvados.git] / sdk / cwl / arvados_cwl / __init__.py
#!/usr/bin/env python

import argparse
import copy
import fnmatch
import logging
import os
import re
import threading

import arvados
import arvados.collection
import arvados.commands.keepdocker
import arvados.commands.run
import arvados.events
import arvados.util
import cwltool.docker
import cwltool.draft2tool
import cwltool.main
import cwltool.pathmapper
import cwltool.process
import cwltool.workflow
from cwltool.process import get_feature

logger = logging.getLogger('arvados.cwl-runner')
logger.setLevel(logging.INFO)

def arv_docker_get_image(api_client, dockerRequirement, pull_image):
    """Make sure the Docker image required by a tool is available in Keep."""
    if "dockerImageId" not in dockerRequirement and "dockerPull" in dockerRequirement:
        dockerRequirement["dockerImageId"] = dockerRequirement["dockerPull"]

    sp = dockerRequirement["dockerImageId"].split(":")
    image_name = sp[0]
    image_tag = sp[1] if len(sp) > 1 else None

    images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
                                                            image_name=image_name,
                                                            image_tag=image_tag)

    if not images:
        # Not found in Keep: pull the image to the local Docker daemon, then
        # upload it with arv-keepdocker.
        cwltool.docker.get_image(dockerRequirement, pull_image)
        args = [image_name]
        if image_tag:
            args.append(image_tag)
        arvados.commands.keepdocker.main(args)

    return dockerRequirement["dockerImageId"]
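# Example (hypothetical values): given a tool requirement such as
#   {"class": "DockerRequirement", "dockerPull": "debian:8"}
# arv_docker_get_image() fills in dockerImageId as "debian:8", checks whether
# the image is already in Keep, uploads it via arv-keepdocker if not, and
# returns the "name:tag" string recorded in the job's runtime constraints.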

class CollectionFsAccess(cwltool.process.StdFsAccess):
    """File system access layer that understands Keep collection paths."""
    def __init__(self, basedir):
        self.collections = {}
        self.basedir = basedir

    def get_collection(self, path):
        p = path.split("/")
        if arvados.util.keep_locator_pattern.match(p[0]):
            # Cache one CollectionReader per locator so repeated lookups
            # don't re-fetch the manifest.
            if p[0] not in self.collections:
                self.collections[p[0]] = arvados.collection.CollectionReader(p[0])
            return (self.collections[p[0]], "/".join(p[1:]))
        else:
            return (None, path)
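    # Example (hypothetical locator):
    #   get_collection("99999999999999999999999999999999+99/dir/file.txt")
    # returns (CollectionReader("99999999999999999999999999999999+99"),
    # "dir/file.txt"); an ordinary local path returns (None, path).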

    def _match(self, collection, patternsegments, parent):
        ret = []
        if len(patternsegments) == 0:
            return ret
        if patternsegments[0] == '.':
            # Consume the "./" marker without descending into any entry, so
            # the rest of the pattern is matched against the current level
            # exactly once rather than once per directory entry.
            return self._match(collection, patternsegments[1:], parent)
        for filename in collection:
            if fnmatch.fnmatch(filename, patternsegments[0]):
                cur = os.path.join(parent, filename)
                if len(patternsegments) == 1:
                    ret.append(cur)
                else:
                    ret.extend(self._match(collection[filename], patternsegments[1:], cur))
        return ret
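    # Example: the pattern "./*.txt" splits into ['.', '*.txt']; the '.'
    # segment is consumed exactly once without descending, and '*.txt' is then
    # matched against the current level, so results are not duplicated.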

    def glob(self, pattern):
        collection, rest = self.get_collection(pattern)
        if collection is None:
            # Not a Keep path; fall back to ordinary filesystem globbing.
            return super(CollectionFsAccess, self).glob(pattern)
        patternsegments = rest.split("/")
        return self._match(collection, patternsegments, collection.manifest_locator())

    def open(self, fn, mode):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.open(rest, mode)
        else:
            return open(self._abs(fn), mode)

    def exists(self, fn):
        collection, rest = self.get_collection(fn)
        if collection:
            return collection.exists(rest)
        else:
            return os.path.exists(self._abs(fn))
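# A minimal usage sketch (hypothetical locator):
#   fs = CollectionFsAccess("/tmp")
#   fs.glob("99999999999999999999999999999999+99/*.txt")
# returns locator-qualified paths, and fs.open() on such a path reads the
# file out of Keep instead of the local filesystem.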

class ArvadosJob(object):
    """Submit and monitor a Crunch job for a single CWL command."""
    def __init__(self, runner):
        self.arvrunner = runner
        self.running = False

    def run(self, dry_run=False, pull_image=True, **kwargs):
        script_parameters = {
            "command": self.command_line
        }
        runtime_constraints = {}

        if self.generatefiles:
            vwd = arvados.collection.Collection()
            script_parameters["task.vwd"] = {}
            for t in self.generatefiles:
                if isinstance(self.generatefiles[t], dict):
                    # Strip the leading "$(task.keep)/" (13 characters) to
                    # recover the collection-relative path.
                    src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"][13:])
                    vwd.copy(rest, t, source_collection=src)
                else:
                    with vwd.open(t, "w") as f:
                        f.write(self.generatefiles[t])
            vwd.save_new()
            for t in self.generatefiles:
                script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)

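        # Example (hypothetical hash): a generated file "foo.txt" ends up as
        #   script_parameters["task.vwd"]["foo.txt"] ==
        #       "$(task.keep)/99999999999999999999999999999999+99/foo.txt"
        # so crunchrunner can link it into the task's working directory.
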
        script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
        if self.environment:
            script_parameters["task.env"].update(self.environment)

        if self.stdin:
            script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]

        if self.stdout:
            script_parameters["task.stdout"] = self.stdout

        (docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
        if docker_req and kwargs.get("use_container") is not False:
            runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image)

        # Note: the crunchrunner script and its repository are currently
        # hardcoded here.
        response = self.arvrunner.api.jobs().create(body={
            "script": "crunchrunner",
            "repository": "peteramstutz/cr",
            "script_version": "master",
            "script_parameters": {"tasks": [script_parameters]},
            "runtime_constraints": runtime_constraints
        }, find_or_create=kwargs.get("enable_reuse", True)).execute()

        self.arvrunner.jobs[response["uuid"]] = self

        logger.info("Job %s is %s", response["uuid"], response["state"])

        if response["state"] in ("Complete", "Failed", "Cancelled"):
            # The job may have been satisfied by reuse and already be finished.
            self.done(response)
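    # Example (hypothetical): for a simple `echo hello` step the request body
    # carries "script_parameters": {"tasks": [{"command": ["echo", "hello"],
    # "task.env": {"TMPDIR": "$(task.tmpdir)"}}]}; find_or_create=True lets
    # Crunch return an identical finished job instead of running a new one.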
    def done(self, record):
        try:
            if record["state"] == "Complete":
                processStatus = "success"
            else:
                processStatus = "permanentFail"

            try:
                # Initialize to an empty dict so a failure in output
                # collection still reports something to the callback.
                outputs = {}
                outputs = self.collect_outputs(record["output"])
            except Exception:
                logger.exception("Got exception while collecting job outputs:")
                processStatus = "permanentFail"

            self.output_callback(outputs, processStatus)
        finally:
            del self.arvrunner.jobs[record["uuid"]]
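    # `record` is a job resource from the API server or a websocket event,
    # e.g. (abridged, hypothetical values): {"uuid": "zzzzz-8i9sb-...",
    # "state": "Complete", "output": <portable data hash of the output>}.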

class ArvPathMapper(cwltool.pathmapper.PathMapper):
    """Map input file paths to Keep references, uploading local files once."""
    def __init__(self, arvrunner, referenced_files, basedir, **kwargs):
        # Start from the files already uploaded in this run so the same
        # local file is never uploaded twice.
        self._pathmap = copy.copy(arvrunner.uploaded)
        uploadfiles = []

        pdh_path = re.compile(r'^[0-9a-f]{32}\+\d+/.+')

        for src in referenced_files:
            if isinstance(src, basestring) and pdh_path.match(src):
                # Already a portable data hash path; reference it in Keep directly.
                self._pathmap[src] = (src, "$(task.keep)/%s" % src)
            if src not in self._pathmap:
                ab = cwltool.pathmapper.abspath(src, basedir)
                st = arvados.commands.run.statfile("", ab)
                if kwargs.get("conformance_test"):
                    self._pathmap[src] = (src, ab)
                elif isinstance(st, arvados.commands.run.UploadFile):
                    uploadfiles.append((src, ab, st))
                elif isinstance(st, arvados.commands.run.ArvFile):
                    self._pathmap[src] = (ab, st.fn)
                else:
                    raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)

        if uploadfiles:
            arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
                                             arvrunner.api,
                                             dry_run=kwargs.get("dry_run"),
                                             num_retries=3,
                                             fnPattern="$(task.keep)/%s/%s")

        for src, ab, st in uploadfiles:
            # Remember the upload so later jobs in this run can reuse it.
            arvrunner.uploaded[src] = (ab, st.fn)
            self._pathmap[src] = (ab, st.fn)
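# Example (hypothetical): after construction, a local input "data/input.txt"
# maps to a pair like ("/work/data/input.txt",
# "$(task.keep)/99999999999999999999999999999999+99/input.txt"), and the same
# entry is cached in arvrunner.uploaded so later jobs skip the upload.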
class ArvadosCommandTool(cwltool.draft2tool.CommandLineTool):
    """CommandLineTool specialized to run as an Arvados Crunch job."""
    def __init__(self, arvrunner, toolpath_object, **kwargs):
        super(ArvadosCommandTool, self).__init__(toolpath_object, outdir="$(task.outdir)", tmpdir="$(task.tmpdir)", **kwargs)
        self.arvrunner = arvrunner

    def makeJobRunner(self):
        return ArvadosJob(self.arvrunner)

    def makePathMapper(self, reffiles, input_basedir, **kwargs):
        return ArvPathMapper(self.arvrunner, reffiles, input_basedir, **kwargs)

class ArvCwlRunner(object):
    """Execute a CWL workflow by submitting each step as an Arvados job."""
    def __init__(self, api_client):
        self.api = api_client
        self.jobs = {}
        self.lock = threading.Lock()
        self.cond = threading.Condition(self.lock)
        self.final_output = None
        self.uploaded = {}

    def arvMakeTool(self, toolpath_object, **kwargs):
        if "class" in toolpath_object and toolpath_object["class"] == "CommandLineTool":
            return ArvadosCommandTool(self, toolpath_object, **kwargs)
        else:
            return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)

    def output_callback(self, out, processStatus):
        if processStatus == "success":
            logger.info("Overall job status is %s", processStatus)
        else:
            logger.warn("Overall job status is %s", processStatus)
        self.final_output = out

    def on_message(self, event):
        if "object_uuid" in event:
            if event["object_uuid"] in self.jobs and event["event_type"] == "update":
                if event["properties"]["new_attributes"]["state"] == "Running" and self.jobs[event["object_uuid"]].running is False:
                    logger.info("Job %s is Running", event["object_uuid"])
                    with self.lock:
                        self.jobs[event["object_uuid"]].running = True
                elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled"):
                    logger.info("Job %s is %s", event["object_uuid"], event["properties"]["new_attributes"]["state"])
                    with self.cond:
                        self.jobs[event["object_uuid"]].done(event["properties"]["new_attributes"])
                        self.cond.notify()
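    # Events arriving over the websocket look roughly like (hypothetical uuid):
    #   {"object_uuid": "zzzzz-8i9sb-...", "event_type": "update",
    #    "properties": {"new_attributes": {"state": "Running", ...}}}
    # and the state transitions drive the bookkeeping in on_message above.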

    def arvExecutor(self, tool, job_order, input_basedir, args, **kwargs):
        events = arvados.events.subscribe(self.api, [["object_uuid", "is_a", "arvados#job"]], self.on_message)

        self.fs_access = CollectionFsAccess(input_basedir)

        kwargs["fs_access"] = self.fs_access
        kwargs["enable_reuse"] = args.enable_reuse

        if kwargs.get("conformance_test"):
            return cwltool.main.single_job_executor(tool, job_order, input_basedir, args, **kwargs)
        else:
            jobiter = tool.job(job_order,
                               input_basedir,
                               self.output_callback,
                               **kwargs)

            for runnable in jobiter:
                if runnable:
                    with self.lock:
                        runnable.run(**kwargs)
                else:
                    if self.jobs:
                        # Nothing new is runnable yet; wait for a job state
                        # change before asking the job iterator again.
                        with self.cond:
                            self.cond.wait()
                    else:
                        logger.error("Workflow cannot make any more progress.")
                        break

            while self.jobs:
                # All steps have been submitted; wait for outstanding jobs.
                with self.cond:
                    self.cond.wait()

            events.close()

            if self.final_output is None:
                raise cwltool.workflow.WorkflowException("Workflow did not return a result.")

            return self.final_output


def main(args, stdout, stderr, api_client=None):
    runner = ArvCwlRunner(api_client=api_client if api_client else arvados.api('v1'))
    args.insert(0, "--leave-outputs")
    parser = cwltool.main.arg_parser()
    exgroup = parser.add_mutually_exclusive_group()
    exgroup.add_argument("--enable-reuse", action="store_true",
                        default=False, dest="enable_reuse",
                        help="Reuse past jobs with the same inputs where possible.")
    exgroup.add_argument("--disable-reuse", action="store_false",
                        default=False, dest="enable_reuse",
                        help="Always submit new jobs, even if an identical job exists (default).")

    return cwltool.main.main(args, executor=runner.arvExecutor, makeTool=runner.arvMakeTool, parser=parser)
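# A minimal invocation sketch (command name assumed from this package's entry
# point):
#   arvados-cwl-runner --enable-reuse workflow.cwl inputs.yml
# cwltool.main.main() handles the standard CWL arguments and delegates
# execution to runner.arvExecutor.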