Merge remote-tracking branch 'origin/master' into 14645-fuse-operations-reporting
[arvados.git] / sdk / python / arvados / commands / run.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 # Copyright (C) 2018 Genome Research Ltd.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 from __future__ import print_function
19 from __future__ import absolute_import
20 from builtins import range
21 from past.builtins import basestring
22 from builtins import object
23 import arvados
24 import arvados.commands.ws as ws
25 import argparse
26 import json
27 import re
28 import os
29 import stat
30 from . import put
31 import time
32 import subprocess
33 import logging
34 import sys
35 import errno
36 import arvados.commands._util as arv_cmd
37 import arvados.collection
38 import arvados.config as config
39
40 from arvados._version import __version__
41
42 logger = logging.getLogger('arvados.arv-run')
43 logger.setLevel(logging.INFO)
44
45 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
46 arvrun_parser.add_argument('--dry-run', action="store_true",
47                            help="Print out the pipeline that would be submitted and exit")
48 arvrun_parser.add_argument('--local', action="store_true",
49                            help="Run locally using arv-run-pipeline-instance")
50 arvrun_parser.add_argument('--docker-image', type=str,
51                            help="Docker image to use, otherwise use instance default.")
52 arvrun_parser.add_argument('--ignore-rcode', action="store_true",
53                            help="Commands that return non-zero return codes should not be considered failed.")
54 arvrun_parser.add_argument('--no-reuse', action="store_true",
55                            help="Do not reuse past jobs.")
56 arvrun_parser.add_argument('--no-wait', action="store_true",
57                            help="Do not wait and display logs after submitting command, just exit.")
58 arvrun_parser.add_argument('--project-uuid', type=str,
59                            help="Parent project of the pipeline")
60 arvrun_parser.add_argument('--git-dir', type=str, default="",
61                            help="Git repository passed to arv-crunch-job when using --local")
62 arvrun_parser.add_argument('--repository', type=str, default="arvados",
63                            help="repository field of component, default 'arvados'")
64 arvrun_parser.add_argument('--script-version', type=str, default="master",
65                            help="script_version field of component, default 'master'")
66 arvrun_parser.add_argument('--version', action='version',
67                            version="%s %s" % (sys.argv[0], __version__),
68                            help='Print version and exit.')
69 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
70
71 class ArvFile(object):
72     def __init__(self, prefix, fn):
73         self.prefix = prefix
74         self.fn = fn
75
76     def __hash__(self):
77         return (self.prefix+self.fn).__hash__()
78
79     def __eq__(self, other):
80         return (self.prefix == other.prefix) and (self.fn == other.fn)
81
82 class UploadFile(ArvFile):
83     pass
84
85 # Determine if a file is in a collection, and return a tuple consisting of the
86 # portable data hash and the path relative to the root of the collection.
87 # Return None if the path isn't with an arv-mount collection or there was is error.
88 def is_in_collection(root, branch):
89     try:
90         if root == "/":
91             return (None, None)
92         fn = os.path.join(root, ".arvados#collection")
93         if os.path.exists(fn):
94             with file(fn, 'r') as f:
95                 c = json.load(f)
96             return (c["portable_data_hash"], branch)
97         else:
98             sp = os.path.split(root)
99             return is_in_collection(sp[0], os.path.join(sp[1], branch))
100     except (IOError, OSError):
101         return (None, None)
102
103 # Determine the project to place the output of this command by searching upward
104 # for arv-mount psuedofile indicating the project.  If the cwd isn't within
105 # an arv-mount project or there is an error, return current_user.
106 def determine_project(root, current_user):
107     try:
108         if root == "/":
109             return current_user
110         fn = os.path.join(root, ".arvados#project")
111         if os.path.exists(fn):
112             with file(fn, 'r') as f:
113                 c = json.load(f)
114             if 'writable_by' in c and current_user in c['writable_by']:
115                 return c["uuid"]
116             else:
117                 return current_user
118         else:
119             sp = os.path.split(root)
120             return determine_project(sp[0], current_user)
121     except (IOError, OSError):
122         return current_user
123
124 # Determine if string corresponds to a file, and if that file is part of a
125 # arv-mounted collection or only local to the machine.  Returns one of
126 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
127 # be uploaded to a collection), or simply returns prefix+fn (which yields the
128 # original parameter string).
129 def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
130     absfn = os.path.abspath(fn)
131     try:
132         st = os.stat(absfn)
133         sp = os.path.split(absfn)
134         (pdh, branch) = is_in_collection(sp[0], sp[1])
135         if pdh:
136             if stat.S_ISREG(st.st_mode):
137                 return ArvFile(prefix, fnPattern % (pdh, branch))
138             elif stat.S_ISDIR(st.st_mode):
139                 return ArvFile(prefix, dirPattern % (pdh, branch))
140             else:
141                 raise Exception("%s is not a regular file or directory" % absfn)
142         else:
143             # trim leading '/' for path prefix test later
144             return UploadFile(prefix, absfn[1:])
145     except OSError as e:
146         if e.errno == errno.ENOENT and not raiseOSError:
147             pass
148         else:
149             raise
150
151     return prefix+fn
152
153 def write_file(collection, pathprefix, fn, flush=False):
154     with open(os.path.join(pathprefix, fn), "rb") as src:
155         dst = collection.open(fn, "wb")
156         r = src.read(1024*128)
157         while r:
158             dst.write(r)
159             r = src.read(1024*128)
160         dst.close(flush=flush)
161
162 def uploadfiles(files, api, dry_run=False, num_retries=0,
163                 project=None,
164                 fnPattern="$(file %s/%s)",
165                 name=None,
166                 collection=None,
167                 packed=True):
168     # Find the smallest path prefix that includes all the files that need to be uploaded.
169     # This starts at the root and iteratively removes common parent directory prefixes
170     # until all file paths no longer have a common parent.
171     if files:
172         n = True
173         pathprefix = "/"
174         while n:
175             pathstep = None
176             for c in files:
177                 if pathstep is None:
178                     sp = c.fn.split('/')
179                     if len(sp) < 2:
180                         # no parent directories left
181                         n = False
182                         break
183                     # path step takes next directory
184                     pathstep = sp[0] + "/"
185                 else:
186                     # check if pathstep is common prefix for all files
187                     if not c.fn.startswith(pathstep):
188                         n = False
189                         break
190             if n:
191                 # pathstep is common parent directory for all files, so remove the prefix
192                 # from each path
193                 pathprefix += pathstep
194                 for c in files:
195                     c.fn = c.fn[len(pathstep):]
196
197         logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
198
199     if dry_run:
200         logger.info("$(input) is %s", pathprefix.rstrip('/'))
201         pdh = "$(input)"
202     else:
203         files = sorted(files, key=lambda x: x.fn)
204         if collection is None:
205             collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
206         prev = ""
207         for f in files:
208             localpath = os.path.join(pathprefix, f.fn)
209             if prev and localpath.startswith(prev+"/"):
210                 # If this path is inside an already uploaded subdirectory,
211                 # don't redundantly re-upload it.
212                 # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
213                 # skip it because it starts with "/tmp/foo/"
214                 continue
215             prev = localpath
216             if os.path.isfile(localpath):
217                 write_file(collection, pathprefix, f.fn, not packed)
218             elif os.path.isdir(localpath):
219                 for root, dirs, iterfiles in os.walk(localpath):
220                     root = root[len(pathprefix):]
221                     for src in iterfiles:
222                         write_file(collection, pathprefix, os.path.join(root, src), not packed)
223
224         pdh = None
225         if len(collection) > 0:
226             # non-empty collection
227             filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
228             name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
229             if name:
230                 filters.append(["name", "=", name_pdh])
231             if project:
232                 filters.append(["owner_uuid", "=", project])
233
234             # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
235             # and there is a potential race with other workflows that may have created the collection
236             # between when we list it and find it does not exist and when we attempt to create it.
237             tries = 2
238             while pdh is None and tries > 0:
239                 exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
240
241                 if exists["items"]:
242                     item = exists["items"][0]
243                     pdh = item["portable_data_hash"]
244                     logger.info("Using collection %s (%s)", pdh, item["uuid"])
245                 else:
246                     try:
247                         collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
248                         pdh = collection.portable_data_hash()
249                         logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
250                     except arvados.errors.ApiError as ae:
251                         tries -= 1
252             if pdh is None:
253                 # Something weird going on here, probably a collection
254                 # with a conflicting name but wrong PDH.  We won't
255                 # able to reuse it but we still need to save our
256                 # collection, so so save it with unique name.
257                 logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
258                 collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
259                 pdh = collection.portable_data_hash()
260                 logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
261         else:
262             # empty collection
263             pdh = collection.portable_data_hash()
264             assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
265             logger.info("Using empty collection %s", pdh)
266
267     for c in files:
268         c.keepref = "%s/%s" % (pdh, c.fn)
269         c.fn = fnPattern % (pdh, c.fn)
270
271
272 def main(arguments=None):
273     args = arvrun_parser.parse_args(arguments)
274
275     if len(args.args) == 0:
276         arvrun_parser.print_help()
277         return
278
279     starting_args = args.args
280
281     reading_into = 2
282
283     # Parse the command arguments into 'slots'.
284     # All words following '>' are output arguments and are collected into slots[0].
285     # All words following '<' are input arguments and are collected into slots[1].
286     # slots[2..] store the parameters of each command in the pipeline.
287     #
288     # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
289     # will be parsed into:
290     #   [['output.txt'],
291     #    ['input1', 'input2', 'input3'],
292     #    ['foo', 'arg1', 'arg2'],
293     #    ['bar', 'arg3', 'arg4']]
294     slots = [[], [], []]
295     for c in args.args:
296         if c.startswith('>'):
297             reading_into = 0
298             if len(c) > 1:
299                 slots[reading_into].append(c[1:])
300         elif c.startswith('<'):
301             reading_into = 1
302             if len(c) > 1:
303                 slots[reading_into].append(c[1:])
304         elif c == '|':
305             reading_into = len(slots)
306             slots.append([])
307         else:
308             slots[reading_into].append(c)
309
310     if slots[0] and len(slots[0]) > 1:
311         logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
312         return
313
314     if not args.dry_run:
315         api = arvados.api('v1')
316         if args.project_uuid:
317             project = args.project_uuid
318         else:
319             project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
320
321     # Identify input files.  Look at each parameter and test to see if there is
322     # a file by that name.  This uses 'patterns' to look for within
323     # command line arguments, such as --foo=file.txt or -lfile.txt
324     patterns = [re.compile("([^=]+=)(.*)"),
325                 re.compile("(-[A-Za-z])(.+)")]
326     for j, command in enumerate(slots[1:]):
327         for i, a in enumerate(command):
328             if j > 0 and i == 0:
329                 # j == 0 is stdin, j > 0 is commands
330                 # always skip program executable (i == 0) in commands
331                 pass
332             elif a.startswith('\\'):
333                 # if it starts with a \ then don't do any interpretation
334                 command[i] = a[1:]
335             else:
336                 # See if it looks like a file
337                 command[i] = statfile('', a)
338
339                 # If a file named command[i] was found, it would now be an
340                 # ArvFile or UploadFile.  If command[i] is a basestring, that
341                 # means it doesn't correspond exactly to a file, so do some
342                 # pattern matching.
343                 if isinstance(command[i], basestring):
344                     for p in patterns:
345                         m = p.match(a)
346                         if m:
347                             command[i] = statfile(m.group(1), m.group(2))
348                             break
349
350     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
351     if files:
352         uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.retries, project=project)
353
354     for i in range(1, len(slots)):
355         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
356
357     component = {
358         "script": "run-command",
359         "script_version": args.script_version,
360         "repository": args.repository,
361         "script_parameters": {
362         },
363         "runtime_constraints": {}
364     }
365
366     if args.docker_image:
367         component["runtime_constraints"]["docker_image"] = args.docker_image
368
369     task_foreach = []
370     group_parser = argparse.ArgumentParser()
371     group_parser.add_argument('-b', '--batch-size', type=int)
372     group_parser.add_argument('args', nargs=argparse.REMAINDER)
373
374     for s in range(2, len(slots)):
375         for i in range(0, len(slots[s])):
376             if slots[s][i] == '--':
377                 inp = "input%i" % (s-2)
378                 groupargs = group_parser.parse_args(slots[2][i+1:])
379                 if groupargs.batch_size:
380                     component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
381                     slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
382                 else:
383                     component["script_parameters"][inp] = groupargs.args
384                     slots[s] = slots[s][0:i] + ["$(%s)" % inp]
385                 task_foreach.append(inp)
386                 break
387             if slots[s][i] == '\--':
388                 slots[s][i] = '--'
389
390     if slots[0]:
391         component["script_parameters"]["task.stdout"] = slots[0][0]
392     if slots[1]:
393         task_foreach.append("stdin")
394         component["script_parameters"]["stdin"] = slots[1]
395         component["script_parameters"]["task.stdin"] = "$(stdin)"
396
397     if task_foreach:
398         component["script_parameters"]["task.foreach"] = task_foreach
399
400     component["script_parameters"]["command"] = slots[2:]
401     if args.ignore_rcode:
402         component["script_parameters"]["task.ignore_rcode"] = args.ignore_rcode
403
404     pipeline = {
405         "name": "arv-run " + " | ".join([s[0] for s in slots[2:]]),
406         "description": "@" + " ".join(starting_args) + "@",
407         "components": {
408             "command": component
409         },
410         "state": "RunningOnClient" if args.local else "RunningOnServer"
411     }
412
413     if args.dry_run:
414         print(json.dumps(pipeline, indent=4))
415     else:
416         pipeline["owner_uuid"] = project
417         pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
418         logger.info("Running pipeline %s", pi["uuid"])
419
420         if args.local:
421             subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
422         elif not args.no_wait:
423             ws.main(["--pipeline", pi["uuid"]])
424
425         pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
426         logger.info("Pipeline is %s", pi["state"])
427         if "output_uuid" in pi["components"]["command"]:
428             logger.info("Output is %s", pi["components"]["command"]["output_uuid"])
429         else:
430             logger.info("No output")
431
432 if __name__ == '__main__':
433     main()