18874: Add 'services/workbench2/' from commit 'f6f88d9ca9cdeeeebfadcfe999789bfb9f69e5c6'
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import object
10 import bz2
11 import fcntl
12 import hashlib
13 import http.client
14 import httplib2
15 import json
16 import logging as stdliblog
17 import os
18 import pprint
19 import re
20 import string
21 import sys
22 import time
23 import types
24 import zlib
25
26 if sys.version_info >= (3, 0):
27     from collections import UserDict
28 else:
29     from UserDict import UserDict
30
31 from .api import api, api_from_config, http_cache
32 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
33 from arvados.keep import *
34 from arvados.stream import *
35 from .arvfile import StreamFileReader
36 from .logging import log_format, log_date_format, log_handler
37 from .retry import RetryLoop
38 import arvados.errors as errors
39 import arvados.util as util
40
# Override the name `logging` pulled in via a `from ... import *` above
# (which would otherwise shadow the arvados.logging submodule on the
# package), so users can still `import arvados.logging`.
logging = sys.modules['arvados.logging']

# Set up Arvados logging based on the user's configuration.
# All Arvados code should log under the 'arvados' logger hierarchy.
# `config` is expected to come from one of the star imports above
# (NOTE(review): confirm it is exported by arvados.keep/arvados.stream).
logger = stdliblog.getLogger('arvados')
logger.addHandler(log_handler)
# ARVADOS_DEBUG in the user's configuration enables debug-level logging;
# otherwise only warnings and above are emitted.
logger.setLevel(stdliblog.DEBUG if config.get('ARVADOS_DEBUG')
                else stdliblog.WARNING)
51
def task_set_output(self, s, num_retries=5):
    """Record output *s* on this job task and mark it successful.

    Updates the task record via the Arvados API, setting 'output' to *s*,
    'success' to True and 'progress' to 1.0.  Retryable HTTP errors are
    retried up to *num_retries* times; any other ApiError (or exhausted
    retries) is re-raised.
    """
    update_body = {'output': s, 'success': True, 'progress': 1.0}
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            return api('v1').job_tasks().update(
                uuid=self['uuid'], body=update_body).execute()
        except errors.ApiError as error:
            # A None check result means the failure may be transient.
            can_retry = retry.check_http_response_success(error.resp.status) is None
            if not (can_retry and tries_left > 0):
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
67
_current_task = None
def current_task(num_retries=5):
    """Return this process's job task record, fetching it once and caching.

    The task UUID comes from the TASK_UUID environment variable.  The
    returned object is a UserDict with two extras bound on: a
    `set_output` method and a `tmpdir` attribute (from TASK_WORK).
    Retryable HTTP errors are retried up to *num_retries* times.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            fetched = api('v1').job_tasks().get(
                uuid=os.environ['TASK_UUID']).execute()
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
                continue
            raise
        wrapped = UserDict(fetched)
        wrapped.set_output = types.MethodType(task_set_output, wrapped)
        wrapped.tmpdir = os.environ['TASK_WORK']
        _current_task = wrapped
        return wrapped
87
_current_job = None
def current_job(num_retries=5):
    """Return this process's job record, fetching it once and caching.

    The job UUID comes from the JOB_UUID environment variable.  The
    returned object is a UserDict with a `tmpdir` attribute bound on
    (from JOB_WORK).  Retryable HTTP errors are retried up to
    *num_retries* times.
    """
    global _current_job
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            fetched = api('v1').jobs().get(
                uuid=os.environ['JOB_UUID']).execute()
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
                continue
            raise
        record = UserDict(fetched)
        record.tmpdir = os.environ['JOB_WORK']
        _current_job = record
        return record
106
def getjobparam(*args):
    """Look up a value in the current job's script_parameters.

    Arguments are forwarded to dict.get, so an optional default may be
    supplied as a second positional argument.
    """
    script_params = current_job()['script_parameters']
    return script_params.get(*args)
109
def get_job_param_mount(*args):
    """Return the Keep-mount path for a job script parameter.

    Arguments are forwarded to dict.get on the current job's
    script_parameters; the resulting value is joined under the
    TASK_KEEPMOUNT directory.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(mount_root, param_value)
112
def get_task_param_mount(*args):
    """Return the Keep-mount path for a task parameter.

    Arguments are forwarded to dict.get on the current task's
    parameters; the resulting value is joined under the TASK_KEEPMOUNT
    directory.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(mount_root, param_value)
115
class JobTask(object):
    """Stub representing a Crunch job task.

    The constructor only logs its arguments to stdout; the class is kept
    for API compatibility.
    """
    def __init__(self, parameters=None, runtime_constraints=None):
        # Use None sentinels instead of mutable dict() defaults (the
        # shared-mutable-default-argument pitfall).  Substituting empty
        # dicts preserves the original printed output exactly.
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
119
class job_setup(object):
    """Helpers for Crunch v1 job scripts that split work into job tasks.

    Each static method inspects the current job's input collection and
    queues follow-on job tasks via the Arvados API, then (optionally)
    marks the current task successful and exits the process.
    """
    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task per file in the job's input collection.

        :if_sequence: only run when the current task's 'sequence' equals
            this value; otherwise return immediately (no-op).
        :and_end_task: when True, mark the current task successful and
            exit(0) after queueing the new tasks.
        :input_as_path: when True, each new task's 'input' parameter is a
            path (job input / stream name / file name); otherwise it is
            the file's own manifest text.
        :api_client: Arvados API client to use; defaults to api('v1').
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        # Normalize so streams/files enumerate in canonical order.
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input':task_input
                        }
                    }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            # NOTE(review): relies on the site builtin exit(); sys.exit(0)
            # would be safer if this ever runs without the site module.
            exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        """Queue one new job task per stream in the job's input collection.

        :if_sequence: only run when the current task's 'sequence' equals
            this value; otherwise return immediately (no-op).
        :and_end_task: when True, mark the current task successful and
            exit(0) after queueing the new tasks.
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            # Each new task's input is the stream's manifest tokens.
            task_input = s.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input':task_input
                    }
                }
            api('v1').job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                       body={'success':True}
                                       ).execute()
            exit(0)