18600: Merge branch 'main'
[arvados.git] / sdk / python / arvados / commands / run.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 # Copyright (C) 2018 Genome Research Ltd.
3 #
4 # SPDX-License-Identifier: Apache-2.0
5 #
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
9 #
10 #    http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17
18 from __future__ import print_function
19 from __future__ import absolute_import
20 from builtins import range
21 from past.builtins import basestring
22 from builtins import object
23 import arvados
24 import arvados.commands.ws as ws
25 import argparse
26 import json
27 import re
28 import os
29 import stat
30 from . import put
31 import time
32 import subprocess
33 import logging
34 import sys
35 import errno
36 import arvados.commands._util as arv_cmd
37 import arvados.collection
38 import arvados.config as config
39
40 from arvados._version import __version__
41
42 logger = logging.getLogger('arvados.arv-run')
43 logger.setLevel(logging.INFO)
44
45 class ArvFile(object):
46     def __init__(self, prefix, fn):
47         self.prefix = prefix
48         self.fn = fn
49
50     def __hash__(self):
51         return (self.prefix+self.fn).__hash__()
52
53     def __eq__(self, other):
54         return (self.prefix == other.prefix) and (self.fn == other.fn)
55
56 class UploadFile(ArvFile):
57     pass
58
59 # Determine if a file is in a collection, and return a tuple consisting of the
60 # portable data hash and the path relative to the root of the collection.
61 # Return None if the path isn't with an arv-mount collection or there was is error.
62 def is_in_collection(root, branch):
63     try:
64         if root == "/":
65             return (None, None)
66         fn = os.path.join(root, ".arvados#collection")
67         if os.path.exists(fn):
68             with open(fn, 'r') as f:
69                 c = json.load(f)
70             return (c["portable_data_hash"], branch)
71         else:
72             sp = os.path.split(root)
73             return is_in_collection(sp[0], os.path.join(sp[1], branch))
74     except (IOError, OSError):
75         return (None, None)
76
77 # Determine the project to place the output of this command by searching upward
78 # for arv-mount psuedofile indicating the project.  If the cwd isn't within
79 # an arv-mount project or there is an error, return current_user.
80 def determine_project(root, current_user):
81     try:
82         if root == "/":
83             return current_user
84         fn = os.path.join(root, ".arvados#project")
85         if os.path.exists(fn):
86             with file(fn, 'r') as f:
87                 c = json.load(f)
88             if 'writable_by' in c and current_user in c['writable_by']:
89                 return c["uuid"]
90             else:
91                 return current_user
92         else:
93             sp = os.path.split(root)
94             return determine_project(sp[0], current_user)
95     except (IOError, OSError):
96         return current_user
97
98 # Determine if string corresponds to a file, and if that file is part of a
99 # arv-mounted collection or only local to the machine.  Returns one of
100 # ArvFile() (file already exists in a collection), UploadFile() (file needs to
101 # be uploaded to a collection), or simply returns prefix+fn (which yields the
102 # original parameter string).
103 def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)", raiseOSError=False):
104     absfn = os.path.abspath(fn)
105     try:
106         st = os.stat(absfn)
107         sp = os.path.split(absfn)
108         (pdh, branch) = is_in_collection(sp[0], sp[1])
109         if pdh:
110             if stat.S_ISREG(st.st_mode):
111                 return ArvFile(prefix, fnPattern % (pdh, branch))
112             elif stat.S_ISDIR(st.st_mode):
113                 return ArvFile(prefix, dirPattern % (pdh, branch))
114             else:
115                 raise Exception("%s is not a regular file or directory" % absfn)
116         else:
117             # trim leading '/' for path prefix test later
118             return UploadFile(prefix, absfn[1:])
119     except OSError as e:
120         if e.errno == errno.ENOENT and not raiseOSError:
121             pass
122         else:
123             raise
124
125     return prefix+fn
126
127 def write_file(collection, pathprefix, fn, flush=False):
128     with open(os.path.join(pathprefix, fn), "rb") as src:
129         dst = collection.open(fn, "wb")
130         r = src.read(1024*128)
131         while r:
132             dst.write(r)
133             r = src.read(1024*128)
134         dst.close(flush=flush)
135
136 def uploadfiles(files, api, dry_run=False, num_retries=0,
137                 project=None,
138                 fnPattern="$(file %s/%s)",
139                 name=None,
140                 collection=None,
141                 packed=True):
142     # Find the smallest path prefix that includes all the files that need to be uploaded.
143     # This starts at the root and iteratively removes common parent directory prefixes
144     # until all file paths no longer have a common parent.
145     if files:
146         n = True
147         pathprefix = "/"
148         while n:
149             pathstep = None
150             for c in files:
151                 if pathstep is None:
152                     sp = c.fn.split('/')
153                     if len(sp) < 2:
154                         # no parent directories left
155                         n = False
156                         break
157                     # path step takes next directory
158                     pathstep = sp[0] + "/"
159                 else:
160                     # check if pathstep is common prefix for all files
161                     if not c.fn.startswith(pathstep):
162                         n = False
163                         break
164             if n:
165                 # pathstep is common parent directory for all files, so remove the prefix
166                 # from each path
167                 pathprefix += pathstep
168                 for c in files:
169                     c.fn = c.fn[len(pathstep):]
170
171         logger.info("Upload local files: \"%s\"", '" "'.join([c.fn for c in files]))
172
173     if dry_run:
174         logger.info("$(input) is %s", pathprefix.rstrip('/'))
175         pdh = "$(input)"
176     else:
177         files = sorted(files, key=lambda x: x.fn)
178         if collection is None:
179             collection = arvados.collection.Collection(api_client=api, num_retries=num_retries)
180         prev = ""
181         for f in files:
182             localpath = os.path.join(pathprefix, f.fn)
183             if prev and localpath.startswith(prev+"/"):
184                 # If this path is inside an already uploaded subdirectory,
185                 # don't redundantly re-upload it.
186                 # e.g. we uploaded /tmp/foo and the next file is /tmp/foo/bar
187                 # skip it because it starts with "/tmp/foo/"
188                 continue
189             prev = localpath
190             if os.path.isfile(localpath):
191                 write_file(collection, pathprefix, f.fn, not packed)
192             elif os.path.isdir(localpath):
193                 for root, dirs, iterfiles in os.walk(localpath):
194                     root = root[len(pathprefix):]
195                     for src in iterfiles:
196                         write_file(collection, pathprefix, os.path.join(root, src), not packed)
197
198         pdh = None
199         if len(collection) > 0:
200             # non-empty collection
201             filters = [["portable_data_hash", "=", collection.portable_data_hash()]]
202             name_pdh = "%s (%s)" % (name, collection.portable_data_hash())
203             if name:
204                 filters.append(["name", "=", name_pdh])
205             if project:
206                 filters.append(["owner_uuid", "=", project])
207
208             # do the list / create in a loop with up to 2 tries as we are using `ensure_unique_name=False`
209             # and there is a potential race with other workflows that may have created the collection
210             # between when we list it and find it does not exist and when we attempt to create it.
211             tries = 2
212             while pdh is None and tries > 0:
213                 exists = api.collections().list(filters=filters, limit=1).execute(num_retries=num_retries)
214
215                 if exists["items"]:
216                     item = exists["items"][0]
217                     pdh = item["portable_data_hash"]
218                     logger.info("Using collection %s (%s)", pdh, item["uuid"])
219                 else:
220                     try:
221                         collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=False)
222                         pdh = collection.portable_data_hash()
223                         logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
224                     except arvados.errors.ApiError as ae:
225                         tries -= 1
226             if pdh is None:
227                 # Something weird going on here, probably a collection
228                 # with a conflicting name but wrong PDH.  We won't
229                 # able to reuse it but we still need to save our
230                 # collection, so so save it with unique name.
231                 logger.info("Name conflict on '%s', existing collection has an unexpected portable data hash", name_pdh)
232                 collection.save_new(name=name_pdh, owner_uuid=project, ensure_unique_name=True)
233                 pdh = collection.portable_data_hash()
234                 logger.info("Uploaded to %s (%s)", pdh, collection.manifest_locator())
235         else:
236             # empty collection
237             pdh = collection.portable_data_hash()
238             assert (pdh == config.EMPTY_BLOCK_LOCATOR), "Empty collection portable_data_hash did not have expected locator, was %s" % pdh
239             logger.debug("Using empty collection %s", pdh)
240
241     for c in files:
242         c.keepref = "%s/%s" % (pdh, c.fn)
243         c.fn = fnPattern % (pdh, c.fn)
244
245
246 def main(arguments=None):
247     raise Exception("Legacy arv-run removed.")
248
249 if __name__ == '__main__':
250     main()