Merge branch '21126-trash-when-ro'
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import object
10 import bz2
11 import fcntl
12 import hashlib
13 import http.client
14 import httplib2
15 import json
16 import logging as stdliblog
17 import os
18 import pprint
19 import re
20 import string
21 import sys
22 import time
23 import types
24 import zlib
25
26 if sys.version_info >= (3, 0):
27     from collections import UserDict
28 else:
29     from UserDict import UserDict
30
31 from .api import api, api_from_config, http_cache
32 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
33 from arvados.keep import *
34 from arvados.stream import *
35 from .arvfile import StreamFileReader
36 from .logging import log_format, log_date_format, log_handler
37 from .retry import RetryLoop
38 import arvados.errors as errors
39 import arvados.util as util
40
# The `from ... import *` lines above can leave the name `logging` bound to
# something other than this package's own submodule.  Rebind it explicitly
# so users can `import arvados.logging`.
logging = sys.modules['arvados.logging']

# Configure the root of the Arvados logger hierarchy.  All Arvados code
# should log under `arvados`; the level follows the user's configuration.
logger = stdliblog.getLogger('arvados')
logger.addHandler(log_handler)
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(stdliblog.DEBUG)
else:
    logger.setLevel(stdliblog.WARNING)
51
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def task_set_output(self, s, num_retries=5):
    """Store `s` as the output of the job task `self` and flag it successful.

    Issues a `job_tasks().update()` request for `self['uuid']` whose body
    sets `output` to `s`, `success` to True and `progress` to 1.0, and
    returns whatever executing that request returns.

    When the request raises `errors.ApiError`, the error is logged at debug
    level and the request is attempted again as long as
    `retry.check_http_response_success()` returns None for the response
    status and the `RetryLoop` reports tries left.  Otherwise the error is
    re-raised.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            request = api('v1').job_tasks().update(
                uuid=self['uuid'],
                body={
                    'output':s,
                    'success':True,
                    'progress':1.0
                })
            return request.execute()
        except errors.ApiError as error:
            may_retry = retry.check_http_response_success(error.resp.status) is None
            if not may_retry or tries_left <= 0:
                raise
            logger.debug("task_set_output: job_tasks().update() raised {}, retrying with {} tries left".format(repr(error),tries_left))
68
# Module-level cache holding the task record built by current_task().
_current_task = None
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_task(num_retries=5):
    """Return the job task named by the `TASK_UUID` environment variable.

    The record is fetched with `job_tasks().get()`, wrapped in a `UserDict`,
    and given two extra attributes: `set_output` (`task_set_output` bound to
    the record) and `tmpdir` (the value of the `TASK_WORK` environment
    variable).  The result is cached in the module-level `_current_task`,
    and a truthy cached value is returned without contacting the API.

    When the request raises `errors.ApiError`, the error is logged at debug
    level and the request is attempted again as long as
    `retry.check_http_response_success()` returns None for the response
    status and the `RetryLoop` reports tries left.  Otherwise the error is
    re-raised.
    """
    global _current_task
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            request = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID'])
            record = UserDict(request.execute())
            record.set_output = types.MethodType(task_set_output, record)
            record.tmpdir = os.environ['TASK_WORK']
            _current_task = record
            return record
        except errors.ApiError as error:
            may_retry = retry.check_http_response_success(error.resp.status) is None
            if not may_retry or tries_left <= 0:
                raise
            logger.debug("current_task: job_tasks().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
89
# Module-level cache holding the job record built by current_job().
_current_job = None
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def current_job(num_retries=5):
    """Return the job named by the `JOB_UUID` environment variable.

    The record is fetched with `jobs().get()`, wrapped in a `UserDict`, and
    given a `tmpdir` attribute holding the value of the `JOB_WORK`
    environment variable.  The result is cached in the module-level
    `_current_job`, and a truthy cached value is returned without contacting
    the API.

    When the request raises `errors.ApiError`, the error is logged at debug
    level and the request is attempted again as long as
    `retry.check_http_response_success()` returns None for the response
    status and the `RetryLoop` reports tries left.  Otherwise the error is
    re-raised.
    """
    global _current_job
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            request = api('v1').jobs().get(uuid=os.environ['JOB_UUID'])
            record = UserDict(request.execute())
            record.tmpdir = os.environ['JOB_WORK']
            _current_job = record
            return record
        except errors.ApiError as error:
            may_retry = retry.check_http_response_success(error.resp.status) is None
            if not may_retry or tries_left <= 0:
                raise
            logger.debug("current_job: jobs().get() raised {}, retrying with {} tries left".format(repr(error),tries_left))
109
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def getjobparam(*args):
    """Look up a value in the current job's `script_parameters`.

    `*args` are forwarded unchanged to the `.get()` call on
    `current_job()['script_parameters']`.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
