fix slow queries in uploadfiles
[arvados.git] / sdk / python / arvados / commands / run.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 # Copyright (C) 2018 Genome Research Ltd.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 from __future__ import print_function
19 from __future__ import absolute_import
20 from builtins import range
21 from past.builtins import basestring
22 from builtins import object
23 import arvados
24 import arvados.commands.ws as ws
25 import argparse
26 import json
27 import re
28 import os
29 import stat
30 from . import put
31 import time
32 import subprocess
33 import logging
34 import sys
35 import errno
36 import arvados.commands._util as arv_cmd
37 import arvados.collection
38 import arvados.config as config
39
40 from arvados._version import __version__
41
42 logger = logging.getLogger('arvados.arv-run')
43 logger.setLevel(logging.INFO)
44
45 arvrun_parser = argparse.ArgumentParser(parents=[arv_cmd.retry_opt])
46 arvrun_parser.add_argument('--dry-run', action="store_true",
47                            help="Print out the pipeline that would be submitted and exit")
48 arvrun_parser.add_argument('--local', action="store_true",
49                            help="Run locally using arv-run-pipeline-instance")
50 arvrun_parser.add_argument('--docker-image', type=str,
51                            help="Docker image to use, otherwise use instance default.")
52 arvrun_parser.add_argument('--ignore-rcode', action="store_true",
53                            help="Commands that return non-zero return codes should not be considered failed.")
54 arvrun_parser.add_argument('--no-reuse', action="store_true",
55                            help="Do not reuse past jobs.")
56 arvrun_parser.add_argument('--no-wait', action="store_true",
57                            help="Do not wait and display logs after submitting command, just exit.")
58 arvrun_parser.add_argument('--project-uuid', type=str,
59                            help="Parent project of the pipeline")
60 arvrun_parser.add_argument('--git-dir', type=str, default="",
61                            help="Git repository passed to arv-crunch-job when using --local")
62 arvrun_parser.add_argument('--repository', type=str, default="arvados",
63                            help="repository field of component, default 'arvados'")
64 arvrun_parser.add_argument('--script-version', type=str, default="master",
65                            help="script_version field of component, default 'master'")
66 arvrun_parser.add_argument('--version', action='version',
67                            version="%s %s" % (sys.argv[0], __version__),
68                            help='Print version and exit.')
69 arvrun_parser.add_argument('args', nargs=argparse.REMAINDER)
70
71 class ArvFile(object):
72     def __init__(self, prefix, fn):
73         self.prefix = prefix
74         self.fn = fn
75
76     def __hash__(self):
77         return (self.prefix+self.fn).__hash__()
78
79     def __eq__(self, other):
80         return (self.prefix == other.prefix) and (self.fn == other.fn)
81
82 class UploadFile(ArvFile):
83     pass
84
85 # Determine if a file is in a collection, and return a tuple consisting of the
86 # portable data hash and the path relative to the root of the collection.
87 # Return None if the path isn't with an arv-mount collection or there was is error.
88 def is_in_collection(root, branch):
89     try:
90         if root == "/":
91             return (None, None)
92         fn = os.path.join(root, ".arvados#collection")
93         if os.path.exists(fn):
94             with file(fn, 'r') as f:
95                 c = json.load(f)
96             return (c["portable_data_hash"], branch)
97         else:
98             sp = os.path.split(root)
99             return is_in_collection(sp[0], os.path.join(sp[1], branch))
100     except (IOError, OSError):
101         return (None, None)
102
103 # Determine the project to place the output of this command by searching upward
104 # for arv-mount psuedofile indicating the project.  If the cwd isn't within
105 # an arv-mount project or there is an error, return current_user.
106 def determine_project(root, current_user):
107     try:
108         if root == "/":
109             return current_user
110         fn = os.path.join(root, ".arvados#project")
111         if os.path.exists(fn):
112             with file(fn, 'r') as f:
113                 c = json.load(f)
114             if 'writable_by' in c and current_user in c['writable_by']:
115                 return c["uuid"]
116             else:
117                 return current_user
118         else:
119             sp = os.path.split(root)
120             return determine_project(sp[0], current_user)
121     except (IOError, OSError):
122         return current_user
123
124 # Determine if string corresponds to a file, and if that file is part of a
125 # arv-mounted collection or only local to the machine.  Returns one of
126 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
127 # be uploaded to a collection), or simply returns prefix+fn (which yields the
128 # original parameter string).
129 def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
130     absfn = os.path.abspath(fn)
131     try:
132         st = os.stat(absfn)
133         sp = os.path.split(absfn)
134         (pdh, branch) = is_in_collection(sp[0], sp[1])
135         if pdh:
136             if stat.S_ISREG(st.st_mode):
137                 return ArvFile(prefix, fnPattern % (pdh, branch))
138             elif stat.S_ISDIR(st.st_mode):
139                 return ArvFile(prefix, dirPattern % (pdh, branch))
140             else:
141                 raise Exception("%s is not a regular file or directory" % absfn)
142         else:
143             # trim leading '/' for path prefix test later
144             return UploadFile(prefix, absfn[1:])
145     except OSError as e:
146         if e.errno == errno.ENOENT and not raiseOSError:
147             pass
148         else:
149             raise
150
151     return prefix+fn
152
153 def write_file(collection, pathprefix, fn):
154     with open(os.path.join(pathprefix, fn)) as src:
155         dst = collection.open(fn, "w")
156         r = src.read(1024*128)
157         while r:
158             dst.write(r)
159             r = src.read(1024*128)
160         dst.close(flush=False)
161
162 def uploadfiles(files, api, dry_run=False, num_retries=0,
163                 project=None,
164                 fnPattern="$(file %s/%s)",
165                 name=None,
166                 collection=None):
167     # Find the smallest path prefix that includes all the files that need to be uploaded.
168     # This starts at the root and iteratively removes common parent directory prefixes
169     # until all file paths no longer have a common parent.
170     if files:
171         n = True
172         pathprefix = "/"
173         while n:
174             pathstep = None
175             for c in files:
176                 if pathstep is None:
177                     sp = c.fn.split('/')
178                     if len(sp) < 2:
179                         # no parent directories left
180                         n = False
181                         break
182                     # path step takes next directory
183                     pathstep = sp[0] + "/"
184                 else:
185                     # check if pathstep is common prefix for all files
186                     if not c.fn.startswith(pathstep):
187                         n = False
188                         break
189             if n:
190                 # pathstep is common parent directory for all files, so remove the prefix
191                 # from each path
192                 pathprefix += pathstep
193                 for c in files:
194                     c.fn = c.fn[len(pathstep):]
195
196         logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
197
198     if dry_run:
199         logger.info("$(input) is %s", pathprefix.rstrip('/'))
200         pdh = "$(input)"
201     else:
202         files = sorted(files, key=lambda x: x.fn)
203         if collection is None:
204             collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
205         prev = ""
206         for f in files:
207             localpath = os.path.join(pathprefix, f.fn)
208             if prev and localpath.startswith(prev+"/"):
209                 # If this path is inside an already uploaded subdirectory,
210                 # don't redundantly re-upload it.
211                 # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
212                 # skip it because it starts with "/tmp/foo/"
213                 continue
214             prev = localpath
215             if os.path.isfile(localpath):
216                 write_file(collection, pathprefix, f.fn)
217             elif os.path.isdir(localpath):
218                 for root, dirs, iterfiles in os.walk(localpath):
219                     root = root[len(pathprefix):]
220                     for src in iterfiles:
221                         write_file(collection, pathprefix, os.path.join(root, src))
222
223         pdh = None
224         if len(collection) > 0:
225             # non-empty collection
226             filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
227             name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
228             if name:
229                 filters.append(["name", "=", name_pdh])
230             if project:
231                 filters.append(["owner_uuid", "=", project])
232
233             # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
234             # and there is a potential race with other workflows that may have created the collection
235             # between when we list it and find it does not exist and when we attempt to create it.
236             tries = 2
237             while pdh is None and tries > 0:
238                 exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
239
240                 if exists["items"]:
241                     item = exists["items"][0]
242                     pdh = item["portable_data_hash"]
243                     logger.info("Using collection %s (%s)", pdh, item["uuid"])
244                 else:
245                     try:
246                         collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
247                         pdh = collection.portable_data_hash()
248                         logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
249                     except arvados.errors.ApiError as ae:
250                         tries -= 1
251         else:
252             # empty collection
253             pdh = collection.portable_data_hash()
254             assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator"
255             logger.info("Using empty collection %s", pdh)
256
257     for c in files:
258         c.keepref = "%s/%s" % (pdh, c.fn)
259         c.fn = fnPattern % (pdh, c.fn)
260
261
262 def main(arguments=None):
263     args = arvrun_parser.parse_args(arguments)
264
265     if len(args.args) == 0:
266         arvrun_parser.print_help()
267         return
268
269     starting_args = args.args
270
271     reading_into = 2
272
273     # Parse the command arguments into 'slots'.
274     # All words following '>' are output arguments and are collected into slots[0].
275     # All words following '<' are input arguments and are collected into slots[1].
276     # slots[2..] store the parameters of each command in the pipeline.
277     #
278     # e.g. arv-run foo arg1 arg2 '|' bar arg3 arg4 '<' input1 input2 input3 '>' output.txt
279     # will be parsed into:
280     #   [['output.txt'],
281     #    ['input1', 'input2', 'input3'],
282     #    ['foo', 'arg1', 'arg2'],
283     #    ['bar', 'arg3', 'arg4']]
284     slots = [[], [], []]
285     for c in args.args:
286         if c.startswith('>'):
287             reading_into = 0
288             if len(c) > 1:
289                 slots[reading_into].append(c[1:])
290         elif c.startswith('<'):
291             reading_into = 1
292             if len(c) > 1:
293                 slots[reading_into].append(c[1:])
294         elif c == '|':
295             reading_into = len(slots)
296             slots.append([])
297         else:
298             slots[reading_into].append(c)
299
300     if slots[0] and len(slots[0]) > 1:
301         logger.error("Can only specify a single stdout file (run-command substitutions are permitted)")
302         return
303
304     if not args.dry_run:
305         api = arvados.api('v1')
306         if args.project_uuid:
307             project = args.project_uuid
308         else:
309             project = determine_project(os.getcwd(), api.users().current().execute()["uuid"])
310
311     # Identify input files.  Look at each parameter and test to see if there is
312     # a file by that name.  This uses 'patterns' to look for within
313     # command line arguments, such as --foo=file.txt or -lfile.txt
314     patterns = [re.compile("([^=]+=)(.*)"),
315                 re.compile("(-[A-Za-z])(.+)")]
316     for j, command in enumerate(slots[1:]):
317         for i, a in enumerate(command):
318             if j > 0 and i == 0:
319                 # j == 0 is stdin, j > 0 is commands
320                 # always skip program executable (i == 0) in commands
321                 pass
322             elif a.startswith('\\'):
323                 # if it starts with a \ then don't do any interpretation
324                 command[i] = a[1:]
325             else:
326                 # See if it looks like a file
327                 command[i] = statfile('', a)
328
329                 # If a file named command[i] was found, it would now be an
330                 # ArvFile or UploadFile.  If command[i] is a basestring, that
331                 # means it doesn't correspond exactly to a file, so do some
332                 # pattern matching.
333                 if isinstance(command[i], basestring):
334                     for p in patterns:
335                         m = p.match(a)
336                         if m:
337                             command[i] = statfile(m.group(1), m.group(2))
338                             break
339
340     files = [c for command in slots[1:] for c in command if isinstance(c, UploadFile)]
341     if files:
342         uploadfiles(files, api, dry_run=args.dry_run, num_retries=args.retries, project=project)
343
344     for i in range(1, len(slots)):
345         slots[i] = [("%s%s" % (c.prefix, c.fn)) if isinstance(c, ArvFile) else c for c in slots[i]]
346
347     component = {
348         "script": "run-command",
349         "script_version": args.script_version,
350         "repository": args.repository,
351         "script_parameters": {
352         },
353         "runtime_constraints": {}
354     }
355
356     if args.docker_image:
357         component["runtime_constraints"]["docker_image"] = args.docker_image
358
359     task_foreach = []
360     group_parser = argparse.ArgumentParser()
361     group_parser.add_argument('-b', '--batch-size', type=int)
362     group_parser.add_argument('args', nargs=argparse.REMAINDER)
363
364     for s in range(2, len(slots)):
365         for i in range(0, len(slots[s])):
366             if slots[s][i] == '--':
367                 inp = "input%i" % (s-2)
368                 groupargs = group_parser.parse_args(slots[2][i+1:])
369                 if groupargs.batch_size:
370                     component["script_parameters"][inp] = {"value": {"batch":groupargs.args, "size":groupargs.batch_size}}
371                     slots[s] = slots[s][0:i] + [{"foreach": inp, "command": "$(%s)" % inp}]
372                 else:
373                     component["script_parameters"][inp] = groupargs.args
374                     slots[s] = slots[s][0:i] + ["$(%s)" % inp]
375                 task_foreach.append(inp)
376                 break
377             if slots[s][i] == '\--':
378                 slots[s][i] = '--'
379
380     if slots[0]:
381         component["script_parameters"]["task.stdout"] = slots[0][0]
382     if slots[1]:
383         task_foreach.append("stdin")
384         component["script_parameters"]["stdin"] = slots[1]
385         component["script_parameters"]["task.stdin"] = "$(stdin)"
386
387     if task_foreach:
388         component["script_parameters"]["task.foreach"] = task_foreach
389
390     component["script_parameters"]["command"] = slots[2:]
391     if args.ignore_rcode:
392         component["script_parameters"]["task.ignore_rcode"] = args.ignore_rcode
393
394     pipeline = {
395         "name": "arv-run " + " | ".join([s[0] for s in slots[2:]]),
396         "description": "@" + " ".join(starting_args) + "@",
397         "components": {
398             "command": component
399         },
400         "state": "RunningOnClient" if args.local else "RunningOnServer"
401     }
402
403     if args.dry_run:
404         print(json.dumps(pipeline, indent=4))
405     else:
406         pipeline["owner_uuid"] = project
407         pi = api.pipeline_instances().create(body=pipeline, ensure_unique_name=True).execute()
408         logger.info("Running pipeline %s", pi["uuid"])
409
410         if args.local:
411             subprocess.call(["arv-run-pipeline-instance", "--instance", pi["uuid"], "--run-jobs-here"] + (["--no-reuse"] if args.no_reuse else []))
412         elif not args.no_wait:
413             ws.main(["--pipeline", pi["uuid"]])
414
415         pi = api.pipeline_instances().get(uuid=pi["uuid"]).execute()
416         logger.info("Pipeline is %s", pi["state"])
417         if "output_uuid" in pi["components"]["command"]:
418             logger.info("Output is %s", pi["components"]["command"]["output_uuid"])
419         else:
420             logger.info("No output")
421
422 if __name__ == '__main__':
423     main()