Merge branch '13330-collection-save'
[arvados.git] / sdk / python / arvados / __init__.py
1 # Copyright (C) The Arvados Authors. All rights reserved.
2 #
3 # SPDX-License-Identifier: Apache-2.0
4
5 from __future__ import print_function
6 from __future__ import absolute_import
7 from future import standard_library
8 standard_library.install_aliases()
9 from builtins import object
10 import bz2
11 import fcntl
12 import hashlib
13 import http.client
14 import httplib2
15 import json
16 import logging
17 import os
18 import pprint
19 import re
20 import string
21 import subprocess
22 import sys
23 import threading
24 import time
25 import types
26 import zlib
27
28 if sys.version_info >= (3, 0):
29     from collections import UserDict
30 else:
31     from UserDict import UserDict
32
33 from .api import api, api_from_config, http_cache
34 from .collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
35 from arvados.keep import *
36 from arvados.stream import *
37 from .arvfile import StreamFileReader
38 from .retry import RetryLoop
39 import arvados.errors as errors
40 import arvados.util as util
41
# Configure logging for the whole SDK.  Arvados code logs beneath the
# 'arvados' logger hierarchy, so the single handler attached here formats
# every SDK message the same way (logging.StreamHandler() writes to stderr).
log_format = '%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s'
log_date_format = '%Y-%m-%d %H:%M:%S'
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(fmt=log_format, datefmt=log_date_format))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
# NOTE(review): `config` is not imported explicitly in this file; presumably
# it is the arvados.config submodule, bound in this package's namespace as a
# side effect of the submodule imports above -- verify.
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.WARNING)
52
def task_set_output(self, s, num_retries=5):
    """Record ``s`` as a job task's output and mark the task successful.

    ``current_task`` binds this function onto the task object it returns
    (as ``set_output``), so ``self`` is that task and ``self['uuid']``
    identifies the record to update.  The update sets ``output`` to ``s``,
    ``success`` to True and ``progress`` to 1.0.

    Returns the result of ``job_tasks().update(...).execute()``.

    Raises ``errors.ApiError`` when the HTTP status is not classified as
    retryable by ``retry.check_http_response_success`` (it returns None
    only for retryable statuses), or when the error occurs on the last
    attempt that ``RetryLoop`` allows.
    """
    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=0):
        try:
            return api('v1').job_tasks().update(
                uuid=self['uuid'],
                body={'output': s, 'success': True, 'progress': 1.0}
            ).execute()
        except errors.ApiError as error:
            # `retry` is the arvados.retry submodule, bound in this package's
            # namespace by the `from .retry import RetryLoop` import.
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not retryable or tries_left <= 0:
                raise
            logger.debug(
                "task_set_output: job_tasks().update() raised %r, "
                "retrying with %s tries left", error, tries_left)
68
# Module-level cache for current_task(); None until the first successful fetch.
_current_task = None
def current_task(num_retries=5):
    """Return the job task this process is running, fetching it at most once.

    The task UUID comes from the ``TASK_UUID`` environment variable.  The
    API record is wrapped in a ``UserDict`` and given two extra attributes:
    ``set_output`` (``task_set_output`` bound to the task) and ``tmpdir``
    (the ``TASK_WORK`` environment variable).  The wrapper is cached in
    ``_current_task`` and returned directly on later calls, in which case
    ``num_retries`` is ignored.

    Raises ``errors.ApiError`` when the status is not retryable per
    ``retry.check_http_response_success`` or the retries are exhausted.
    A missing environment variable raises ``KeyError``.
    """
    global _current_task
    # Truthiness check: an empty task record would not count as cached.
    if _current_task:
        return _current_task

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not retryable or tries_left <= 0:
                raise
            logger.debug(
                "current_task: job_tasks().get() raised %r, "
                "retrying with %s tries left", error, tries_left)
        else:
            wrapped = UserDict(record)
            wrapped.set_output = types.MethodType(task_set_output, wrapped)
            wrapped.tmpdir = os.environ['TASK_WORK']
            _current_task = wrapped
            return wrapped
88
# Module-level cache for current_job(); None until the first successful fetch.
_current_job = None
def current_job(num_retries=5):
    """Return the job this process belongs to, fetching it at most once.

    The job UUID comes from the ``JOB_UUID`` environment variable.  The API
    record is wrapped in a ``UserDict`` with a ``tmpdir`` attribute taken
    from the ``JOB_WORK`` environment variable.  The wrapper is cached in
    ``_current_job`` and returned directly on later calls, in which case
    ``num_retries`` is ignored.

    Raises ``errors.ApiError`` when the status is not retryable per
    ``retry.check_http_response_success`` or the retries are exhausted.
    A missing environment variable raises ``KeyError``.
    """
    global _current_job
    # Truthiness check: an empty job record would not count as cached.
    if _current_job:
        return _current_job

    for tries_left in RetryLoop(num_retries=num_retries, backoff_start=2):
        try:
            record = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
        except errors.ApiError as error:
            retryable = retry.check_http_response_success(error.resp.status) is None
            if not retryable or tries_left <= 0:
                raise
            logger.debug(
                "current_job: jobs().get() raised %r, "
                "retrying with %s tries left", error, tries_left)
        else:
            wrapped = UserDict(record)
            wrapped.tmpdir = os.environ['JOB_WORK']
            _current_job = wrapped
            return wrapped