113
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_job_param_mount(*args):
    """Join a job script parameter onto the `TASK_KEEPMOUNT` directory.

    `*args` are forwarded unchanged to the `.get()` call on
    `current_job()['script_parameters']`; the value found is joined onto the
    `TASK_KEEPMOUNT` environment variable with `os.path.join`.
    """
    # Read the environment first so a missing TASK_KEEPMOUNT fails before
    # the job record is looked up.
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(mount_root, param_value)
117
@util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
def get_task_param_mount(*args):
    """Join a task parameter onto the `TASK_KEEPMOUNT` directory.

    `*args` are forwarded unchanged to the `.get()` call on
    `current_task()['parameters']`; the value found is joined onto the
    `TASK_KEEPMOUNT` environment variable with `os.path.join`.
    """
    # Read the environment first so a missing TASK_KEEPMOUNT fails before
    # the task record is looked up.
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(mount_root, param_value)
121
class JobTask(object):
    """Placeholder job task object; construction only prints its arguments."""

    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def __init__(self, parameters=dict(), runtime_constraints=dict()):
        """Print `parameters` and `runtime_constraints`; nothing is stored."""
        message = "init jobtask %s %s" % (parameters, runtime_constraints)
        print(message)
126
class job_setup(object):
    """Static helpers that queue one new job task per piece of job input.

    Both helpers read the `input` script parameter of the current job, open
    it with `CollectionReader`, and create job tasks at sequence
    `if_sequence + 1`.  They return without doing anything unless the
    current task's `sequence` equals `if_sequence`.
    """

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Create one job task for every file in the job's input collection.

        Arguments:

        * if_sequence --- Only act when the current task's `sequence` equals
          this value; new tasks get `if_sequence + 1`.
        * and_end_task --- When true, mark the current task successful and
          exit the process with status 0 after the tasks are created.
        * input_as_path --- When true, each task's `input` parameter is
          `os.path.join(job_input, stream name, file name)`; otherwise it is
          the file's `as_manifest()` result.
        * api_client --- API client used for the collection reader and all
          task requests.  A falsy value means `api('v1')` is used.

        Raises `SystemExit` (status 0) when `and_end_task` is true.
        """
        if if_sequence != current_task()['sequence']:
            return

        if not api_client:
            api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        reader = CollectionReader(job_input, api_client=api_client)
        reader.normalize()
        for stream in reader.all_streams():
            for input_file in stream.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, stream.name(), input_file.name())
                else:
                    task_input = input_file.as_manifest()
                new_task_attrs = {
                    'job_uuid': current_job()['uuid'],
                    'created_by_job_task_uuid': current_task()['uuid'],
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                    },
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=current_task()['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit() rather than the builtin exit(): the latter is
            # injected by the `site` module for interactive use and is not
            # guaranteed to exist in every interpreter.
            sys.exit(0)

    @staticmethod
    @util._deprecated('3.0', 'arvados-cwl-runner or the containers API')
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        """Create one job task for every stream in the job's input collection.

        Each new task's `input` parameter is the stream's `tokens()` result.

        Arguments:

        * if_sequence --- Only act when the current task's `sequence` equals
          this value; new tasks get `if_sequence + 1`.
        * and_end_task --- When true, mark the current task successful and
          exit the process with status 0 after the tasks are created.

        Raises `SystemExit` (status 0) when `and_end_task` is true.
        """
        if if_sequence != current_task()['sequence']:
            return

        # Build the API client once and reuse it for every request instead
        # of constructing a new one per stream.
        api_client = api('v1')

        job_input = current_job()['script_parameters']['input']
        reader = CollectionReader(job_input)
        for stream in reader.all_streams():
            task_input = stream.tokens()
            new_task_attrs = {
                'job_uuid': current_job()['uuid'],
                'created_by_job_task_uuid': current_task()['uuid'],
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': task_input,
                },
            }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(
                uuid=current_task()['uuid'],
                body={'success': True},
            ).execute()
            # sys.exit() rather than the builtin exit(): the latter is
            # injected by the `site` module for interactive use and is not
            # guaranteed to exist in every interpreter.
            sys.exit(0)