Merge branch '16982-tilde-devN' refs #16982
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import object
10 import bz2
11 import fcntl
12 import hashlib
13 import http.client
14 import httplib2
15 import json
16 import logging
17 import os
18 import pprint
19 import re
20 import string
21 import sys
22 import time
23 import types
24 import zlib
25
26 if sys.version_info >= (3, 0):
27     from collections import UserDict
28 else:
29     from UserDict import UserDict
30
31 from .api import api, api_from_config, http_cache
32 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
33 from arvados.keep import *
34 from arvados.stream import *
35 from .arvfile import StreamFileReader
36 from .retry import RetryLoop
37 import arvados.errors as errors
38 import arvados.util as util
39
# Configure logging for the 'arvados' logger from the user's configuration.
# All Arvados code should log under the arvados hierarchy.
log_format = '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'
log_date_format = '%Y-%m-%d %H:%M:%S'
log_handler = logging.StreamHandler()
log_handler.setFormatter(
    logging.Formatter(fmt=log_format, datefmt=log_date_format))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
# A truthy ARVADOS_DEBUG setting selects DEBUG output; otherwise only
# WARNING and above are emitted.
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.WARNING)
50
def task_set_output(self, s, num_retries=5):
    """Store `s` as the output of the task `self` and mark it complete.

    Issues a job_tasks().update() request for self['uuid'] that sets
    'output' to `s`, 'success' to True and 'progress' to 1.0, and returns
    the result of executing that request.

    When the request raises errors.ApiError, the attempt is repeated only
    if retry.check_http_response_success() returns None for the response
    status and the retry loop still has tries left; otherwise the error is
    re-raised to the caller.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        task_update = {
            'output': s,
            'success': True,
            'progress': 1.0,
        }
        try:
            request = api('v1').job_tasks().update(
                uuid=self['uuid'],
                body=task_update)
            return request.execute()
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not retryable or tries_left <= 0:
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
66
_current_task = None
def current_task(num_retries=5):
    """Return the current job task record, fetching it on first use.

    The task is requested with job_tasks().get() using the uuid in the
    TASK_UUID environment variable.  The response is wrapped in a UserDict
    that is given a bound set_output() method (task_set_output) and a
    `tmpdir` attribute taken from the TASK_WORK environment variable.  The
    result is cached in the module-level `_current_task`, so later calls
    return the same object without another request.

    When the request raises errors.ApiError, it is retried only if
    retry.check_http_response_success() returns None for the response
    status and the retry loop still has tries left; otherwise the error is
    re-raised.  A missing TASK_UUID or TASK_WORK variable raises KeyError.
    """
    global _current_task
    # Test against None rather than truthiness: the cached value is a
    # UserDict, which is falsy when empty and would then be refetched.
    if _current_task is not None:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
            task = UserDict(task)
            task.set_output = types.MethodType(task_set_output, task)
            task.tmpdir = os.environ['TASK_WORK']
            _current_task = task
            return task
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
86
_current_job = None
def current_job(num_retries=5):
    """Return the current job record, fetching it on first use.

    The job is requested with jobs().get() using the uuid in the JOB_UUID
    environment variable.  The response is wrapped in a UserDict that is
    given a `tmpdir` attribute taken from the JOB_WORK environment
    variable.  The result is cached in the module-level `_current_job`, so
    later calls return the same object without another request.

    When the request raises errors.ApiError, it is retried only if
    retry.check_http_response_success() returns None for the response
    status and the retry loop still has tries left; otherwise the error is
    re-raised.  A missing JOB_UUID or JOB_WORK variable raises KeyError.
    """
    global _current_job
    # Test against None rather than truthiness: the cached value is a
    # UserDict, which is falsy when empty and would then be refetched.
    if _current_job is not None:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
            job = UserDict(job)
            job.tmpdir = os.environ['JOB_WORK']
            _current_job = job
            return job
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
            else:
                raise
105
def getjobparam(*args):
    """Look up a value in the current job's 'script_parameters'.

    `args` are passed unchanged to the get() method of that mapping, so
    they are a key optionally followed by a default.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
108
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the TASK_KEEPMOUNT path.

    `args` are passed unchanged to get() on the current job's
    'script_parameters'; the value found is appended to the directory
    named by the TASK_KEEPMOUNT environment variable with os.path.join().
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
111
def get_task_param_mount(*args):
    """Return a task parameter joined onto the TASK_KEEPMOUNT path.

    `args` are passed unchanged to get() on the current task's
    'parameters'; the value found is appended to the directory named by
    the TASK_KEEPMOUNT environment variable with os.path.join().
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
114
class JobTask(object):
    """Placeholder task object whose constructor only prints its arguments."""

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the given parameters and runtime constraints.

        Both arguments default to an empty dict.  None is used as the
        default sentinel so that no mutable object is shared between
        calls; the printed text for omitted arguments is still '{}'.
        """
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
118
class job_setup(object):
    """Helpers that queue one new job task per unit of the job's input."""

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Create one job task for every file in the job's 'input'.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the job's 'input' script parameter is
        opened as a CollectionReader and normalized, and a job task with
        sequence `if_sequence + 1` is created for each file of each stream.

        Arguments:
        * if_sequence: only act when the current task has this sequence.
        * and_end_task: if true, mark the current task successful after
          queueing and terminate the process with exit status 0.
        * input_as_path: if true, each new task's 'input' parameter is the
          job input joined with the stream and file names; otherwise it
          is the value of the file's as_manifest().
        * api_client: API client to use; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        if api_client is None:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input':task_input
                        }
                    }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            # sys.exit() rather than the site-provided exit(), which is
            # meant for interactive use and is not always defined.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Create one job task for every stream in the job's 'input'.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the job's 'input' script parameter is
        opened as a CollectionReader, and a job task with sequence
        `if_sequence + 1` is created for each stream, with the stream's
        tokens() as the new task's 'input' parameter.

        Arguments:
        * if_sequence: only act when the current task has this sequence.
        * and_end_task: if true, mark the current task successful after
          queueing and terminate the process with exit status 0.
        * api_client: API client to use; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        # Build the client once and reuse it for every request below.
        if api_client is None:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        for s in cr.all_streams():
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input':task_input
                    }
                }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            # sys.exit() rather than the site-provided exit(), which is
            # meant for interactive use and is not always defined.
            sys.exit(0)