107
def getjobparam(*args):
    """Look up a script parameter of the current job.

    ``args`` are forwarded unchanged to ``.get`` on the job's
    ``script_parameters`` mapping (typically a parameter name and an
    optional default).
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
110
def get_job_param_mount(*args):
    """Return a job script parameter as a path under the task's Keep mount.

    The mount root is read from the ``TASK_KEEPMOUNT`` environment variable.
    ``args`` are forwarded to ``.get`` on the job's ``script_parameters``,
    and the value found is joined onto that root.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(mount_root, param_value)
113
def get_task_param_mount(*args):
    """Return a task parameter as a path under the task's Keep mount.

    The mount root is read from the ``TASK_KEEPMOUNT`` environment variable.
    ``args`` are forwarded to ``.get`` on the current task's ``parameters``,
    and the value found is joined onto that root.
    """
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(mount_root, param_value)
116
class JobTask(object):
    """Minimal job-task stand-in: the constructor only prints its arguments.

    Nothing passed to the constructor is stored on the instance.
    """

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print ``parameters`` and ``runtime_constraints`` to stdout.

        Both default to an empty dict.  ``None`` is used as the default
        instead of a shared ``dict()`` instance so no mutable object is
        shared between calls; an omitted argument still prints as ``{}``.
        """
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        print("init jobtask %s %s" % (parameters, runtime_constraints))
120
class job_setup(object):
    """Helpers that fan a job's ``input`` out into new job tasks.

    Each helper acts only when the current task's ``sequence`` equals
    ``if_sequence``.  It then creates one job task at sequence
    ``if_sequence + 1`` per unit of the input collection.  If
    ``and_end_task`` is true, it also marks the current task successful
    and exits the process with status 0.
    """

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, api_client=None):
        """Create one new job task per file in the job's input collection.

        Arguments:
        * if_sequence: act only if the current task's ``sequence`` equals
          this value; otherwise return None immediately.
        * and_end_task: after queueing, mark the current task successful
          and exit the process (``SystemExit(0)``).
        * input_as_path: if true, each new task's ``input`` parameter is
          ``os.path.join(<job input>, <stream name>, <file name>)``;
          otherwise it is the file's ``as_manifest()`` text.
        * api_client: API client to use; ``api('v1')`` when None.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        if api_client is None:
            api_client = api('v1')

        # current_job()/current_task() cache their results; fetching them
        # once keeps the loop body to the per-file work.
        job = current_job()
        task_uuid = task['uuid']
        # Presumably a collection locator; it is used as a path prefix below.
        job_input = job['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=api_client)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                new_task_attrs = {
                    'job_uuid': job['uuid'],
                    'created_by_job_task_uuid': task_uuid,
                    'sequence': if_sequence + 1,
                    'parameters': {
                        'input': task_input,
                    },
                }
                api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=task_uuid,
                                          body={'success': True}
                                          ).execute()
            # sys.exit instead of the site-provided exit() builtin, which is
            # absent under `python -S` and in some embedded interpreters.
            sys.exit(0)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, api_client=None):
        """Create one new job task per stream in the job's input collection.

        Arguments:
        * if_sequence: act only if the current task's ``sequence`` equals
          this value; otherwise return None immediately.
        * and_end_task: after queueing, mark the current task successful
          and exit the process (``SystemExit(0)``).
        * api_client: API client used to read the input and create/update
          tasks.  When None, the reader uses its own default client and a
          single ``api('v1')`` client is created for the task calls.

        Each new task's ``input`` parameter is the stream's ``tokens()``.
        """
        task = current_task()
        if if_sequence != task['sequence']:
            return

        job = current_job()
        job_input = job['script_parameters']['input']
        # Passing api_client=None is the same as not passing it, so the
        # reader's behaviour is unchanged when the caller supplies no client.
        cr = CollectionReader(job_input, api_client=api_client)
        if api_client is None:
            # One client for the whole fan-out rather than one per task.
            api_client = api('v1')
        task_uuid = task['uuid']
        for s in cr.all_streams():
            new_task_attrs = {
                'job_uuid': job['uuid'],
                'created_by_job_task_uuid': task_uuid,
                'sequence': if_sequence + 1,
                'parameters': {
                    'input': s.tokens(),
                },
            }
            api_client.job_tasks().create(body=new_task_attrs).execute()
        if and_end_task:
            api_client.job_tasks().update(uuid=task_uuid,
                                          body={'success': True}
                                          ).execute()
            sys.exit(0)