Merge branch 'main' from arvados-workbench2.git
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados Python SDK
5
6 This module provides the entire Python SDK for Arvados. The most useful modules
7 include:
8
9 * arvados.api - After you `import arvados`, you can call `arvados.api.api` as
10   `arvados.api` to construct a client object.
11
12 * arvados.collection - The `arvados.collection.Collection` class provides a
13   high-level interface to read and write collections. It coordinates sending
14   data to and from Keep, and synchronizing updates with the collection object.
15
16 * arvados.util - Utility functions to use mostly in conjunction with the API
17   client object and the results it returns.
18
19 Other submodules provide lower-level functionality.
20 """
21
22 import logging as stdliblog
23 import os
24 import sys
25 import types
26
27 from collections import UserDict
28
29 from .api import api, api_from_config, http_cache
30 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
31 from arvados.keep import *
32 from arvados.stream import *
33 from .arvfile import StreamFileReader
34 from .logging import log_format, log_date_format, log_handler
35 from .retry import RetryLoop
36 import arvados.errors as errors
37 import arvados.util as util
38
# The star imports above (`from arvados.keep import *`,
# `from arvados.stream import *`) can leave the name `logging` bound to the
# stdlib module.  Rebind it to the Arvados submodule so that
# `arvados.logging` keeps referring to it.
logging = sys.modules['arvados.logging']

# All Arvados code logs beneath the "arvados" logger; attach the SDK's
# handler and choose the level from the user's configuration.
# NOTE(review): `config` is not imported by name in this file's visible
# imports; presumably it arrives through one of the star imports (or as the
# `arvados.config` submodule attribute on this package) — confirm.
logger = stdliblog.getLogger('arvados')
logger.addHandler(log_handler)
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(stdliblog.DEBUG)
else:
    logger.setLevel(stdliblog.WARNING)
49
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def task_set_output(self, s, num_retries=5):
    """Store `s` as the output of the task record `self` and mark it done.

    Meant to be bound to a task mapping as a method (`current_task` attaches
    it as `set_output`), so `self['uuid']` names the job task to update.
    The update body sets `output` to `s`, `success` to True and `progress`
    to 1.0.  Returns whatever `.execute()` returns.

    Attempts are driven by `RetryLoop` with `backoff_start=0`.  An
    `errors.ApiError` is retried only when
    `retry.check_http_response_success` returns None for its HTTP status and
    `tries_left` is still positive; otherwise it is re-raised.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            job_tasks = api('v1').job_tasks()
            return job_tasks.update(
                uuid=self['uuid'],
                body={'output': s, 'success': True, 'progress': 1.0},
            ).execute()
        except errors.ApiError as error:
            # `retry` is the arvados.retry submodule, bound on this package
            # by the `from .retry import RetryLoop` import.
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not (retryable and tries_left > 0):
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error), tries_left))
66
# Cache for the record returned by current_task(); filled on first success.
_current_task = None
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_task(num_retries=5):
    """Return the job task record for this process, fetching it once.

    The task is looked up by the `TASK_UUID` environment variable and wrapped
    in a `UserDict` carrying two extra attributes: `set_output`
    (`task_set_output` bound to the record) and `tmpdir` (the `TASK_WORK`
    environment variable).  The wrapped record is stored in `_current_task`
    and returned directly on later calls while it is truthy.

    An `errors.ApiError` is retried (via `RetryLoop`, `backoff_start=2`) when
    `retry.check_http_response_success` returns None for its HTTP status and
    `tries_left` is positive; otherwise it is re-raised.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
        except errors.ApiError as error:
            if retry.check_http_response_success(error.resp.status) is not None or tries_left <= 0:
                raise
            logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error), tries_left))
        else:
            task = UserDict(record)
            task.set_output = types.MethodType(task_set_output, task)
            task.tmpdir = os.environ['TASK_WORK']
            _current_task = task
            return task
87
# Cache for the record returned by current_job(); filled on first success.
_current_job = None
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_job(num_retries=5):
    """Return the job record for this process, fetching it once.

    The job is looked up by the `JOB_UUID` environment variable and wrapped
    in a `UserDict` whose `tmpdir` attribute is set from the `JOB_WORK`
    environment variable.  The wrapped record is stored in `_current_job`
    and returned directly on later calls while it is truthy.

    An `errors.ApiError` is retried (via `RetryLoop`, `backoff_start=2`) when
    `retry.check_http_response_success` returns None for its HTTP status and
    `tries_left` is positive; otherwise it is re-raised.
    """
    global _current_job
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            fetched = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
        except errors.ApiError as error:
            can_retry = retry.check_http_response_success(error.resp.status) is None
            if can_retry and tries_left > 0:
                logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error), tries_left))
                continue
            raise
        job = UserDict(fetched)
        job.tmpdir = os.environ['JOB_WORK']
        _current_job = job
        return job
107
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
    """Look up a value in the current job's `script_parameters`.

    `args` are forwarded unchanged to `.get()` on that mapping, so callers
    pass a key and, optionally, a default.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
