Merge branch 'master' into 4823-python-sdk-writable-collection-api
[arvados.git] / sdk / python / arvados / commands / run.py
1 #!/usr/bin/env python
2
3 import arvados
4 import arvados.commands.ws as ws
5 import argparse
6 import json
7 import re
8 import os
9 import stat
10 import put
11 import time
12 import subprocess
13 import logging
14 import arvados.commands._util as arv_cmd
15
16 logger = logging.getLogger('arvados.arv-run')
17
18 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
19 arvrun_parser.add_argument('--dry-run', action="store_true", help="Print out the pipeline that would be submitted and exit")
20 arvrun_parser.add_argument('--local', action="store_true", help="Run locally using arv-run-pipeline-instance")
21 arvrun_parser.add_argument('--docker-image', type=str, default="arvados/jobs", help="Docker image to use, default arvados/jobs")
22 arvrun_parser.add_argument('--ignore-rcode', action="store_true", help="Commands that return non-zero return codes should not be considered failed.")
23 arvrun_parser.add_argument('--no-reuse', action="store_true", help="Do not reuse past jobs.")
24 arvrun_parser.add_argument('--no-wait', action="store_true", help="Do not wait and display logs after submitting command, just exit.")
25 arvrun_parser.add_argument('--project-uuid', type=str, help="Parent project of the pipeline")
26 arvrun_parser.add_argument('--git-dir', type=str, default="", help="Git repository passed to arv-crunch-job when using --local")
27 arvrun_parser.add_argument('--repository', type=str, default="arvados", help="repository field of component, default 'arvados'")
28 arvrun_parser.add_argument('--script-version', type=str, default="master", help="script_version field of component, default 'master'")
29 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
30
31 class ArvFile(object):
32     def __init__(self, prefix, fn):
33         self.prefix = prefix
34         self.fn = fn
35
36 class UploadFile(ArvFile):
37     pass
38
39 # Determine if a file is in a collection, and return a tuple consisting of the
40 # portable data hash and the path relative to the root of the collection.
41 # Return None if the path isn't with an arv-mount collection or there was is error.
42 def is_in_collection(root, branch):
43     try:
44         if root == "/":
45             return (None, None)
46         fn = os.path.join(root, ".arvados#collection")
47         if os.path.exists(fn):
48             with file(fn, 'r') as f:
49                 c = json.load(f)
50             return (c["portable_data_hash"], branch)
51         else:
52             sp = os.path.split(root)
53             return is_in_collection(sp[0], os.path.join(sp[1], branch))
54     except IOError, OSError:
55         return (None, None)
56
57 # Determine the project to place the output of this command by searching upward
58 # for arv-mount psuedofile indicating the project.  If the cwd isn't within
59 # an arv-mount project or there is an error, return current_user.
60 def determine_project(root, current_user):
61     try:
62         if root == "/":
63             return current_user
64         fn = os.path.join(root, ".arvados#project")
65         if os.path.exists(fn):
66             with file(fn, 'r') as f:
67                 c = json.load(f)
68             if 'writable_by' in c and current_user in c['writable_by']:
69                 return c["uuid"]
70             else:
71                 return current_user
72         else:
73             sp = os.path.split(root)
74             return determine_project(sp[0], current_user)
75     except IOError, OSError:
76         return current_user
77
78 # Determine if string corresponds to a file, and if that file is part of a
79 # arv-mounted collection or only local to the machine.  Returns one of
80 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
81 # be uploaded to a collection), or simply returns prefix+fn (which yields the
82 # original parameter string).
83 def statfile(prefix, fn):
84     absfn = os.path.abspath(fn)
85     if os.path.exists(absfn):
86         st = os.stat(absfn)
87         if stat.S_ISREG(st.st_mode):
88             sp = os.path.split(absfn)
89             (pdh, branch) = is_in_collection(sp[0], sp[1])
90             if pdh:
91                 return ArvFile(prefix, "$(file %s/%s)" % (pdh, branch))
92             else:
93                 # trim leading '/' for path prefix test later
94                 return UploadFile(prefix, absfn[1:])
95         if stat.S_ISDIR(st.st_mode):
96             sp = os.path.split(absfn)
97             (pdh, branch) = is_in_collection(sp[0], sp[1])
98             if pdh:
99                 return ArvFile(prefix, "$(dir %s/%s/)" % (pdh, branch))
100
101     return prefix+fn
102
103 def main(arguments=None):
104     args = arvrun_parser.parse_args(arguments)
105
106     if len(args.args) == 0:
107         arvrun_parser.print_help()
108         return
109
110     starting_args = args.args
111
112     reading_into = 2
113
114     # Parse the command arguments into 'slots'.
115     # All words following '>' are output arguments and are collected into slots[0].
116     # All words following '<' are input arguments and are collected into slots[1].
117     # slots[2..] store the parameters of each command in the pipeline.
118     #
119     # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
120     # will be parsed into:
121     #   [['output.txt'],
122     #    ['input1', 'input2', 'input3'],
123     #    ['foo', 'arg1', 'arg2'],
124     #    ['bar', 'arg3', 'arg4']]
125     slots = [[], [], []]
126     for c in args.args:
127         if c.startswith('>'):
128             reading_into = 0
129             if len(c) > 1:
130                 slots[reading_into].append(c[1:])
131         elif c.startswith('<'):
132             reading_into = 1
133             if len(c) > 1:
134                 slots[reading_into].append(c[1:])
135         elif c == '|':
136             reading_into = len(slots)
137             slots.append([])
138         else:
139             slots[reading_into].append(c)
140
141     if slots[0] and len(slots[0]) > 1:
142         logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
143         return
144
145     if not args.dry_run:
146         api = arvados.api('v1')
147         if args.project_uuid:
148             project = args.project_uuid
149         else:
150             project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
151
152     # Identify input files.  Look at each parameter and test to see if there is
153     # a file by that name.  This uses 'patterns' to look for within
154     # command line arguments, such as --foo=file.txt or -lfile.txt
155     patterns = [re.compile("([^=]+=)(.*)"),
156                 re.compile("(-[A-Za-z])(.+)")]
157     for j, command in enumerate(slots[1:]):
158         for i, a in enumerate(command):
159             if j > 0 and i == 0:
160                 # j == 0 is stdin, j > 0 is commands
161                 # always skip program executable (i == 0) in commands
162                 pass
163             elif a.startswith('\\'):
164                 # if it starts with a \ then don't do any interpretation
165                 command[i] = a[1:]
166             else:
167                 # See if it looks like a file
168                 command[i] = statfile('', a)
169
170                 # If a file named command[i] was found, it would now be an
171                 # ArvFile or UploadFile.  If command[i] is a basestring, that
172                 # means it doesn't correspond exactly to a file, so do some
173                 # pattern matching.
174                 if isinstance(command[i], basestring):
175                     for p in patterns:
176                         m = p.match(a)
177                         if m:
178                             command[i] = statfile(m.group(1), m.group(2))
179                             break
180
181     n = True
182     pathprefix = "/"
183     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
184     if len(files) > 0:
185         # Find the smallest path prefix that includes all the files that need to be uploaded.
186         # This starts at the root and iteratively removes common parent directory prefixes
187         # until all file pathes no longer have a common parent.
188         while n:
189             pathstep = None
190             for c in files:
191                 if pathstep is None:
192                     sp = c.fn.split('/')
193                     if len(sp) < 2:
194                         # no parent directories left
195                         n = False
196                         break
197                     # path step takes next directory
198                     pathstep = sp[0] + "/"
199                 else:
200                     # check if pathstep is common prefix for all files
201                     if not c.fn.startswith(pathstep):
202                         n = False
203                         break
204             if n:
205                 # pathstep is common parent directory for all files, so remove the prefix
206                 # from each path
207                 pathprefix += pathstep
208                 for c in files:
209                     c.fn = c.fn[len(pathstep):]
210
211         orgdir = os.getcwd()
212         os.chdir(pathprefix)
213
214         print("Upload local files: \"%s\"" % '" "'.join([c.fn for c in files]))
215
216         if args.dry_run:
217             print("$(input) is %s" % pathprefix.rstrip('/'))
218             pdh = "$(input)"
219         else:
220             files = sorted(files, key=lambda x: x.fn)
221             collection = arvados.CollectionWriter(api, num_retries=args.retries)
222             stream = None
223             for f in files:
224                 sp = os.path.split(f.fn)
225                 if sp[0] != stream:
226                     stream = sp[0]
227                     collection.start_new_stream(stream)
228                 collection.write_file(f.fn, sp[1])
229             item = api.collections().create(body={"owner_uuid": project, "manifest_text": collection.manifest_text()}).execute()
230             pdh = item["portable_data_hash"]
231             print "Uploaded to %s" % item["uuid"]
232
233         for c in files:
234             c.fn = "$(file %s/%s)" % (pdh, c.fn)
235
236         os.chdir(orgdir)
237
238     for i in xrange(1, len(slots)):
239         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
240
241     component = {
242         "script": "run-command",
243         "script_version": args.script_version,
244         "repository": args.repository,
245         "script_parameters": {
246         },
247         "runtime_constraints": {
248             "docker_image": args.docker_image
249         }
250     }
251
252     task_foreach = []
253     group_parser = argparse.ArgumentParser()
254     group_parser.add_argument('-b', '--batch-size', type=int)
255     group_parser.add_argument('args', nargs=argparse.REMAINDER)
256
257     for s in xrange(2, len(slots)):
258         for i in xrange(0, len(slots[s])):
259             if slots[s][i] == '--':
260                 inp = "input%i" % (s-2)
261                 groupargs = group_parser.parse_args(slots[2][i+1:])
262                 if groupargs.batch_size:
263                     component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
264                     slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
265                 else:
266                     component["script_parameters"][inp] = groupargs.args
267                     slots[s] = slots[s][0:i] + ["$(%s)" % inp]
268                 task_foreach.append(inp)
269                 break
270             if slots[s][i] == '\--':
271                 slots[s][i] = '--'
272
273     if slots[0]:
274         component["script_parameters"]["task.stdout"] = slots[0][0]
275     if slots[1]:
276         task_foreach.append("stdin")
277         component["script_parameters"]["stdin"] = slots[1]
278         component["script_parameters"]["task.stdin"] = "$(stdin)"
279
280     if task_foreach:
281         component["script_parameters"]["task.foreach"] = task_foreach
282
283     component["script_parameters"]["command"] = slots[2:]
284     if args.ignore_rcode:
285         component["script_parameters"]["task.ignore_rcode"] = args.ignore_rcode
286
287     pipeline = {
288         "name": "arv-run " + " | ".join([s[0] for s in slots[2:]]),
289         "description": "@" + " ".join(starting_args) + "@",
290         "components": {
291             "command": component
292         },
293         "state": "RunningOnClient" if args.local else "RunningOnServer"
294     }
295
296     if args.dry_run:
297         print(json.dumps(pipeline, indent=4))
298     else:
299         pipeline["owner_uuid"] = project
300         pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
301         print "Running pipeline %s" % pi["uuid"]
302
303         if args.local:
304             subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
305         elif not args.no_wait:
306             ws.main(["--pipeline", pi["uuid"]])
307
308         pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
309         print "Pipeline is %s" % pi["state"]
310         if "output_uuid" in pi["components"]["command"]:
311             print "Output is %s" % pi["components"]["command"]["output_uuid"]
312         else:
313             print "No output"
314
315 if __name__ == '__main__':
316     main()