import fcntl
import hashlib
import json
import logging
import os
import re
import subprocess
import sys
import types
import UserDict

import httplib2
import apiclient.discovery

from collection import *

EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'

# Module-level state: the loaded configuration and a cache of API clients
# built by api(), keyed by API version.
config = None
services = {}

# Arvados configuration settings are taken from $HOME/.config/arvados.
# Environment variables override settings in the config file.

class ArvadosConfig(dict):
    def __init__(self, config_file):
        # Load KEY=VALUE pairs from the config file, if it exists.
        if os.path.exists(config_file):
            with open(config_file, "r") as f:
                for config_line in f:
                    var, val = config_line.rstrip().split('=', 1)
                    self[var] = val
        # Environment variables take precedence over the config file.
        for var in os.environ:
            if var.startswith('ARVADOS_'):
                self[var] = os.environ[var]
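
# Illustrative sketch (not part of the original SDK): load the default config
# file and note that any ARVADOS_* environment variable wins over a line in
# the file. The host names used in the comment are placeholders.
def _example_load_config():
    cfg = ArvadosConfig(os.path.join(os.environ['HOME'], '.config', 'arvados'))
    # With "ARVADOS_API_HOST=arvados.example.com" in the file and the
    # environment variable ARVADOS_API_HOST=other.example.com set, the
    # environment value is the one returned here.
    return cfg.get('ARVADOS_API_HOST')
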
# Exception classes are grouped under an "errors" namespace so callers can
# write e.g. "raise errors.CommandFailedError(...)".
class errors:
    class SyntaxError(Exception):
        pass
    class AssertionError(Exception):
        pass
    class NotFoundError(Exception):
        pass
    class CommandFailedError(Exception):
        pass
    class KeepWriteError(Exception):
        pass
    class NotImplementedError(Exception):
        pass

class CredentialsFromEnv(object):
    @staticmethod
    def http_request(self, uri, **kwargs):
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
        try:
            return self.orig_http_request(uri, **kwargs)
        except BadStatusLine:
            # This is how httplib tells us that it tried to reuse an
            # existing connection but it was already closed by the
            # server. In that case, yes, we would like to retry.
            # Unfortunately, we are not absolutely certain that the
            # previous call did not succeed, so this is slightly
            # risky.
            return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        # Wrap the httplib2.Http object's request() method so every request
        # carries the API token from the configuration.
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
        return http

def task_set_output(self, s):
    # Record the task's output and mark the task as successfully finished.
    api('v1').job_tasks().update(uuid=self['uuid'],
                                 body={'output': s,
                                       'success': True,
                                       'progress': 1.0}).execute()

def current_task():
    # The task this process is running, per the TASK_UUID environment variable.
    t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    return t

def current_job():
    # The job this process is running, per the JOB_UUID environment variable.
    t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    return t

def getjobparam(*args):
    # Shortcut for looking up a script parameter of the current job.
    return current_job()['script_parameters'].get(*args)
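
# Illustrative sketch: inside a crunch script, getjobparam() reads one value
# from the current job's script_parameters; 'input' is the parameter name
# used elsewhere in this file, but any key defined for the job works.
def _example_read_parameter():
    return getjobparam('input')
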
# Monkey patch discovery._cast() so objects and arrays get serialized
# with json.dumps() instead of str().
_cast_orig = apiclient.discovery._cast
def _cast_objects_too(value, schema_type):
    global _cast_orig
    if (type(value) != type('') and
        (schema_type == 'object' or schema_type == 'array')):
        return json.dumps(value)
    else:
        return _cast_orig(value, schema_type)
apiclient.discovery._cast = _cast_objects_too
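
# Why the patch matters (illustrative sketch): for a parameter whose discovery
# schema type is 'array' or 'object', str() would produce a Python repr
# (single quotes) that the API server cannot parse, while json.dumps()
# produces valid JSON. The filter value below is a placeholder.
def _example_cast_difference():
    filters = [['uuid', '=', 'zzzzz-example-uuid']]
    return str(filters), _cast_objects_too(filters, 'array')
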
def api(version=None):
    global services, config

    if not config:
        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados')
        if 'ARVADOS_DEBUG' in config:
            logging.basicConfig(level=logging.DEBUG)

    if not services.get(version):
        apiVersion = version
        if not version:
            apiVersion = 'v1'
            logging.info("Using default API version. " +
                         "Call arvados.api('%s') instead." %
                         apiVersion)
        if 'ARVADOS_API_HOST' not in config:
            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
               config['ARVADOS_API_HOST'])
        credentials = CredentialsFromEnv()

        # Use system's CA certificates (if we find them) instead of httplib2's
        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
        if not os.path.exists(ca_certs):
            ca_certs = None             # use httplib2 default

        http = httplib2.Http(ca_certs=ca_certs)
        http = credentials.authorize(http)
        if re.match(r'(?i)^(true|1|yes)$',
                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
            http.disable_ssl_certificate_validation = True
        services[version] = apiclient.discovery.build(
            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
    return services[version]
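
# Typical use (a sketch, assuming ARVADOS_API_HOST and ARVADOS_API_TOKEN are
# set in the environment or in ~/.config/arvados): build a client and list a
# few collections. Any resource/method exposed by the discovery document
# works the same way.
def _example_list_collections():
    client = api('v1')
    listing = client.collections().list(limit=5).execute()
    return [c['uuid'] for c in listing['items']]
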
class JobTask(object):
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        print "init jobtask %s %s" % (parameters, runtime_constraints)

# Helpers for crunch job scripts to queue new tasks for themselves.
class job_setup:
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input
                        }
                    }
                api('v1').job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                         body={'success': True}
                                         ).execute()
            exit(0)
    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input
                    }
                }
            api('v1').job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                         body={'success': True}
                                         ).execute()
            exit(0)
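
# Sketch of a typical crunch script entry point (parameter handling here is
# illustrative): task 0 fans out into one task per input file, then each
# per-file task works on its own 'input' manifest and records an output.
def _example_crunch_script():
    job_setup.one_task_per_input_file(if_sequence=0, and_end_task=True)
    this_task = current_task()
    task_input = this_task['parameters']['input']
    # ... real work on task_input would go here ...
    this_task.set_output(task_input)
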
class util:
    @staticmethod
    def clear_tmpdir(path=None):
        """
        Ensure the given directory (or TASK_TMPDIR if none given)
        exists and is empty.
        """
        if path is None:
            path = current_task().tmpdir
        if os.path.exists(path):
            p = subprocess.Popen(['rm', '-rf', path])
            stdout, stderr = p.communicate(None)
            if p.returncode != 0:
                raise Exception('rm -rf %s: %s' % (path, stderr))
        os.makedirs(path)
    @staticmethod
    def run_command(execargs, **kwargs):
        kwargs.setdefault('stdin', subprocess.PIPE)
        kwargs.setdefault('stdout', subprocess.PIPE)
        kwargs.setdefault('stderr', sys.stderr)
        kwargs.setdefault('close_fds', True)
        kwargs.setdefault('shell', False)
        p = subprocess.Popen(execargs, **kwargs)
        stdoutdata, stderrdata = p.communicate(None)
        if p.returncode != 0:
            raise errors.CommandFailedError(
                "run_command %s exit %d:\n%s" %
                (execargs, p.returncode, stderrdata))
        return stdoutdata, stderrdata
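
    @staticmethod
    def _example_run_command():
        # Illustrative sketch (assumes a POSIX 'echo' is available): on
        # success run_command returns (stdout, stderr); on a non-zero exit
        # status it raises errors.CommandFailedError instead.
        out, err = util.run_command(['echo', 'hello'])
        return out  # 'hello\n'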
    @staticmethod
    def git_checkout(url, version, path):
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        if not os.path.exists(path):
            util.run_command(["git", "clone", url, path],
                             cwd=os.path.dirname(path))
        util.run_command(["git", "checkout", version],
                         cwd=path)
        return path
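
    @staticmethod
    def _example_git_checkout():
        # Sketch with placeholder URL and version: clone into the job's work
        # directory (relative path) and pin the working copy to 'master'.
        return util.git_checkout('https://git.example.com/tools.git',
                                 'master', 'tools')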
    @staticmethod
    def tar_extractor(path, decompress_flag):
        return subprocess.Popen(["tar",
                                 "-C", path,
                                 ("-x%sf" % decompress_flag),
                                 "-"],
                                stdout=None,
                                stdin=subprocess.PIPE, stderr=sys.stderr,
                                shell=False, close_fds=True)
    @staticmethod
    def tarball_extract(tarball, path):
        """Retrieve a tarball from Keep and extract it to a local
        directory.  Return the absolute path where the tarball was
        extracted.  If the top level of the tarball contained just one
        file or directory, return the absolute path of that single
        item.

        tarball -- collection locator
        path -- where to extract the tarball: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == tarball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(tarball).all_files():
                if re.search('\.(tbz|tar.bz2)$', f.name()):
                    p = util.tar_extractor(path, 'j')
                elif re.search('\.(tgz|tar.gz)$', f.name()):
                    p = util.tar_extractor(path, 'z')
                elif re.search('\.tar$', f.name()):
                    p = util.tar_extractor(path, '')
                else:
                    raise errors.AssertionError(
                        "tarball_extract cannot handle filename %s" % f.name())
                # Stream the tarball out of Keep into tar's stdin.
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    p.stdin.write(buf)
                p.stdin.close()
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise errors.CommandFailedError(
                        "tar exited %d" % p.returncode)
            os.symlink(tarball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
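
    @staticmethod
    def _example_tarball_extract():
        # Sketch: 'reference_tarball' is a placeholder script parameter
        # holding the locator of a collection that contains a .tar.gz. The
        # tarball is unpacked under the job work directory and reused on
        # later calls as long as the '.locator' symlink still matches.
        locator = current_job()['script_parameters']['reference_tarball']
        return util.tarball_extract(locator, 'reference')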
    @staticmethod
    def zipball_extract(zipball, path):
        """Retrieve a zip archive from Keep and extract it to a local
        directory.  Return the absolute path where the archive was
        extracted.  If the top level of the archive contained just one
        file or directory, return the absolute path of that single
        item.

        zipball -- collection locator
        path -- where to extract the archive: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == zipball:
                already_have_it = True
        except OSError:
            pass
        if not already_have_it:

            # emulate "rm -f" (i.e., if the file does not exist, we win)
            try:
                os.unlink(os.path.join(path, '.locator'))
            except OSError:
                if os.path.exists(os.path.join(path, '.locator')):
                    os.unlink(os.path.join(path, '.locator'))

            for f in CollectionReader(zipball).all_files():
                if not re.search('\.zip$', f.name()):
                    raise errors.NotImplementedError(
                        "zipball_extract cannot handle filename %s" % f.name())
                # Copy the archive out of Keep, then unzip it in place.
                zip_filename = os.path.join(path, os.path.basename(f.name()))
                zip_file = open(zip_filename, 'wb')
                while True:
                    buf = f.read(2**20)
                    if len(buf) == 0:
                        break
                    zip_file.write(buf)
                zip_file.close()

                p = subprocess.Popen(["unzip",
                                      "-q", "-o",
                                      "-d", path,
                                      zip_filename],
                                     stdout=None,
                                     stdin=None, stderr=sys.stderr,
                                     shell=False, close_fds=True)
                p.wait()
                if p.returncode != 0:
                    lockfile.close()
                    raise errors.CommandFailedError(
                        "unzip exited %d" % p.returncode)
                os.unlink(zip_filename)
            os.symlink(zipball, os.path.join(path, '.locator'))
        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
        lockfile.close()
        if len(tld_extracts) == 1:
            return os.path.join(path, tld_extracts[0])
        return path
    @staticmethod
    def collection_extract(collection, path, files=[], decompress=True):
        """Retrieve a collection from Keep and extract it to a local
        directory.  Return the absolute path where the collection was
        extracted.

        collection -- collection locator
        path -- where to extract: absolute, or relative to job tmp
        """
        matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
        if matches:
            collection_hash = matches.group(1)
        else:
            collection_hash = hashlib.md5(collection).hexdigest()
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)
        already_have_it = False
        try:
            if os.readlink(os.path.join(path, '.locator')) == collection_hash:
                already_have_it = True
        except OSError:
            pass

        # emulate "rm -f" (i.e., if the file does not exist, we win)
        try:
            os.unlink(os.path.join(path, '.locator'))
        except OSError:
            if os.path.exists(os.path.join(path, '.locator')):
                os.unlink(os.path.join(path, '.locator'))

        files_got = []
        for s in CollectionReader(collection).all_streams():
            stream_name = s.name()
            for f in s.all_files():
                if (files == [] or
                    ((f.name() not in files_got) and
                     (f.name() in files or
                      (decompress and f.decompressed_name() in files)))):
                    outname = f.decompressed_name() if decompress else f.name()
                    files_got += [outname]
                    if os.path.exists(os.path.join(path, stream_name, outname)):
                        continue
                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
                    for buf in (f.readall_decompressed() if decompress
                                else f.readall()):
                        outfile.write(buf)
                    outfile.close()
        if len(files_got) < len(files):
            raise errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got,
                 [z.name() for z in CollectionReader(collection).all_files()]))
        os.symlink(collection_hash, os.path.join(path, '.locator'))
        lockfile.close()
        return path
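
    @staticmethod
    def _example_collection_extract():
        # Sketch: pull a single named file out of a collection whose locator
        # comes from a placeholder script parameter. With decompress=True a
        # stored 'genes.gtf.gz' also satisfies a request for 'genes.gtf'.
        locator = current_job()['script_parameters']['annotation_collection']
        return util.collection_extract(locator, 'annotations',
                                       files=['genes.gtf'])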
    @staticmethod
    def mkdir_dash_p(path):
        # Equivalent of "mkdir -p": create path and any missing parents.
        if not os.path.exists(path):
            util.mkdir_dash_p(os.path.dirname(path))
            try:
                os.mkdir(path)
            except OSError:
                if not os.path.exists(path):
                    raise
    @staticmethod
    def stream_extract(stream, path, files=[], decompress=True):
        """Retrieve a stream from Keep and extract it to a local
        directory.  Return the absolute path where the stream was
        extracted.

        stream -- StreamReader object
        path -- where to extract: absolute, or relative to job tmp
        """
        if not re.search('^/', path):
            path = os.path.join(current_job().tmpdir, path)
        lockfile = open(path + '.lock', 'w')
        fcntl.flock(lockfile, fcntl.LOCK_EX)
        try:
            os.stat(path)
        except OSError:
            os.mkdir(path)

        files_got = []
        for f in stream.all_files():
            if (files == [] or
                ((f.name() not in files_got) and
                 (f.name() in files or
                  (decompress and f.decompressed_name() in files)))):
                outname = f.decompressed_name() if decompress else f.name()
                files_got += [outname]
                if os.path.exists(os.path.join(path, outname)):
                    os.unlink(os.path.join(path, outname))
                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
                outfile = open(os.path.join(path, outname), 'wb')
                for buf in (f.readall_decompressed() if decompress
                            else f.readall()):
                    outfile.write(buf)
                outfile.close()
        if len(files_got) < len(files):
            raise errors.AssertionError(
                "Wanted files %s but only got %s from %s" %
                (files, files_got, [z.name() for z in stream.all_files()]))
        lockfile.close()
        return path
    @staticmethod
    def listdir_recursive(dirname, base=None):
        # Return a sorted list of file paths under dirname, relative to it.
        allfiles = []
        for ent in sorted(os.listdir(dirname)):
            ent_path = os.path.join(dirname, ent)
            ent_base = os.path.join(base, ent) if base else ent
            if os.path.isdir(ent_path):
                allfiles += util.listdir_recursive(ent_path, ent_base)
            else:
                allfiles += [ent_base]
        return allfiles
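
# Sketch of using the util helpers from a crunch task: enumerate everything
# written under the task work directory, e.g. before uploading the files.
# (TASK_WORK is the same work-directory variable used by current_task().)
def _example_list_task_work_files():
    return util.listdir_recursive(os.environ['TASK_WORK'])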