Merge branch '8488-cwl-crunchrunner-collection' closes #8488
[arvados.git] / sdk / python / arvados / __init__.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from .api import api, http_cache
22 from collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
23 from keep import *
24 from stream import *
25 from arvfile import StreamFileReader
26 from retry import RetryLoop
27 import errors
28 import util
29
# Configure logging for the SDK according to the user's configuration.
# All Arvados code is expected to log under the 'arvados' hierarchy, so the
# single handler attached here serves every module in the package.
logger = logging.getLogger('arvados')
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(
        fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(log_handler)
# A truthy ARVADOS_DEBUG setting enables debug output; otherwise only
# warnings and more severe messages are emitted.
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.WARNING)
40
def task_set_output(self, s, num_retries=5):
    """Store `s` as the output of the task `self` and mark the task finished.

    The task record is updated with output=s, success=True and progress=1.0.
    `self` must support item lookup of 'uuid'; current_task() binds this
    function onto the task object it returns as `set_output`.

    Arguments:
    s -- value to record in the task's 'output' attribute.
    num_retries -- passed to RetryLoop to bound the number of attempts.

    Returns the result of executing the update request.  An errors.ApiError
    is re-raised when retry.check_http_response_success() does not return
    None for its HTTP status, or when no tries are left.
    """
    for remaining in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            completed_attrs = {
                'output':s,
                'success':True,
                'progress':1.0
            }
            request = api('v1').job_tasks().update(
                uuid=self['uuid'],
                body=completed_attrs)
            return request.execute()
        except errors.ApiError as error:
            # Give up unless the status check returns None and the loop still
            # has attempts left.
            if retry.check_http_response_success(error.resp.status) is not None or remaining <= 0:
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),remaining))
56
# Module-level cache for current_task(); None means "not fetched yet".
_current_task = None
def current_task(num_retries=5):
    """Return the record of the job task named by $TASK_UUID, fetching it once.

    The first successful call retrieves the task through the API, wraps it in
    a UserDict, attaches `set_output` (task_set_output bound to the task) and
    `tmpdir` (the value of $TASK_WORK), and caches the result in the
    module-level `_current_task`.  Later calls return the cached object.

    Arguments:
    num_retries -- passed to RetryLoop to bound the number of attempts.

    Raises KeyError if TASK_UUID or TASK_WORK is missing from the environment.
    An errors.ApiError is re-raised when retry.check_http_response_success()
    does not return None for its HTTP status, or when no tries are left.
    """
    global _current_task
    # Test against None explicitly: the cached value is a mapping, and an
    # empty mapping is falsy, which would otherwise defeat the cache.
    if _current_task is not None:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
            # UserDict (unlike a plain dict) accepts extra instance attributes.
            task = UserDict.UserDict(task)
            task.set_output = types.MethodType(task_set_output, task)
            task.tmpdir = os.environ['TASK_WORK']
            _current_task = task
            return task
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
76
# Module-level cache for current_job(); None means "not fetched yet".
_current_job = None
def current_job(num_retries=5):
    """Return the record of the job named by $JOB_UUID, fetching it once.

    The first successful call retrieves the job through the API, wraps it in
    a UserDict, attaches `tmpdir` (the value of $JOB_WORK), and caches the
    result in the module-level `_current_job`.  Later calls return the cached
    object.

    Arguments:
    num_retries -- passed to RetryLoop to bound the number of attempts.

    Raises KeyError if JOB_UUID or JOB_WORK is missing from the environment.
    An errors.ApiError is re-raised when retry.check_http_response_success()
    does not return None for its HTTP status, or when no tries are left.
    """
    global _current_job
    # Test against None explicitly: the cached value is a mapping, and an
    # empty mapping is falsy, which would otherwise defeat the cache.
    if _current_job is not None:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
            # UserDict (unlike a plain dict) accepts extra instance attributes.
            job = UserDict.UserDict(job)
            job.tmpdir = os.environ['JOB_WORK']
            _current_job = job
            return job
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
95
def getjobparam(*args):
    """Look up a value in the current job's 'script_parameters'.

    All positional arguments are forwarded unchanged to the `get` method of
    current_job()['script_parameters'], and its result is returned.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
98
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the $TASK_KEEPMOUNT directory.

    All positional arguments are forwarded unchanged to the `get` method of
    current_job()['script_parameters']; the value found is joined to
    os.environ['TASK_KEEPMOUNT'] with os.path.join.  Raises KeyError if
    TASK_KEEPMOUNT is not set.
    """
    # Read the environment first so a missing variable fails before the job
    # record is looked up.
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
101
def get_task_param_mount(*args):
    """Return a task parameter joined onto the $TASK_KEEPMOUNT directory.

    All positional arguments are forwarded unchanged to the `get` method of
    current_task()['parameters']; the value found is joined to
    os.environ['TASK_KEEPMOUNT'] with os.path.join.  Raises KeyError if
    TASK_KEEPMOUNT is not set.
    """
    # Read the environment first so a missing variable fails before the task
    # record is looked up.
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
104
class JobTask(object):
    """Job task object whose constructor only reports the arguments it got."""

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print `parameters` and `runtime_constraints` to standard output.

        Arguments:
        parameters -- task parameters; None (the default) is reported as an
          empty dict.
        runtime_constraints -- runtime constraints; None (the default) is
          reported as an empty dict.
        """
        # Use None as the default and build a fresh dict per call, so no
        # mutable default object is shared between invocations.
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        # A single parenthesised argument prints the same text whether
        # `print` is parsed as a statement or as a function.
        print("init jobtask %s %s" % (parameters, runtime_constraints))
108
class job_setup:
    """Static helpers that split the current job's input into new job tasks."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new task for every file in the job's 'input' collection.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the collection named by the job's
        script_parameters['input'] is read and normalized, and a task with
        sequence `if_sequence + 1` is created for each file of each stream.

        Arguments:
        if_sequence -- sequence number the current task must have.
        and_end_task -- when true, mark the current task successful and exit
          the process with status 0 after queueing.
        input_as_path -- when true, each new task's 'input' parameter is the
          path job_input/stream_name/file_name; otherwise it is the file's
          as_manifest() value.
        api_client -- API client to use; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input':task_input
                        }
                    }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            # sys.exit rather than the exit() helper, which the site module
            # injects for interactive use and which may not exist.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new task for every stream in the job's 'input' collection.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the collection named by the job's
        script_parameters['input'] is read, and a task with sequence
        `if_sequence + 1` is created for each stream, with the stream's
        tokens() value as its 'input' parameter.

        Arguments:
        if_sequence -- sequence number the current task must have.
        and_end_task -- when true, mark the current task successful and exit
          the process with status 0 after queueing.
        api_client -- API client to use; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        # Resolve the client once and reuse it for every request, matching
        # one_task_per_input_file, instead of calling api('v1') per stream.
        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input':task_input
                    }
                }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            # sys.exit rather than the exit() helper, which the site module
            # injects for interactive use and which may not exist.
            sys.exit(0)