22 import apiclient.discovery
25 EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
29 from collection import *
33 # Arvados configuration settings are taken from $HOME/.config/arvados.
34 # Environment variables override settings in the config file.
class ArvadosConfig(dict):
    """Dict of Arvados settings, loaded from a KEY=VALUE config file
    and then overlaid with any ARVADOS_* environment variables
    (environment always wins)."""
    def __init__(self, config_file):
        # Read KEY=VALUE lines from the config file, if it exists.
        if os.path.exists(config_file):
            with open(config_file, "r") as f:
                # NOTE(review): the loop line binding config_line (and the
                # assignment storing var/val into self) is not visible in
                # this view -- presumably "for config_line in f:" precedes
                # this line; confirm against the full source.
                var, val = config_line.rstrip().split('=', 2)
        # Environment variables override settings from the file.
        for var in os.environ:
            if var.startswith('ARVADOS_'):
                self[var] = os.environ[var]
# Error classes used throughout this module.  In the visible source each
# class statement has no suite (the bodies fell on elided lines); the only
# valid minimal body is "pass", restored here.  Note that SyntaxError,
# AssertionError and NotImplementedError deliberately shadow the builtins
# of the same name within this module's namespace.
class SyntaxError(Exception):
    # Raised when input text (e.g. a manifest) cannot be parsed.
    pass

class AssertionError(Exception):
    # Raised when a runtime expectation fails (e.g. "Wanted files ...
    # but only got ..." in collection_extract/stream_extract).
    pass

class NotFoundError(Exception):
    # Raised when a requested object cannot be located.
    pass

class CommandFailedError(Exception):
    # Raised when an external command exits nonzero (see run_command).
    pass

class KeepWriteError(Exception):
    # Raised on a failed Keep write (no raise site visible in this view).
    pass

class NotImplementedError(Exception):
    # Raised for recognized-but-unsupported input (see zipball_extract).
    pass
class CredentialsFromEnv(object):
    """Injects an OAuth2 Authorization header (token taken from the
    module-level config) into every request made through an
    httplib2.Http object patched by authorize()."""
    def http_request(self, uri, **kwargs):
        # Replacement for http.request; installed by authorize() below.
        from httplib import BadStatusLine
        if 'headers' not in kwargs:
            kwargs['headers'] = {}
        # config.get with a placeholder default avoids a client-side
        # KeyError when the token is unset.
        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
        # NOTE(review): the try/except BadStatusLine frame that motivates
        # the retry below is not visible in this view; confirm against
        # the full source.
        return self.orig_http_request(uri, **kwargs)
        # This is how httplib tells us that it tried to reuse an
        # existing connection but it was already closed by the
        # server. In that case, yes, we would like to retry.
        # Unfortunately, we are not absolutely certain that the
        # previous call did not succeed, so this is slightly
        return self.orig_http_request(uri, **kwargs)
    def authorize(self, http):
        # Save the original request method, then monkey-patch in ours.
        # NOTE(review): no "return http" is visible here, although the
        # caller does "http = credentials.authorize(http)" -- presumably
        # the return is on an elided line.
        http.orig_http_request = http.request
        http.request = types.MethodType(self.http_request, http)
