10111: Merge branch 'master' into 10111-cr-provenance-graph
[arvados.git] / sdk / python / arvados / __init__.py
1 import httplib
2 import httplib2
3 import logging
4 import os
5 import pprint
6 import sys
7 import types
8 import subprocess
9 import json
10 import UserDict
11 import re
12 import hashlib
13 import string
14 import bz2
15 import zlib
16 import fcntl
17 import time
18 import threading
19
20 from .api import api, http_cache
21 from collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
22 from keep import *
23 from stream import *
24 from arvfile import StreamFileReader
25 from retry import RetryLoop
26 import errors
27 import util
28
29 # Set up Arvados logging based on the user's configuration.
30 # All Arvados code should log under the arvados hierarchy.
31 log_handler = logging.StreamHandler()
32 log_handler.setFormatter(logging.Formatter(
33         '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
34         '%Y-%m-%d %H:%M:%S'))
35 logger = logging.getLogger('arvados')
36 logger.addHandler(log_handler)
37 logger.setLevel(logging.DEBUG if config.get('ARVADOS_DEBUG')
38                 else logging.WARNING)
39
40 def task_set_output(self, s, num_retries=5):
41     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
42         try:
43             return api('v1').job_tasks().update(
44                 uuid=self['uuid'],
45                 body={
46                     'output':s,
47                     'success':True,
48                     'progress':1.0
49                 }).execute()
50         except errors.ApiError as error:
51             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
52                 logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
53             else:
54                 raise
55
56 _current_task = None
57 def current_task(num_retries=5):
58     global _current_task
59     if _current_task:
60         return _current_task
61
62     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
63         try:
64             task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
65             task = UserDict.UserDict(task)
66             task.set_output = types.MethodType(task_set_output, task)
67             task.tmpdir = os.environ['TASK_WORK']
68             _current_task = task
69             return task
70         except errors.ApiError as error:
71             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
72                 logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
73             else:
74                 raise
75
76 _current_job = None
77 def current_job(num_retries=5):
78     global _current_job
79     if _current_job:
80         return _current_job
81
82     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
83         try:
84             job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
85             job = UserDict.UserDict(job)
86             job.tmpdir = os.environ['JOB_WORK']
87             _current_job = job
88             return job
89         except errors.ApiError as error:
90             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
91                 logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
92             else:
93                 raise
94
95 def getjobparam(*args):
96     return current_job()['script_parameters'].get(*args)
97
98 def get_job_param_mount(*args):
99     return os.path.join(os.environ['TASK_KEEPMOUNT'], current_job()['script_parameters'].get(*args))
100
101 def get_task_param_mount(*args):
102     return os.path.join(os.environ['TASK_KEEPMOUNT'], current_task()['parameters'].get(*args))
103
104 class JobTask(object):
105     def __init__(self, parameters=dict(), runtime_constraints=dict()):
106         print "init jobtask %s %s" % (parameters, runtime_constraints)
107
108 class job_setup:
109     @staticmethod
110     def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
111         if if_sequence != current_task()['sequence']:
112             return
113
114         if not api_client:
115             api_client = api('v1')
116
117         job_input = current_job()['script_parameters']['input']
118         cr = CollectionReader(job_input, api_client=api_client)
119         cr.normalize()
120         for s in cr.all_streams():
121             for f in s.all_files():
122                 if input_as_path:
123                     task_input = os.path.join(job_input, s.name(), f.name())
124                 else:
125                     task_input = f.as_manifest()
126                 new_task_attrs = {
127                     'job_uuid': current_job()['uuid'],
128                     'created_by_job_task_uuid': current_task()['uuid'],
129                     'sequence': if_sequence + 1,
130                     'parameters': {
131                         'input':task_input
132                         }
133                     }
134                 api_client.job_tasks().create(body=new_task_attrs).execute()
135         if and_end_task:
136             api_client.job_tasks().update(uuid=current_task()['uuid'],
137                                        body={'success':True}
138                                        ).execute()
139             exit(0)
140
141     @staticmethod
142     def one_task_per_input_stream(if_sequence=0, and_end_task=True):
143         if if_sequence != current_task()['sequence']:
144             return
145         job_input = current_job()['script_parameters']['input']
146         cr = CollectionReader(job_input)
147         for s in cr.all_streams():
148             task_input = s.tokens()
149             new_task_attrs = {
150                 'job_uuid': current_job()['uuid'],
151                 'created_by_job_task_uuid': current_task()['uuid'],
152                 'sequence': if_sequence + 1,
153                 'parameters': {
154                     'input':task_input
155                     }
156                 }
157             api('v1').job_tasks().create(body=new_task_attrs).execute()
158         if and_end_task:
159             api('v1').job_tasks().update(uuid=current_task()['uuid'],
160                                        body={'success':True}
161                                        ).execute()
162             exit(0)