Merge branch '11647-no-perm-tokens'
[arvados.git] / sdk / python / arvados / __init__.py
1 from __future__ import print_function
2 from __future__ import absolute_import
3 from future import standard_library
4 standard_library.install_aliases()
5 from builtins import object
6 import bz2
7 import fcntl
8 import hashlib
9 import http.client
10 import httplib2
11 import json
12 import logging
13 import os
14 import pprint
15 import re
16 import string
17 import subprocess
18 import sys
19 import threading
20 import time
21 import types
22 import zlib
23
24 if sys.version_info >= (3, 0):
25     from collections import UserDict
26 else:
27     from UserDict import UserDict
28
from .api import api, api_from_config, http_cache
from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
from arvados.keep import *
from arvados.stream import *
from .arvfile import StreamFileReader
from .retry import RetryLoop
import arvados.config as config
import arvados.errors as errors
import arvados.retry as retry
import arvados.util as util
37
# Set up Arvados logging based on the user's configuration.
# All Arvados code should log under the arvados hierarchy, so
# attaching one handler to the 'arvados' logger covers the whole SDK.
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
# Honor the ARVADOS_DEBUG setting (from the user's Arvados config file
# or environment) by enabling DEBUG-level output when it is set.
logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
                else logging.WARNING)
48
def task_set_output(self, s, num_retries=5):
    """Record `s` as the output of this job task and mark it successful.

    Issues a job_tasks().update() API call, retrying transient HTTP
    failures up to `num_retries` times before giving up.  Returns the
    updated task record on success; re-raises the API error otherwise.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            update_body = {
                'output': s,
                'success': True,
                'progress': 1.0,
            }
            return api('v1').job_tasks().update(
                uuid=self['uuid'], body=update_body).execute()
        except errors.ApiError as error:
            # None from check_http_response_success means "indeterminate,
            # worth retrying"; any definite answer means stop and re-raise.
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not (retryable and tries_left > 0):
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
64
# Cache for current_task(); populated on the first successful fetch.
_current_task = None
def current_task(num_retries=5):
    """Return the job task record this process is executing.

    Fetches the record named by the TASK_UUID environment variable
    (retrying transient API failures), wraps it in a UserDict with a
    bound `set_output` method and a `tmpdir` attribute (from TASK_WORK),
    and caches the result for subsequent calls.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').job_tasks().get(
                uuid=os.environ['TASK_UUID']).execute()
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
                continue
            raise
        wrapped = UserDict(record)
        wrapped.set_output = types.MethodType(task_set_output, wrapped)
        wrapped.tmpdir = os.environ['TASK_WORK']
        _current_task = wrapped
        return wrapped
84
# Cache for current_job(); populated on the first successful fetch.
_current_job = None
def current_job(num_retries=5):
    """Return the job record this process belongs to.

    Fetches the record named by the JOB_UUID environment variable
    (retrying transient API failures), wraps it in a UserDict with a
    `tmpdir` attribute (from JOB_WORK), and caches the result for
    subsequent calls.
    """
    global _current_job
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').jobs().get(
                uuid=os.environ['JOB_UUID']).execute()
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
                continue
            raise
        wrapped = UserDict(record)
        wrapped.tmpdir = os.environ['JOB_WORK']
        _current_job = wrapped
        return wrapped
103
def getjobparam(*args):
    """Look up a script parameter of the current job.

    Accepts the same arguments as dict.get (key, optional default).
    """
    params = current_job()['script_parameters']
    return params.get(*args)
106
def get_job_param_mount(*args):
    """Return the Keep-mount path for a job script parameter.

    Joins the TASK_KEEPMOUNT directory with the parameter value looked
    up via dict.get semantics (key, optional default).
    """
    relative = current_job()['script_parameters'].get(*args)
    return os.path.join(os.environ['TASK_KEEPMOUNT'], relative)
109
def get_task_param_mount(*args):
    """Return the Keep-mount path for a parameter of the current task.

    Joins the TASK_KEEPMOUNT directory with the parameter value looked
    up via dict.get semantics (key, optional default).
    """
    relative = current_task()['parameters'].get(*args)
    return os.path.join(os.environ['TASK_KEEPMOUNT'], relative)
112
class JobTask(object):
    """Minimal representation of a job task.

    The constructor only echoes its arguments; the class appears to be a
    placeholder/debugging aid.  Both arguments default to fresh empty
    dicts per call (the original used mutable default arguments, which
    share one dict object across all calls -- a classic Python pitfall).
    """
    def __init__(self, parameters=None, runtime_constraints=None):
        # Replace None with a new dict so defaults are never shared
        # between instances; output is identical to the old behavior.
        parameters = {} if parameters is None else parameters
        runtime_constraints = {} if runtime_constraints is None else runtime_constraints
        print("init jobtask %s %s" % (parameters, runtime_constraints))
116
class job_setup(object):
    """Helpers for Crunch job scripts to fan work out into job tasks."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task per file in the job's input collection.

        Only acts when the current task's sequence number equals
        `if_sequence` (so re-running the scheduling task is a no-op at
        other sequence stages).  Each new task gets sequence
        `if_sequence + 1` and an 'input' parameter that is either the
        file's path under the input (when `input_as_path` is true) or a
        single-file manifest.  When `and_end_task` is true, the current
        task is marked successful and the process exits.
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                        },
                    }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                          body={'success': True},
                                          ).execute()
            # sys.exit is used instead of the site-module exit() builtin,
            # which is not guaranteed to exist in all environments.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new job task per stream in the job's input collection.

        Mirrors one_task_per_input_file: only acts at sequence
        `if_sequence`; each new task's 'input' parameter is the stream's
        manifest tokens; when `and_end_task` is true, the current task
        is marked successful and the process exits.  The `api_client`
        parameter (new, defaulting to api('v1')) matches the sibling
        method's signature, so callers can supply a configured client.
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input,
                    },
                }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                          body={'success': True},
                                          ).execute()
            sys.exit(0)