# Helper bound onto task records by current_task(): record s as the
# task's output via the API.
# NOTE(review): the rest of this update call (request body and
# .execute()) is not visible in this view.
def task_set_output(self,s):
    api('v1').job_tasks().update(uuid=self['uuid'],
# Fragment of current_task(): fetch the job task named by $TASK_UUID,
# wrap it in a UserDict, attach the set_output helper and the task work
# directory ($TASK_WORK).
# NOTE(review): the enclosing "def current_task():" line (and any
# caching of the result) is not visible in this view.
t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
t = UserDict.UserDict(t)
t.set_output = types.MethodType(task_set_output, t)
t.tmpdir = os.environ['TASK_WORK']
# Fragment of current_job(): fetch the job named by $JOB_UUID, wrap it
# in a UserDict and attach the job work directory ($JOB_WORK).
# NOTE(review): the enclosing "def current_job():" line (and any
# caching of the result) is not visible in this view.
t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
t = UserDict.UserDict(t)
t.tmpdir = os.environ['JOB_WORK']
def getjobparam(*args):
    """Look up a script parameter of the current job.

    Accepts the same arguments as dict.get(): a parameter name and an
    optional default to return when the parameter is absent.
    """
    script_params = current_job()['script_parameters']
    return script_params.get(*args)
# Monkey patch discovery._cast() so objects and arrays get serialized
# with json.dumps() instead of str().
_cast_orig = apiclient.discovery._cast

def _cast_objects_too(value, schema_type):
    """Like discovery._cast(), but JSON-encode non-string values whose
    schema type is 'object' or 'array'; defer to the original _cast for
    everything else."""
    if schema_type in ('object', 'array') and type(value) != type(''):
        return json.dumps(value)
    return _cast_orig(value, schema_type)

apiclient.discovery._cast = _cast_objects_too
def api(version=None):
    """Return an Arvados API client (apiclient service object) for the
    requested API version, building and caching one per version.

    NOTE(review): several lines of this function are not visible in this
    view: the initialization of config/services, the default-version
    selection that binds apiVersion, and the closing arguments of the
    logging.info call below.  Confirm against the full source.
    """
    global services, config
    # (Re)load configuration from the user's config file on every call.
    config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados')
    if 'ARVADOS_DEBUG' in config:
        logging.basicConfig(level=logging.DEBUG)
    # Build and cache a client for this version on first use.
    if not services.get(version):
        logging.info("Using default API version. " +
                     "Call arvados.api('%s') instead." %
        if 'ARVADOS_API_HOST' not in config:
            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
               config['ARVADOS_API_HOST'])
        credentials = CredentialsFromEnv()

        # Use system's CA certificates (if we find them) instead of httplib2's
        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
        if not os.path.exists(ca_certs):
            ca_certs = None  # use httplib2 default

        http = httplib2.Http(ca_certs=ca_certs)
        http = credentials.authorize(http)
        # ARVADOS_API_HOST_INSECURE=true/1/yes (case-insensitive)
        # disables SSL certificate validation.
        if re.match(r'(?i)^(true|1|yes)$',
                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
            http.disable_ssl_certificate_validation=True
        services[version] = apiclient.discovery.build(
            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
    return services[version]
class JobTask(object):
    """Lightweight job-task record.

    NOTE(review): the rest of __init__ (attribute assignments, if any)
    is not visible in this view.
    """
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        # NOTE(review): mutable default arguments are shared across
        # calls; harmless only if never mutated -- confirm in full source.
        print "init jobtask %s %s" % (parameters, runtime_constraints)
def one_task_per_input_file(if_sequence=0, and_end_task=True):
    """Queue one new job task per file in this job's input collection,
    then (if and_end_task) mark the current task successful.

    NOTE(review): several lines are not visible in this view: the body
    of the sequence guard (presumably "return"), the "new_task_attrs = {"
    opener with its parameters entry, the "if and_end_task:" guard, and
    the ").execute()" close of the final update call.
    """
    # Only run at the designated point in the task sequence.
    if if_sequence != current_task()['sequence']:
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        for f in s.all_files():
            # One new task per input file, expressed as a
            # single-file manifest.
            task_input = f.as_manifest()
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': if_sequence + 1,
            api('v1').job_tasks().create(body=new_task_attrs).execute()
    # Mark this (dispatcher) task as finished.
    api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                 body={'success':True}
def one_task_per_input_stream(if_sequence=0, and_end_task=True):
    """Queue one new job task per stream in this job's input collection,
    then (if and_end_task) mark the current task successful.

    NOTE(review): several lines are not visible in this view: the body
    of the sequence guard (presumably "return"), the "new_task_attrs = {"
    opener with its parameters entry, the "if and_end_task:" guard, and
    the ").execute()" close of the final update call.
    """
    # Only run at the designated point in the task sequence.
    if if_sequence != current_task()['sequence']:
    job_input = current_job()['script_parameters']['input']
    cr = CollectionReader(job_input)
    for s in cr.all_streams():
        # One new task per input stream, passed as manifest tokens.
        task_input = s.tokens()
        'job_uuid': current_job()['uuid'],
        'created_by_job_task_uuid': current_task()['uuid'],
        'sequence': if_sequence + 1,
        api('v1').job_tasks().create(body=new_task_attrs).execute()
    # Mark this (dispatcher) task as finished.
    api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                 body={'success':True}
def clear_tmpdir(path=None):
    Ensure the given directory (or TASK_TMPDIR if none given)
    # NOTE(review): the line above is docstring text whose triple-quote
    # delimiters fall on elided lines; also, the assignment below is
    # presumably guarded by "if path is None:" on an elided line --
    # otherwise an explicit path argument would always be clobbered.
    path = current_task().tmpdir
    # Delete the whole tree with rm -rf and fail loudly if rm does.
    if os.path.exists(path):
        p = subprocess.Popen(['rm', '-rf', path])
        stdout, stderr = p.communicate(None)
        if p.returncode != 0:
            raise Exception('rm -rf %s: %s' % (path, stderr))
def run_command(execargs, **kwargs):
    """Run an external command and wait for it to finish.

    execargs -- argument list passed to subprocess.Popen
    kwargs   -- extra Popen keyword arguments; sensible defaults are
                supplied for stdin/stdout (pipes), stderr (inherited),
                close_fds (True) and shell (False).

    Returns a (stdout_data, stderr_data) pair from communicate().
    Raises errors.CommandFailedError if the command exits nonzero.
    """
    popen_defaults = {
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        'stderr': sys.stderr,
        'close_fds': True,
        'shell': False,
    }
    for opt, default in popen_defaults.items():
        kwargs.setdefault(opt, default)
    proc = subprocess.Popen(execargs, **kwargs)
    # Passing None sends no input and closes the child's stdin pipe.
    stdoutdata, stderrdata = proc.communicate(None)
    if proc.returncode != 0:
        raise errors.CommandFailedError(
            "run_command %s exit %d:\n%s" %
            (execargs, proc.returncode, stderrdata))
    return stdoutdata, stderrdata
def git_checkout(url, version, path):
    # Clone (if needed) and check out a specific version of a git repo.
    # Relative paths are resolved against the current job's tmpdir.
    # NOTE(review): the final checkout call is truncated in this view
    # (its cwd argument and a presumable "return path" are elided).
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    if not os.path.exists(path):
        util.run_command(["git", "clone", url, path],
                         cwd=os.path.dirname(path))
    util.run_command(["git", "checkout", version],
def tar_extractor(path, decompress_flag):
    # Return a tar child process that reads archive data from stdin and
    # extracts into path; decompress_flag selects -xjf/-xzf/-xf.
    # NOTE(review): part of the argument list (presumably "-C", path,
    # "-" and the closing bracket) falls on elided lines.
    return subprocess.Popen(["tar",
                             ("-x%sf" % decompress_flag),
                             stdin=subprocess.PIPE, stderr=sys.stderr,
                             shell=False, close_fds=True)
# NOTE(review): several lines of tarball_extract are not visible in this
# view: the docstring close, the ".locator" readlink guard, the try/except
# around the rm -f emulation, the "else:" before the unhandled-filename
# raise, the loop feeding file data into tar's stdin, lock release, and a
# presumable final "return path".
def tarball_extract(tarball, path):
    """Retrieve a tarball from Keep and extract it to a local
    directory. Return the absolute path where the tarball was
    extracted. If the top level of the tarball contained just one
    file or directory, return the absolute path of that single

    tarball -- collection locator
    path -- where to extract the tarball: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction under a lock file next to the target dir.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # A '.locator' symlink records which tarball was extracted here.
    if os.readlink(os.path.join(path, '.locator')) == tarball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        # Choose the tar decompression flag from the file extension.
        for f in CollectionReader(tarball).all_files():
            if re.search('\.(tbz|tar.bz2)$', f.name()):
                p = util.tar_extractor(path, 'j')
            elif re.search('\.(tgz|tar.gz)$', f.name()):
                p = util.tar_extractor(path, 'z')
            elif re.search('\.tar$', f.name()):
                p = util.tar_extractor(path, '')
            raise errors.AssertionError(
                "tarball_extract cannot handle filename %s" % f.name())
        if p.returncode != 0:
            raise errors.CommandFailedError(
                "tar exited %d" % p.returncode)
        # Record which tarball now lives in this directory.
        os.symlink(tarball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    # If the tarball had a single top-level entry, return its path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
# NOTE(review): several lines of zipball_extract are not visible in this
# view: the docstring close, the ".locator" readlink guard, the try/except
# around the rm -f emulation, the loop writing zip data into zip_file, the
# unzip argument list, lock release, and a presumable final "return path".
def zipball_extract(zipball, path):
    """Retrieve a zip archive from Keep and extract it to a local
    directory. Return the absolute path where the archive was
    extracted. If the top level of the archive contained just one
    file or directory, return the absolute path of that single

    zipball -- collection locator
    path -- where to extract the archive: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction under a lock file next to the target dir.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # A '.locator' symlink records which zipball was extracted here.
    if os.readlink(os.path.join(path, '.locator')) == zipball:
        already_have_it = True
    if not already_have_it:
        # emulate "rm -f" (i.e., if the file does not exist, we win)
        os.unlink(os.path.join(path, '.locator'))
        if os.path.exists(os.path.join(path, '.locator')):
            os.unlink(os.path.join(path, '.locator'))
        for f in CollectionReader(zipball).all_files():
            # Only .zip members are supported.
            if not re.search('\.zip$', f.name()):
                raise errors.NotImplementedError(
                    "zipball_extract cannot handle filename %s" % f.name())
            # Copy the archive to a local file, then run unzip on it.
            zip_filename = os.path.join(path, os.path.basename(f.name()))
            zip_file = open(zip_filename, 'wb')
            p = subprocess.Popen(["unzip",
                                  stdin=None, stderr=sys.stderr,
                                  shell=False, close_fds=True)
            if p.returncode != 0:
                raise errors.CommandFailedError(
                    "unzip exited %d" % p.returncode)
            os.unlink(zip_filename)
        # Record which zipball now lives in this directory.
        os.symlink(zipball, os.path.join(path, '.locator'))
    tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
    # If the archive had a single top-level entry, return its path.
    if len(tld_extracts) == 1:
        return os.path.join(path, tld_extracts[0])
# NOTE(review): several lines of collection_extract are not visible in
# this view: the docstring close, the "if matches:"/"else:" guard around
# the two collection_hash assignments, files_got initialization, the
# condition opener ("if (files == [] or ...") in the file loop, the write
# loop body with outfile.close(), lock release, part of the error-message
# tuple, and a presumable final "return path".  Also: mutable default
# argument files=[] is shared across calls; it appears to be read-only
# here (files_got uses "+=" on a different name) -- confirm.
def collection_extract(collection, path, files=[], decompress=True):
    """Retrieve a collection from Keep and extract it to a local
    directory. Return the absolute path where the collection was

    collection -- collection locator
    path -- where to extract: absolute, or relative to job tmp
    # Cache key: the bare hash part of the locator when it looks like
    # one, otherwise (on the elided else branch) an md5 of the string.
    matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
    collection_hash = matches.group(1)
    collection_hash = hashlib.md5(collection).hexdigest()
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction under a lock file next to the target dir.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    already_have_it = False
    # A '.locator' symlink records which collection was extracted here.
    if os.readlink(os.path.join(path, '.locator')) == collection_hash:
        already_have_it = True
    # emulate "rm -f" (i.e., if the file does not exist, we win)
    os.unlink(os.path.join(path, '.locator'))
    if os.path.exists(os.path.join(path, '.locator')):
        os.unlink(os.path.join(path, '.locator'))
    # Extract the requested files (the elided condition opener presumably
    # also allows "all files" when files is empty).
    for s in CollectionReader(collection).all_streams():
        stream_name = s.name()
        for f in s.all_files():
            ((f.name() not in files_got) and
             (f.name() in files or
              (decompress and f.decompressed_name() in files)))):
            outname = f.decompressed_name() if decompress else f.name()
            files_got += [outname]
            if os.path.exists(os.path.join(path, stream_name, outname)):
            util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
            outfile = open(os.path.join(path, stream_name, outname), 'wb')
            for buf in (f.readall_decompressed() if decompress
    # Fail if any explicitly requested file was not found.
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            [z.name() for z in CollectionReader(collection).all_files()]))
    os.symlink(collection_hash, os.path.join(path, '.locator'))
def mkdir_dash_p(path):
    # Equivalent of "mkdir -p": recursively create path's parents first.
    # NOTE(review): the os.mkdir call and its error handling (and the
    # body of the trailing re-check) fall on elided lines.
    if not os.path.exists(path):
        util.mkdir_dash_p(os.path.dirname(path))
        if not os.path.exists(path):
# NOTE(review): several lines of stream_extract are not visible in this
# view: the docstring close, files_got initialization, the condition
# opener ("if (files == [] or ...") in the file loop, the write loop body
# with outfile.close(), lock release, and a presumable final
# "return path".  Also: mutable default argument files=[] is shared
# across calls; it appears to be read-only here -- confirm.
def stream_extract(stream, path, files=[], decompress=True):
    """Retrieve a stream from Keep and extract it to a local
    directory. Return the absolute path where the stream was

    stream -- StreamReader object
    path -- where to extract: absolute, or relative to job tmp
    if not re.search('^/', path):
        path = os.path.join(current_job().tmpdir, path)
    # Serialize extraction under a lock file next to the target dir.
    lockfile = open(path + '.lock', 'w')
    fcntl.flock(lockfile, fcntl.LOCK_EX)
    # Extract the requested files (the elided condition opener presumably
    # also allows "all files" when files is empty).
    for f in stream.all_files():
        ((f.name() not in files_got) and
         (f.name() in files or
          (decompress and f.decompressed_name() in files)))):
        outname = f.decompressed_name() if decompress else f.name()
        files_got += [outname]
        if os.path.exists(os.path.join(path, outname)):
            os.unlink(os.path.join(path, outname))
        util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
        outfile = open(os.path.join(path, outname), 'wb')
        for buf in (f.readall_decompressed() if decompress
    # Fail if any explicitly requested file was not found.
    if len(files_got) < len(files):
        raise errors.AssertionError(
            "Wanted files %s but only got %s from %s" %
            (files, files_got, [z.name() for z in stream.all_files()]))
def listdir_recursive(dirname, base=None):
    # Return a sorted list of relative paths of all files under dirname,
    # recursing into subdirectories; base is the relative-path prefix.
    # NOTE(review): the "allfiles = []" initializer, the "else:" before
    # the final append, and the closing "return allfiles" fall on
    # elided lines.
    for ent in sorted(os.listdir(dirname)):
        ent_path = os.path.join(dirname, ent)
        ent_base = os.path.join(base, ent) if base else ent
        if os.path.isdir(ent_path):
            allfiles += util.listdir_recursive(ent_path, ent_base)
        allfiles += [ent_base]