Merge branch '21639-prefetch' refs #21639
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 """Arvados Python SDK
5
6 This module provides the entire Python SDK for Arvados. The most useful modules
7 include:
8
9 * arvados.api - After you `import arvados`, you can call `arvados.api` as a
10   shortcut to the client constructor function `arvados.api.api`.
11
12 * arvados.collection - The `arvados.collection.Collection` class provides a
13   high-level interface to read and write collections. It coordinates sending
14   data to and from Keep, and synchronizing updates with the collection object.
15
16 * arvados.util - Utility functions to use mostly in conjunction with the API
17   client object and the results it returns.
18
19 Other submodules provide lower-level functionality.
20 """
21
22 import logging as stdliblog
23 import os
24 import sys
25 import types
26
27 from collections import UserDict
28
29 from . import api, errors, util
30 from .api import api_from_config, http_cache
31 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
32 from arvados.keep import *
33 from arvados.stream import *
34 from .arvfile import StreamFileReader
35 from .logging import log_format, log_date_format, log_handler
36 from .retry import RetryLoop
37
38 # Previous versions of the PySDK used to say `from .api import api`.  This
39 # made it convenient to call the API client constructor, but difficult to
40 # access the rest of the `arvados.api` module. The magic below fixes that
41 # bug while retaining backwards compatibility: `arvados.api` is now the
42 # module and you can import it normally, but we make that module callable so
43 # all the existing code that says `arvados.api('v1', ...)` still works.
44 class _CallableAPIModule(api.__class__):
45     __call__ = staticmethod(api.api)
46 api.__class__ = _CallableAPIModule
47
48 # Override logging module pulled in via `from ... import *`
49 # so users can `import arvados.logging`.
50 logging = sys.modules['arvados.logging']
51
52 # Set up Arvados logging based on the user's configuration.
53 # All Arvados code should log under the arvados hierarchy.
54 logger = stdliblog.getLogger('arvados')
55 logger.addHandler(log_handler)
56 logger.setLevel(stdliblog.DEBUG if config.get('ARVADOS_DEBUG')
57                 else stdliblog.WARNING)
58
59 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
60 def task_set_output(self, s, num_retries=5):
61     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
62         try:
63             return api('v1').job_tasks().update(
64                 uuid=self['uuid'],
65                 body={
66                     'output':s,
67                     'success':True,
68                     'progress':1.0
69                 }).execute()
70         except errors.ApiError as error:
71             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
72                 logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
73             else:
74                 raise
75
76 _current_task = None
77 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
78 def current_task(num_retries=5):
79     global _current_task
80     if _current_task:
81         return _current_task
82
83     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
84         try:
85             task = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
86             task = UserDict(task)
87             task.set_output = types.MethodType(task_set_output, task)
88             task.tmpdir = os.environ['TASK_WORK']
89             _current_task = task
90             return task
91         except errors.ApiError as error:
92             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
93                 logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
94             else:
95                 raise
96
97 _current_job = None
98 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
99 def current_job(num_retries=5):
100     global _current_job
101     if _current_job:
102         return _current_job
103
104     for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
105         try:
106             job = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
107             job = UserDict(job)
108             job.tmpdir = os.environ['JOB_WORK']
109             _current_job = job
110             return job
111         except errors.ApiError as error:
112             if retry.check_http_response_success(error.resp.status) is None and tries_left > 0:
113                 logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
114             else:
115                 raise
116
117 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
118 def getjobparam(*args):
119     return current_job()['script_parameters'].get(*args)
120
121 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
122 def get_job_param_mount(*args):
123     return os.path.join(os.environ['TASK_KEEPMOUNT'], current_job()['script_parameters'].get(*args))
124
125 @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
126 def get_task_param_mount(*args):
127     return os.path.join(os.environ['TASK_KEEPMOUNT'], current_task()['parameters'].get(*args))
128
129 class JobTask(object):
130     @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
131     def __init__(self, parameters=dict(), runtime_constraints=dict()):
132         print("init jobtask %s %s" % (parameters, runtime_constraints))
133
134 class job_setup(object):
135     @staticmethod
136     @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
137     def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
138         if if_sequence != current_task()['sequence']:
139             return
140
141         if not api_client:
142             api_client = api('v1')
143
144         job_input = current_job()['script_parameters']['input']
145         cr = CollectionReader(job_input, api_client=api_client)
146         cr.normalize()
147         for s in cr.all_streams():
148             for f in s.all_files():
149                 if input_as_path:
150                     task_input = os.path.join(job_input, s.name(), f.name())
151                 else:
152                     task_input = f.as_manifest()
153                 new_task_attrs = {
154                     'job_uuid': current_job()['uuid'],
155                     'created_by_job_task_uuid': current_task()['uuid'],
156                     'sequence': if_sequence + 1,
157                     'parameters': {
158                         'input':task_input
159                         }
160                     }
161                 api_client.job_tasks().create(body=new_task_attrs).execute()
162         if and_end_task:
163             api_client.job_tasks().update(uuid=current_task()['uuid'],
164                                        body={'success':True}
165                                        ).execute()
166             exit(0)
167
168     @staticmethod
169     @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
170     def one_task_per_input_stream(if_sequence=0, and_end_task=True):
171         if if_sequence != current_task()['sequence']:
172             return
173         job_input = current_job()['script_parameters']['input']
174         cr = CollectionReader(job_input)
175         for s in cr.all_streams():
176             task_input = s.tokens()
177             new_task_attrs = {
178                 'job_uuid': current_job()['uuid'],
179                 'created_by_job_task_uuid': current_task()['uuid'],
180                 'sequence': if_sequence + 1,
181                 'parameters': {
182                     'input':task_input
183                     }
184                 }
185             api('v1').job_tasks().create(body=new_task_attrs).execute()
186         if and_end_task:
187             api('v1').job_tasks().update(uuid=current_task()['uuid'],
188                                        body={'success':True}
189                                        ).execute()
190             exit(0)