EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
+services = {}
+
class errors:
class SyntaxError(Exception):
pass
http.request = types.MethodType(self.http_request, http)
return http
-url = ('https://%s:%s/discovery/v1/apis/'
- '{api}/{apiVersion}/rest' %
- (os.environ['ARVADOS_API_HOST'],
- os.environ.get('ARVADOS_API_PORT') or "443"))
-credentials = CredentialsFromEnv()
-
-# Use system's CA certificates (if we find them) instead of httplib2's
-ca_certs = '/etc/ssl/certs/ca-certificates.crt'
-if not os.path.exists(ca_certs):
- ca_certs = None # use httplib2 default
-
-http = httplib2.Http(ca_certs=ca_certs)
-http = credentials.authorize(http)
-if re.match(r'(?i)^(true|1|yes)$',
- os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
- http.disable_ssl_certificate_validation=True
-service = build("arvados", "v1", http=http, discoveryServiceUrl=url)
-
def task_set_output(self,s):
- service.job_tasks().update(uuid=self['uuid'],
- body={
+ api('v1').job_tasks().update(uuid=self['uuid'],
+ body={
'output':s,
'success':True,
'progress':1.0
global _current_task
if _current_task:
return _current_task
- t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
+ t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
t = UserDict.UserDict(t)
t.set_output = types.MethodType(task_set_output, t)
t.tmpdir = os.environ['TASK_WORK']
global _current_job
if _current_job:
return _current_job
- t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute()
+ t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
t = UserDict.UserDict(t)
t.tmpdir = os.environ['JOB_WORK']
_current_job = t
def getjobparam(*args):
return current_job()['script_parameters'].get(*args)
-def api():
- return service
+def api(version=None):
+ global services
+ if not services.get(version):
+ apiVersion = version
+ if not version:
+ apiVersion = 'v1'
+ logging.info("Using default API version. " +
+ "Call arvados.api('%s') instead." %
+ apiVersion)
+ if 'ARVADOS_API_HOST' not in os.environ:
+ raise Exception("ARVADOS_API_HOST is not set. Aborting.")
+ url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
+ os.environ['ARVADOS_API_HOST'])
+ credentials = CredentialsFromEnv()
+
+ # Use system's CA certificates (if we find them) instead of httplib2's
+ ca_certs = '/etc/ssl/certs/ca-certificates.crt'
+ if not os.path.exists(ca_certs):
+ ca_certs = None # use httplib2 default
+
+ http = httplib2.Http(ca_certs=ca_certs)
+ http = credentials.authorize(http)
+ if re.match(r'(?i)^(true|1|yes)$',
+ os.environ.get('ARVADOS_API_HOST_INSECURE', '')):
+ http.disable_ssl_certificate_validation=True
+ services[version] = build(
+ 'arvados', apiVersion, http=http, discoveryServiceUrl=url)
+ return services[version]
class JobTask(object):
def __init__(self, parameters=dict(), runtime_constraints=dict()):
'input':task_input
}
}
- service.job_tasks().create(body=new_task_attrs).execute()
+ api('v1').job_tasks().create(body=new_task_attrs).execute()
if and_end_task:
- service.job_tasks().update(uuid=current_task()['uuid'],
+ api('v1').job_tasks().update(uuid=current_task()['uuid'],
body={'success':True}
).execute()
exit(0)
'input':task_input
}
}
- service.job_tasks().create(body=new_task_attrs).execute()
+ api('v1').job_tasks().create(body=new_task_attrs).execute()
if and_end_task:
- service.job_tasks().update(uuid=current_task()['uuid'],
+ api('v1').job_tasks().update(uuid=current_task()['uuid'],
body={'success':True}
).execute()
exit(0)
return ''
with open(os.path.join(os.environ['KEEP_LOCAL_STORE'], r.group(0)), 'r') as f:
return f.read()
+
+# We really shouldn't do this but some clients still use
+# arvados.service.* directly instead of arvados.api().*
+service = api()