3551: Merge branch 'master' into 3551-go-layout
[arvados.git] / sdk / python / arvados / __init__.py
index e57b7e627d0db378e838c587d24ccfe4ad388fd3..060ed95d959531917749584176c5f42b8c453f66 100644 (file)
@@ -18,69 +18,23 @@ import fcntl
 import time
 import threading
 
-import apiclient
-import apiclient.discovery
-
-config = None
-EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'
-services = {}
-
-from stream import *
+from api import *
 from collection import *
 from keep import *
-
-
-# Arvados configuration settings are taken from $HOME/.config/arvados.
-# Environment variables override settings in the config file.
-#
-class ArvadosConfig(dict):
-    def __init__(self, config_file):
-        dict.__init__(self)
-        if os.path.exists(config_file):
-            with open(config_file, "r") as f:
-                for config_line in f:
-                    var, val = config_line.rstrip().split('=', 2)
-                    self[var] = val
-        for var in os.environ:
-            if var.startswith('ARVADOS_'):
-                self[var] = os.environ[var]
-
-class errors:
-    class SyntaxError(Exception):
-        pass
-    class AssertionError(Exception):
-        pass
-    class NotFoundError(Exception):
-        pass
-    class CommandFailedError(Exception):
-        pass
-    class KeepWriteError(Exception):
-        pass
-    class NotImplementedError(Exception):
-        pass
-
-class CredentialsFromEnv(object):
-    @staticmethod
-    def http_request(self, uri, **kwargs):
-        global config
-        from httplib import BadStatusLine
-        if 'headers' not in kwargs:
-            kwargs['headers'] = {}
-        kwargs['headers']['Authorization'] = 'OAuth2 %s' % config.get('ARVADOS_API_TOKEN', 'ARVADOS_API_TOKEN_not_set')
-        try:
-            return self.orig_http_request(uri, **kwargs)
-        except BadStatusLine:
-            # This is how httplib tells us that it tried to reuse an
-            # existing connection but it was already closed by the
-            # server. In that case, yes, we would like to retry.
-            # Unfortunately, we are not absolutely certain that the
-            # previous call did not succeed, so this is slightly
-            # risky.
-            return self.orig_http_request(uri, **kwargs)
-    def authorize(self, http):
-        http.orig_http_request = http.request
-        http.request = types.MethodType(self.http_request, http)
-        return http
+from stream import *
+import errors
+import util
+
+# Set up Arvados logging: DEBUG if config.get('ARVADOS_DEBUG') is truthy, else WARNING.
+# All Arvados code should log under the arvados hierarchy.
+log_handler = logging.StreamHandler()  # no stream given, so the handler writes to sys.stderr
+log_handler.setFormatter(logging.Formatter(
+        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',  # record layout
+        '%Y-%m-%d %H:%M:%S'))  # datefmt used to render %(asctime)s
+logger = logging.getLogger('arvados')
+logger.addHandler(log_handler)  # runs at import time, as a module-level side effect
+logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')  # `config` is defined outside this hunk
+                else logging.WARNING)
 
 def task_set_output(self,s):
     api('v1').job_tasks().update(uuid=self['uuid'],
@@ -116,61 +70,11 @@ def current_job():
 def getjobparam(*args):
     return current_job()['script_parameters'].get(*args)
 
-# Monkey patch discovery._cast() so objects and arrays get serialized
-# with json.dumps() instead of str().
-_cast_orig = apiclient.discovery._cast
-def _cast_objects_too(value, schema_type):
-    global _cast_orig
-    if (type(value) != type('') and
-        (schema_type == 'object' or schema_type == 'array')):
-        return json.dumps(value)
-    else:
-        return _cast_orig(value, schema_type)
-apiclient.discovery._cast = _cast_objects_too
-
-def http_cache(data_type):
-    path = os.environ['HOME'] + '/.cache/arvados/' + data_type
-    try:
-        util.mkdir_dash_p(path)
-    except OSError:
-        path = None
-    return path
-
-def api(version=None):
-    global services, config
-
-    if not config:
-        config = ArvadosConfig(os.environ['HOME'] + '/.config/arvados/settings.conf')
-        if 'ARVADOS_DEBUG' in config:
-            logging.basicConfig(level=logging.DEBUG)
+def get_job_param_mount(*args):  # *args are forwarded unchanged to script_parameters.get()
+    return os.path.join(os.environ['TASK_KEEPMOUNT'], current_job()['script_parameters'].get(*args))  # job parameter value joined under $TASK_KEEPMOUNT; KeyError if that variable is unset
 
-    if not services.get(version):
-        apiVersion = version
-        if not version:
-            apiVersion = 'v1'
-            logging.info("Using default API version. " +
-                         "Call arvados.api('%s') instead." %
-                         apiVersion)
-        if 'ARVADOS_API_HOST' not in config:
-            raise Exception("ARVADOS_API_HOST is not set. Aborting.")
-        url = ('https://%s/discovery/v1/apis/{api}/{apiVersion}/rest' %
-               config['ARVADOS_API_HOST'])
-        credentials = CredentialsFromEnv()
-
-        # Use system's CA certificates (if we find them) instead of httplib2's
-        ca_certs = '/etc/ssl/certs/ca-certificates.crt'
-        if not os.path.exists(ca_certs):
-            ca_certs = None             # use httplib2 default
-
-        http = httplib2.Http(ca_certs=ca_certs,
-                             cache=http_cache('discovery'))
-        http = credentials.authorize(http)
-        if re.match(r'(?i)^(true|1|yes)$',
-                    config.get('ARVADOS_API_HOST_INSECURE', 'no')):
-            http.disable_ssl_certificate_validation=True
-        services[version] = apiclient.discovery.build(
-            'arvados', apiVersion, http=http, discoveryServiceUrl=url)
-    return services[version]
+def get_task_param_mount(*args):  # *args are forwarded unchanged to parameters.get()
+    return os.path.join(os.environ['TASK_KEEPMOUNT'], current_task()['parameters'].get(*args))  # task parameter value joined under $TASK_KEEPMOUNT; KeyError if that variable is unset
 
 class JobTask(object):
     def __init__(self, parameters=dict(), runtime_constraints=dict()):
@@ -178,14 +82,17 @@ class JobTask(object):
 
 class job_setup:
     @staticmethod
-    def one_task_per_input_file(if_sequence=0, and_end_task=True):
+    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False):
         if if_sequence != current_task()['sequence']:
             return
         job_input = current_job()['script_parameters']['input']
         cr = CollectionReader(job_input)
         for s in cr.all_streams():
             for f in s.all_files():
