14075: Tweak ruamel.yaml verision range in setup.py
[arvados.git] / sdk / python / arvados / commands / run.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 # Copyright (C) 2018 Genome Research Ltd.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 from __future__ import print_function
19 from __future__ import absolute_import
20 from builtins import range
21 from past.builtins import basestring
22 from builtins import object
23 import arvados
24 import arvados.commands.ws as ws
25 import argparse
26 import json
27 import re
28 import os
29 import stat
30 from . import put
31 import time
32 import subprocess
33 import logging
34 import sys
35 import errno
36 import arvados.commands._util as arv_cmd
37 import arvados.collection
38 import arvados.config as config
39
40 from arvados._version import __version__
41
42 logger = logging.getLogger('arvados.arv-run')
43 logger.setLevel(logging.INFO)
44
45 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
46 arvrun_parser.add_argument('--dry-run', action="store_true",
47                            help="Print out the pipeline that would be submitted and exit")
48 arvrun_parser.add_argument('--local', action="store_true",
49                            help="Run locally using arv-run-pipeline-instance")
50 arvrun_parser.add_argument('--docker-image', type=str,
51                            help="Docker image to use, otherwise use instance default.")
52 arvrun_parser.add_argument('--ignore-rcode', action="store_true",
53                            help="Commands that return non-zero return codes should not be considered failed.")
54 arvrun_parser.add_argument('--no-reuse', action="store_true",
55                            help="Do not reuse past jobs.")
56 arvrun_parser.add_argument('--no-wait', action="store_true",
57                            help="Do not wait and display logs after submitting command, just exit.")
58 arvrun_parser.add_argument('--project-uuid', type=str,
59                            help="Parent project of the pipeline")
60 arvrun_parser.add_argument('--git-dir', type=str, default="",
61                            help="Git repository passed to arv-crunch-job when using --local")
62 arvrun_parser.add_argument('--repository', type=str, default="arvados",
63                            help="repository field of component, default 'arvados'")
64 arvrun_parser.add_argument('--script-version', type=str, default="master",
65                            help="script_version field of component, default 'master'")
66 arvrun_parser.add_argument('--version', action='version',
67                            version="%s %s" % (sys.argv[0], __version__),
68                            help='Print version and exit.')
69 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
70
71 class ArvFile(object):
72     def __init__(self, prefix, fn):
73         self.prefix = prefix
74         self.fn = fn
75
76     def __hash__(self):
77         return (self.prefix+self.fn).__hash__()
78
79     def __eq__(self, other):
80         return (self.prefix == other.prefix) and (self.fn == other.fn)
81
82 class UploadFile(ArvFile):
83     pass
84
85 # Determine if a file is in a collection, and return a tuple consisting of the
86 # portable data hash and the path relative to the root of the collection.
87 # Return None if the path isn't with an arv-mount collection or there was is error.
88 def is_in_collection(root, branch):
89     try:
90         if root == "/":
91             return (None, None)
92         fn = os.path.join(root, ".arvados#collection")
93         if os.path.exists(fn):
94             with file(fn, 'r') as f:
95                 c = json.load(f)
96             return (c["portable_data_hash"], branch)
97         else:
98             sp = os.path.split(root)
99             return is_in_collection(sp[0], os.path.join(sp[1], branch))
100     except (IOError, OSError):
101         return (None, None)
102
103 # Determine the project to place the output of this command by searching upward
104 # for arv-mount psuedofile indicating the project.  If the cwd isn't within
105 # an arv-mount project or there is an error, return current_user.
106 def determine_project(root, current_user):
107     try:
108         if root == "/":
109             return current_user
110         fn = os.path.join(root, ".arvados#project")
111         if os.path.exists(fn):
112             with file(fn, 'r') as f:
113                 c = json.load(f)
114             if 'writable_by' in c and current_user in c['writable_by']:
115                 return c["uuid"]
116             else:
117                 return current_user
118         else:
119             sp = os.path.split(root)
120             return determine_project(sp[0], current_user)
121     except (IOError, OSError):
122         return current_user
123
124 # Determine if string corresponds to a file, and if that file is part of a
125 # arv-mounted collection or only local to the machine.  Returns one of
126 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
127 # be uploaded to a collection), or simply returns prefix+fn (which yields the
128 # original parameter string).
129 def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
130     absfn = os.path.abspath(fn)
131     try:
132         st = os.stat(absfn)
133         sp = os.path.split(absfn)
134         (pdh, branch) = is_in_collection(sp[0], sp[1])
135         if pdh:
136             if stat.S_ISREG(st.st_mode):
137                 return ArvFile(prefix, fnPattern % (pdh, branch))
138             elif stat.S_ISDIR(st.st_mode):
139                 return ArvFile(prefix, dirPattern % (pdh, branch))
140             else:
141                 raise Exception("%s is not a regular file or directory" % absfn)
142         else:
143             # trim leading '/' for path prefix test later
144             return UploadFile(prefix, absfn[1:])
145     except OSError as e:
146         if e.errno == errno.ENOENT and not raiseOSError:
147             pass
148         else:
149             raise
150
151     return prefix+fn
152
153 def write_file(collection, pathprefix, fn):
154     with open(os.path.join(pathprefix, fn)) as src:
155         dst = collection.open(fn, "w")
156         r = src.read(1024*128)
157         while r:
158             dst.write(r)
159             r = src.read(1024*128)
160         dst.close(flush=False)
161
162 def uploadfiles(files, api, dry_run=False, num_retries=0,
163                 project=None,
164                 fnPattern="$(file %s/%s)",
165                 name=None,
166                 collection=None):
167     # Find the smallest path prefix that includes all the files that need to be uploaded.
168     # This starts at the root and iteratively removes common parent directory prefixes
169     # until all file paths no longer have a common parent.
170     if files:
171         n = True
172         pathprefix = "/"
173         while n:
174             pathstep = None
175             for c in files:
176                 if pathstep is None:
177                     sp = c.fn.split('/')
178                     if len(sp) < 2:
179                         # no parent directories left
180                         n = False
181                         break
182                     # path step takes next directory
183                     pathstep = sp[0] + "/"
184                 else:
185                     # check if pathstep is common prefix for all files
186                     if not c.fn.startswith(pathstep):
187                         n = False
188                         break
189             if n:
190                 # pathstep is common parent directory for all files, so remove the prefix
191                 # from each path
192                 pathprefix += pathstep
193                 for c in files:
194                     c.fn = c.fn[len(pathstep):]
195
196         logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
197
198     if dry_run:
199         logger.info("$(input) is %s", pathprefix.rstrip('/'))
200         pdh = "$(input)"
201     else:
202         files = sorted(files, key=lambda x: x.fn)
203         if collection is None:
204             collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
205         prev = ""
206         for f in files:
207             localpath = os.path.join(pathprefix, f.fn)
208             if prev and localpath.startswith(prev+"/"):
209                 # If this path is inside an already uploaded subdirectory,
210                 # don't redundantly re-upload it.
211                 # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
212                 # skip it because it starts with "/tmp/foo/"
213                 continue
214             prev = localpath
215             if os.path.isfile(localpath):
216                 write_file(collection, pathprefix, f.fn)
217             elif os.path.isdir(localpath):
218                 for root, dirs, iterfiles in os.walk(localpath):
219                     root = root[len(pathprefix):]
220                     for src in iterfiles:
221                         write_file(collection, pathprefix, os.path.join(root, src))
222
223         pdh = None
224         if len(collection) > 0:
225             # non-empty collection
226             filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
227             name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
228             if name:
229                 filters.append(["name", "=", name_pdh])
230             if project:
231                 filters.append(["owner_uuid", "=", project])
232
233             # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
234             # and there is a potential race with other workflows that may have created the collection
235             # between when we list it and find it does not exist and when we attempt to create it.
236             tries = 2
237             while pdh is None and tries > 0:
238                 exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
239
240                 if exists["items"]:
241                     item = exists["items"][0]
242                     pdh = item["portable_data_hash"]
243                     logger.info("Using collection %s (%s)", pdh, item["uuid"])
244                 else:
245                     try:
246                         collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
247                         pdh = collection.portable_data_hash()
248                         logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
249                     except arvados.errors.ApiError as ae:
250                         tries -= 1
251             if pdh is None:
252                 # Something weird going on here, probably a collection
253                 # with a conflicting name but wrong PDH.  We won't
254                 # able to reuse it but we still need to save our
255                 # collection, so so save it with unique name.
256                 logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
257                 collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
258                 pdh = collection.portable_data_hash()
259                 logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
260         else:
261             # empty collection
262             pdh = collection.portable_data_hash()
263             assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
264             logger.info("Using empty collection %s", pdh)
265
266     for c in files:
267         c.keepref = "%s/%s" % (pdh, c.fn)
268         c.fn = fnPattern % (pdh, c.fn)
269
270
271 def main(arguments=None):
272     args = arvrun_parser.parse_args(arguments)
273
274     if len(args.args) == 0:
275         arvrun_parser.print_help()
276         return
277
278     starting_args = args.args
279
280     reading_into = 2
281
282     # Parse the command arguments into 'slots'.
283     # All words following '>' are output arguments and are collected into slots[0].
284     # All words following '<' are input arguments and are collected into slots[1].
285     # slots[2..] store the parameters of each command in the pipeline.
286     #
287     # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
288     # will be parsed into:
289     #   [['output.txt'],
290     #    ['input1', 'input2', 'input3'],
291     #    ['foo', 'arg1', 'arg2'],
292     #    ['bar', 'arg3', 'arg4']]
293     slots = [[], [], []]
294     for c in args.args:
295         if c.startswith('>'):
296             reading_into = 0
297             if len(c) > 1:
298                 slots[reading_into].append(c[1:])
299         elif c.startswith('<'):
300             reading_into = 1
301             if len(c) > 1:
302                 slots[reading_into].append(c[1:])
303         elif c == '|':
304             reading_into = len(slots)
305             slots.append([])
306         else:
307             slots[reading_into].append(c)
308
309     if slots[0] and len(slots[0]) > 1:
310         logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
311         return
312
313     if not args.dry_run:
314         api = arvados.api('v1')
315         if args.project_uuid:
316             project = args.project_uuid
317         else:
318             project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
319
320     # Identify input files.  Look at each parameter and test to see if there is
321     # a file by that name.  This uses 'patterns' to look for within
322     # command line arguments, such as --foo=file.txt or -lfile.txt
323     patterns = [re.compile("([^=]+=)(.*)"),
324                 re.compile("(-[A-Za-z])(.+)")]
325     for j, command in enumerate(slots[1:]):
326         for i, a in enumerate(command):
327             if j > 0 and i == 0:
328                 # j == 0 is stdin, j > 0 is commands
329                 # always skip program executable (i == 0) in commands
330                 pass
331             elif a.startswith('\\'):
332                 # if it starts with a \ then don't do any interpretation
333                 command[i] = a[1:]
334             else:
335                 # See if it looks like a file
336                 command[i] = statfile('', a)
337
338                 # If a file named command[i] was found, it would now be an
339                 # ArvFile or UploadFile.  If command[i] is a basestring, that
340                 # means it doesn't correspond exactly to a file, so do some
341                 # pattern matching.
342                 if isinstance(command[i], basestring):
343                     for p in patterns:
344                         m = p.match(a)
345                         if m:
346                             command[i] = statfile(m.group(1), m.group(2))
347                             break
348
349     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
350     if files:
351         uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.retries, project=project)
352
353     for i in range(1, len(slots)):
354         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
355
356     component = {
357         "script": "run-command",
358         "script_version": args.script_version,
359         "repository": args.repository,
360         "script_parameters": {
361         },
362         "runtime_constraints": {}
363     }
364
365     if args.docker_image:
366         component["runtime_constraints"]["docker_image"] = args.docker_image
367
368     task_foreach = []
369     group_parser = argparse.ArgumentParser()
370     group_parser.add_argument('-b', '--batch-size', type=int)
371     group_parser.add_argument('args', nargs=argparse.REMAINDER)
372
373     for s in range(2, len(slots)):
374         for i in range(0, len(slots[s])):
375             if slots[s][i] == '--':
376                 inp = "input%i" % (s-2)
377                 groupargs = group_parser.parse_args(slots[2][i+1:])
378                 if groupargs.batch_size:
379                     component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
380                     slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
381                 else:
382                     component["script_parameters"][inp] = groupargs.args
383                     slots[s] = slots[s][0:i] + ["$(%s)" % inp]
384                 task_foreach.append(inp)
385                 break
386             if slots[s][i] == '\--':
387                 slots[s][i] = '--'
388
389     if slots[0]:
390         component["script_parameters"]["task.stdout"] = slots[0][0]
391     if slots[1]:
392         task_foreach.append("stdin")
393         component["script_parameters"]["stdin"] = slots[1]
394         component["script_parameters"]["task.stdin"] = "$(stdin)"
395
396     if task_foreach:
397         component["script_parameters"]["task.foreach"] = task_foreach
398
399     component["script_parameters"]["command"] = slots[2:]
400     if args.ignore_rcode:
401         component["script_parameters"]["task.ignore_rcode"] = args.ignore_rcode
402
403     pipeline = {
404         "name": "arv-run " + " | ".join([s[0] for s in slots[2:]]),
405         "description": "@" + " ".join(starting_args) + "@",
406         "components": {
407             "command": component
408         },
409         "state": "RunningOnClient" if args.local else "RunningOnServer"
410     }
411
412     if args.dry_run:
413         print(json.dumps(pipeline, indent=4))
414     else:
415         pipeline["owner_uuid"] = project
416         pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
417         logger.info("Running pipeline %s", pi["uuid"])
418
419         if args.local:
420             subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
421         elif not args.no_wait:
422             ws.main(["--pipeline", pi["uuid"]])
423
424         pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
425         logger.info("Pipeline is %s", pi["state"])
426         if "output_uuid" in pi["components"]["command"]:
427             logger.info("Output is %s", pi["components"]["command"]["output_uuid"])
428         else:
429             logger.info("No output")
430
431 if __name__ == '__main__':
432     main()