111
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
    """Join a job script parameter onto the task's Keep mount directory.

    The mount root comes from the `TASK_KEEPMOUNT` environment variable; the
    relative part is `current_job()['script_parameters'].get(*args)`.
    NOTE(review): a missing parameter yields None, which `os.path.join`
    rejects with TypeError.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    relative_path = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, relative_path)
115
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
    """Join a task parameter onto the task's Keep mount directory.

    The mount root comes from the `TASK_KEEPMOUNT` environment variable; the
    relative part is `current_task()['parameters'].get(*args)`.
    NOTE(review): a missing parameter yields None, which `os.path.join`
    rejects with TypeError.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    relative_path = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, relative_path)
119
class JobTask(object):
    """Deprecated stub for a job task; constructing one only prints its arguments."""

    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the given task parameters and runtime constraints.

        Both arguments default to an empty dict.  `None` is the declared
        default so that one mutable dict is not shared by every call that
        omits them (a `dict()` default is evaluated only once).
        """
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
124
class job_setup(object):
    """Deprecated helpers that split a job's input into one task per item.

    Each helper does nothing unless the current task's ``sequence`` equals
    ``if_sequence``.  Otherwise it creates one new job task per item of the
    job's ``input`` script parameter.  Each new task gets ``sequence``
    ``if_sequence + 1`` and ``created_by_job_task_uuid`` set to the current
    task.  If ``and_end_task`` is true, the helper then marks the current task
    successful and calls ``sys.exit(0)``.
    """

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Queue one new job task per file in the job's input collection.

        :param if_sequence: only act when the current task's sequence equals this.
        :param and_end_task: after queueing, mark the current task successful
            and exit with status 0.
        :param input_as_path: if true, each new task's ``input`` parameter is
            ``os.path.join(<job input>, <stream name>, <file name>)``;
            otherwise it is the file's ``as_manifest()`` text.
        :param api_client: client used for every request; ``api('v1')`` is
            created when this is falsy.
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        # current_job()/current_task() return cached records after their
        # first call, so read the values once instead of on every iteration.
        job = current_job()
        parent_task_uuid = current_task()['uuid']
        job_input = job['script_parameters']['input']

        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for stream in cr.all_streams():
            for file_reader in stream.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, stream.name(), file_reader.name())
                else:
                    task_input = file_reader.as_manifest()
                new_task_attrs = {
                    'job_uuid': job['uuid'],
                    'created_by_job_task_uuid': parent_task_uuid,
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                    },
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=parent_task_uuid,
                                          body={'success': True}
                                          ).execute()
            # sys.exit, not the bare `exit` builtin: that one is installed by
            # the `site` module for interactive use and may be absent.
            sys.exit(0)

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Queue one new job task per stream in the job's input collection.

        Each new task's ``input`` parameter is the stream's ``tokens()``.

        :param if_sequence: only act when the current task's sequence equals this.
        :param and_end_task: after queueing, mark the current task successful
            and exit with status 0.
        :param api_client: client used for every request (including reading
            the collection); ``api('v1')`` is created once when this is falsy.
        """
        if if_sequence != current_task()['sequence']:
            return

        # Build a single client up front rather than a new api('v1') client
        # for every create/update request.
        if not api_client:
            api_client = api('v1')

        job = current_job()
        parent_task_uuid = current_task()['uuid']
        job_input = job['script_parameters']['input']

        cr = CollectionReader(job_input, api_client=api_client)
        for stream in cr.all_streams():
            new_task_attrs = {
                'job_uuid': job['uuid'],
                'created_by_job_task_uuid': parent_task_uuid,
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': stream.tokens(),
                },
            }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=parent_task_uuid,
                                          body={'success': True}
                                          ).execute()
            # sys.exit, not the bare `exit` builtin: that one is installed by
            # the `site` module for interactive use and may be absent.
            sys.exit(0)