-                task_input = f.as_manifest()
+                if input_as_path:
+                    task_input = os.path.join(job_input, s.name(), f.name())
+                else:
+                    task_input = f.as_manifest()
                 new_task_attrs = {
                     'job_uuid': current_job()['uuid'],
                     'created_by_job_task_uuid': current_task()['uuid'],
@@ -224,311 +131,4 @@ class job_setup:
                                        ).execute()
             exit(0)
 
-class util:
-    @staticmethod
-    def clear_tmpdir(path=None):
-        """
-        Ensure the given directory (or TASK_TMPDIR if none given)
-        exists and is empty.
-        """
-        if path == None:
-            path = current_task().tmpdir
-        if os.path.exists(path):
-            p = subprocess.Popen(['rm', '-rf', path])
-            stdout, stderr = p.communicate(None)
-            if p.returncode != 0:
-                raise Exception('rm -rf %s: %s' % (path, stderr))
-        os.mkdir(path)
-
-    @staticmethod
-    def run_command(execargs, **kwargs):
-        kwargs.setdefault('stdin', subprocess.PIPE)
-        kwargs.setdefault('stdout', subprocess.PIPE)
-        kwargs.setdefault('stderr', sys.stderr)
-        kwargs.setdefault('close_fds', True)
-        kwargs.setdefault('shell', False)
-        p = subprocess.Popen(execargs, **kwargs)
-        stdoutdata, stderrdata = p.communicate(None)
-        if p.returncode != 0:
-            raise errors.CommandFailedError(
-                "run_command %s exit %d:\n%s" %
-                (execargs, p.returncode, stderrdata))
-        return stdoutdata, stderrdata
-
-    @staticmethod
-    def git_checkout(url, version, path):
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        if not os.path.exists(path):
-            util.run_command(["git", "clone", url, path],
-                             cwd=os.path.dirname(path))
-        util.run_command(["git", "checkout", version],
-                         cwd=path)
-        return path
-
-    @staticmethod
-    def tar_extractor(path, decompress_flag):
-        return subprocess.Popen(["tar",
-                                 "-C", path,
-                                 ("-x%sf" % decompress_flag),
-                                 "-"],
-                                stdout=None,
-                                stdin=subprocess.PIPE, stderr=sys.stderr,
-                                shell=False, close_fds=True)
-
-    @staticmethod
-    def tarball_extract(tarball, path):
-        """Retrieve a tarball from Keep and extract it to a local
-        directory.  Return the absolute path where the tarball was
-        extracted. If the top level of the tarball contained just one
-        file or directory, return the absolute path of that single
-        item.
-
-        tarball -- collection locator
-        path -- where to extract the tarball: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == tarball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-
-            for f in CollectionReader(tarball).all_files():
-                if re.search('\.(tbz|tar.bz2)$', f.name()):
-                    p = util.tar_extractor(path, 'j')
-                elif re.search('\.(tgz|tar.gz)$', f.name()):
-                    p = util.tar_extractor(path, 'z')
-                elif re.search('\.tar$', f.name()):
-                    p = util.tar_extractor(path, '')
-                else:
-                    raise errors.AssertionError(
-                        "tarball_extract cannot handle filename %s" % f.name())
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    p.stdin.write(buf)
-                p.stdin.close()
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "tar exited %d" % p.returncode)
-            os.symlink(tarball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-
-    @staticmethod
-    def zipball_extract(zipball, path):
-        """Retrieve a zip archive from Keep and extract it to a local
-        directory.  Return the absolute path where the archive was
-        extracted. If the top level of the archive contained just one
-        file or directory, return the absolute path of that single
-        item.
-
-        zipball -- collection locator
-        path -- where to extract the archive: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == zipball:
-                already_have_it = True
-        except OSError:
-            pass
-        if not already_have_it:
-
-            # emulate "rm -f" (i.e., if the file does not exist, we win)
-            try:
-                os.unlink(os.path.join(path, '.locator'))
-            except OSError:
-                if os.path.exists(os.path.join(path, '.locator')):
-                    os.unlink(os.path.join(path, '.locator'))
-
-            for f in CollectionReader(zipball).all_files():
-                if not re.search('\.zip$', f.name()):
-                    raise errors.NotImplementedError(
-                        "zipball_extract cannot handle filename %s" % f.name())
-                zip_filename = os.path.join(path, os.path.basename(f.name()))
-                zip_file = open(zip_filename, 'wb')
-                while True:
-                    buf = f.read(2**20)
-                    if len(buf) == 0:
-                        break
-                    zip_file.write(buf)
-                zip_file.close()
-                
-                p = subprocess.Popen(["unzip",
-                                      "-q", "-o",
-                                      "-d", path,
-                                      zip_filename],
-                                     stdout=None,
-                                     stdin=None, stderr=sys.stderr,
-                                     shell=False, close_fds=True)
-                p.wait()
-                if p.returncode != 0:
-                    lockfile.close()
-                    raise errors.CommandFailedError(
-                        "unzip exited %d" % p.returncode)
-                os.unlink(zip_filename)
-            os.symlink(zipball, os.path.join(path, '.locator'))
-        tld_extracts = filter(lambda f: f != '.locator', os.listdir(path))
-        lockfile.close()
-        if len(tld_extracts) == 1:
-            return os.path.join(path, tld_extracts[0])
-        return path
-
-    @staticmethod
-    def collection_extract(collection, path, files=[], decompress=True):
-        """Retrieve a collection from Keep and extract it to a local
-        directory.  Return the absolute path where the collection was
-        extracted.
-
-        collection -- collection locator
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        matches = re.search(r'^([0-9a-f]+)(\+[\w@]+)*$', collection)
-        if matches:
-            collection_hash = matches.group(1)
-        else:
-            collection_hash = hashlib.md5(collection).hexdigest()
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-        already_have_it = False
-        try:
-            if os.readlink(os.path.join(path, '.locator')) == collection_hash:
-                already_have_it = True
-        except OSError:
-            pass
-
-        # emulate "rm -f" (i.e., if the file does not exist, we win)
-        try:
-            os.unlink(os.path.join(path, '.locator'))
-        except OSError:
-            if os.path.exists(os.path.join(path, '.locator')):
-                os.unlink(os.path.join(path, '.locator'))
-
-        files_got = []
-        for s in CollectionReader(collection).all_streams():
-            stream_name = s.name()
-            for f in s.all_files():
-                if (files == [] or
-                    ((f.name() not in files_got) and
-                     (f.name() in files or
-                      (decompress and f.decompressed_name() in files)))):
-                    outname = f.decompressed_name() if decompress else f.name()
-                    files_got += [outname]
-                    if os.path.exists(os.path.join(path, stream_name, outname)):
-                        continue
-                    util.mkdir_dash_p(os.path.dirname(os.path.join(path, stream_name, outname)))
-                    outfile = open(os.path.join(path, stream_name, outname), 'wb')
-                    for buf in (f.readall_decompressed() if decompress
-                                else f.readall()):
-                        outfile.write(buf)
-                    outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got,
-                 [z.name() for z in CollectionReader(collection).all_files()]))
-        os.symlink(collection_hash, os.path.join(path, '.locator'))
-
-        lockfile.close()
-        return path
-
-    @staticmethod
-    def mkdir_dash_p(path):
-        if not os.path.exists(path):
-            util.mkdir_dash_p(os.path.dirname(path))
-            try:
-                os.mkdir(path)
-            except OSError:
-                if not os.path.exists(path):
-                    os.mkdir(path)
-
-    @staticmethod
-    def stream_extract(stream, path, files=[], decompress=True):
-        """Retrieve a stream from Keep and extract it to a local
-        directory.  Return the absolute path where the stream was
-        extracted.
-
-        stream -- StreamReader object
-        path -- where to extract: absolute, or relative to job tmp
-        """
-        if not re.search('^/', path):
-            path = os.path.join(current_job().tmpdir, path)
-        lockfile = open(path + '.lock', 'w')
-        fcntl.flock(lockfile, fcntl.LOCK_EX)
-        try:
-            os.stat(path)
-        except OSError:
-            os.mkdir(path)
-
-        files_got = []
-        for f in stream.all_files():
-            if (files == [] or
-                ((f.name() not in files_got) and
-                 (f.name() in files or
-                  (decompress and f.decompressed_name() in files)))):
-                outname = f.decompressed_name() if decompress else f.name()
-                files_got += [outname]
-                if os.path.exists(os.path.join(path, outname)):
-                    os.unlink(os.path.join(path, outname))
-                util.mkdir_dash_p(os.path.dirname(os.path.join(path, outname)))
-                outfile = open(os.path.join(path, outname), 'wb')
-                for buf in (f.readall_decompressed() if decompress
-                            else f.readall()):
-                    outfile.write(buf)
-                outfile.close()
-        if len(files_got) < len(files):
-            raise errors.AssertionError(
-                "Wanted files %s but only got %s from %s" %
-                (files, files_got, [z.name() for z in stream.all_files()]))
-        lockfile.close()
-        return path
-
-    @staticmethod
-    def listdir_recursive(dirname, base=None):
-        allfiles = []
-        for ent in sorted(os.listdir(dirname)):
-            ent_path = os.path.join(dirname, ent)
-            ent_base = os.path.join(base, ent) if base else ent
-            if os.path.isdir(ent_path):
-                allfiles += util.listdir_recursive(ent_path, ent_base)
-            else:
-                allfiles += [ent_base]
-        return allfiles