# arvados.git: sdk/python/arvados/__init__.py
# (branch 11308-python3, merged from master)
from __future__ import print_function
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import object
import bz2
import fcntl
import hashlib
import http.client
import json
import logging
import os
import pprint
import re
import string
import subprocess
import sys
import threading
import time
import types
import zlib

import gflags
import httplib2

if sys.version_info >= (3, 0):
    from collections import UserDict
else:
    from UserDict import UserDict

from .api import api, api_from_config, http_cache
from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
from arvados.keep import *
from arvados.stream import *
from .arvfile import StreamFileReader
from .retry import RetryLoop
import arvados.config as config
import arvados.errors as errors
import arvados.retry as retry
import arvados.util as util

# Set up Arvados logging based on the user's configuration.
# All Arvados code should log under the arvados hierarchy.
# `log_handler` and `logger` are module-level names that other arvados
# modules (and callers) may attach to or reconfigure.
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        '%Y-%m-%d %H:%M:%S'))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
# Log at DEBUG when the user's Arvados configuration sets ARVADOS_DEBUG,
# otherwise only warnings and above.
logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
                else logging.WARNING)
49
def task_set_output(self, s, num_retries=5):
    """Record task output *s* and mark the task successful.

    Updates this task's API record with output=s, success=True and
    progress=1.0, retrying transient API errors up to *num_retries*
    times.  Returns the updated record from the API server; re-raises
    the last ApiError if the failure is not retryable or retries are
    exhausted.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            update_body = {
                'output': s,
                'success': True,
                'progress': 1.0,
            }
            return api('v1').job_tasks().update(
                uuid=self['uuid'], body=update_body).execute()
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not (retryable and tries_left > 0):
                raise
            logger.debug(
                "task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
65
_current_task = None
def current_task(num_retries=5):
    """Fetch (and cache) the API record for the current job task.

    The task UUID comes from the TASK_UUID environment variable.  The
    result is a UserDict augmented with a ``set_output`` bound method and
    a ``tmpdir`` attribute (from TASK_WORK), cached module-wide so later
    calls return the same object.  Transient API errors are retried up
    to *num_retries* times.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').job_tasks().get(
                uuid=os.environ['TASK_UUID']).execute()
            wrapped = UserDict(record)
            wrapped.set_output = types.MethodType(task_set_output, wrapped)
            wrapped.tmpdir = os.environ['TASK_WORK']
            _current_task = wrapped
            return wrapped
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if retryable and tries_left > 0:
                logger.debug(
                    "current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
85
_current_job = None
def current_job(num_retries=5):
    """Fetch (and cache) the API record for the current job.

    The job UUID comes from the JOB_UUID environment variable.  The
    result is a UserDict with a ``tmpdir`` attribute (from JOB_WORK),
    cached module-wide so later calls return the same object.  Transient
    API errors are retried up to *num_retries* times.
    """
    global _current_job
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').jobs().get(
                uuid=os.environ['JOB_UUID']).execute()
            wrapped = UserDict(record)
            wrapped.tmpdir = os.environ['JOB_WORK']
            _current_job = wrapped
            return wrapped
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if retryable and tries_left > 0:
                logger.debug(
                    "current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
104
def getjobparam(*args):
    """Look up a script parameter of the current job (dict.get semantics)."""
    params = current_job()['script_parameters']
    return params.get(*args)
107
def get_job_param_mount(*args):
    """Return the path under the task's Keep mount for a job script parameter."""
    keepmount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keepmount, param_value)
110
def get_task_param_mount(*args):
    """Return the path under the task's Keep mount for a task parameter."""
    keepmount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keepmount, param_value)
113
class JobTask(object):
    """Minimal job-task stub (Crunch v1).

    Accepts a parameters dict and a runtime-constraints dict and echoes
    them to stdout; carries no further behavior in the visible code.
    """
    def __init__(self, parameters=None, runtime_constraints=None):
        # Use None sentinels instead of mutable default arguments, which
        # would be shared across every instantiation.
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
117
class job_setup(object):
    """Helpers that fan a Crunch v1 job out into per-input tasks."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task for each file in the job's input collection.

        if_sequence: only act when the current task's sequence number
            matches; new tasks are created at sequence if_sequence + 1.
        and_end_task: when True, mark the current task successful and
            exit the process after queueing.
        input_as_path: when True, pass each file as a path under the job
            input rather than as a sub-manifest.
        api_client: optional Arvados API client; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                        },
                    }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                          body={'success': True}
                                          ).execute()
            # sys.exit is always available; the bare `exit` builtin is
            # only installed by the site module.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new job task for each stream in the job's input collection.

        if_sequence: only act when the current task's sequence number
            matches; new tasks are created at sequence if_sequence + 1.
        and_end_task: when True, mark the current task successful and
            exit the process after queueing.
        api_client: optional Arvados API client; defaults to api('v1')
            (added for consistency with one_task_per_input_file).
        """
        if if_sequence != current_task()['sequence']:
            return
        if not api_client:
            api_client = api('v1')
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': s.tokens(),
                    },
                }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                          body={'success': True}
                                          ).execute()
            sys.exit(0)