4823: Add arvapi parameter to one_task_per_input_file() to solve mocking
[arvados.git] / sdk / python / arvados / __init__.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from .api import api, http_cache
22 from collection import CollectionReader, CollectionWriter, ResumableCollectionWriter
23 from keep import *
24 from stream import *
25 from arvfile import StreamFileReader
26 import errors
27 import util
28
# Configure logging for the SDK based on the user's configuration.
# Every Arvados module is expected to log beneath the 'arvados' logger.
log_handler = logging.StreamHandler()
log_handler.setFormatter(
    logging.Formatter(
        fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S'))
logger = logging.getLogger('arvados')
logger.addHandler(log_handler)
# A truthy ARVADOS_DEBUG setting selects debug-level output; otherwise only
# warnings and more severe records pass the logger's level.
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.WARNING)
39
def task_set_output(self,s):
    """Store `s` as the task's output and flag the task as complete.

    Issues a job_tasks update for self['uuid'] that sets 'output' to `s`,
    'success' to True and 'progress' to 1.0.  current_task() binds this
    function to the task object it returns, as its set_output method.
    """
    completed = {
        'output': s,
        'success': True,
        'progress': 1.0,
    }
    request = api('v1').job_tasks().update(uuid=self['uuid'], body=completed)
    request.execute()
47
_current_task = None
def current_task():
    """Return the job task this process is running, fetching it at most once.

    The first call looks up the task named by the TASK_UUID environment
    variable through the API, wraps the response in a UserDict, attaches a
    set_output() method (task_set_output) and a tmpdir attribute taken from
    the TASK_WORK environment variable, and caches the result in the
    module-level _current_task.  Later calls return the cached object.

    Raises KeyError if TASK_UUID or TASK_WORK is not set in the environment.
    """
    global _current_task
    # Test against None instead of truthiness: the cached object is
    # dict-like, so an empty one would be falsy and needlessly refetched.
    if _current_task is not None:
        return _current_task
    t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    # UserDict (unlike a plain dict) accepts the extra attributes set below.
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    _current_task = t
    return t
59
_current_job = None
def current_job():
    """Return the job this process belongs to, fetching it at most once.

    The first call looks up the job named by the JOB_UUID environment
    variable through the API, wraps the response in a UserDict, attaches a
    tmpdir attribute taken from the JOB_WORK environment variable, and
    caches the result in the module-level _current_job.  Later calls return
    the cached object.

    Raises KeyError if JOB_UUID or JOB_WORK is not set in the environment.
    """
    global _current_job
    # Test against None instead of truthiness: the cached object is
    # dict-like, so an empty one would be falsy and needlessly refetched.
    if _current_job is not None:
        return _current_job
    t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
    # UserDict (unlike a plain dict) accepts the extra attribute set below.
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    _current_job = t
    return t
70
def getjobparam(*args):
    """Look up a script parameter of the current job.

    `args` are forwarded unchanged to the .get() method of the job's
    'script_parameters' mapping, so callers pass a key and, optionally, a
    default.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
73
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the TASK_KEEPMOUNT directory.

    `args` are forwarded unchanged to the .get() method of the current
    job's 'script_parameters' mapping; the value found is appended to the
    path in the TASK_KEEPMOUNT environment variable with os.path.join.
    """
    # Read the environment first, as the original expression did.
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(mount_root, param_value)
76
def get_task_param_mount(*args):
    """Return a task parameter joined onto the TASK_KEEPMOUNT directory.

    `args` are forwarded unchanged to the .get() method of the current
    task's 'parameters' mapping; the value found is appended to the path in
    the TASK_KEEPMOUNT environment variable with os.path.join.
    """
    # Read the environment first, as the original expression did.
    mount_root = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(mount_root, param_value)
79
class JobTask(object):
    """Stub job task object: constructing one only prints its arguments."""

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the given parameters and runtime constraints.

        Both arguments default to an empty dict.  None is used as the
        declared default so that no mutable object is shared between calls.
        """
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        # Parenthesised single-argument form prints the same text under
        # Python 2's print statement and Python 3's print function.
        print("init jobtask %s %s" % (parameters, runtime_constraints))
83
class job_setup:
    """Helpers with which a running task queues follow-on tasks for its job."""

    @staticmethod
    def _queue_task(arvapi, sequence, task_input):
        """Create one task for the current job at `sequence` with the given input."""
        new_task_attrs = {
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': sequence,
            'parameters': {
                'input':task_input
                }
            }
        arvapi.job_tasks().create(body=new_task_attrs).execute()

    @staticmethod
    def _end_current_task(arvapi):
        """Mark the current task successful, then exit the process with status 0."""
        arvapi.job_tasks().update(uuid=current_task()['uuid'],
                                  body={'success':True}
                                  ).execute()
        # sys.exit instead of the site-provided exit() builtin, which is
        # meant for interactive sessions and is missing under `python -S`.
        sys.exit(0)

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False, arvapi=None):
        """Queue one new task for every file in the job's input collection.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the job's script_parameters['input'] is
        opened as a CollectionReader, normalized, and one task with
        sequence `if_sequence + 1` is created per file of each stream.

        if_sequence -- sequence number the current task must have.
        and_end_task -- when true, mark the current task successful and
            exit the process after the tasks have been created.
        input_as_path -- when true, each task's input is the job input
            joined with the stream name and file name; otherwise it is
            the file's as_manifest() value.
        arvapi -- API client to use; api('v1') is used when None.
        """
        if if_sequence != current_task()['sequence']:
            return

        if arvapi is None:
            arvapi = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=arvapi)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                job_setup._queue_task(arvapi, if_sequence + 1, task_input)
        if and_end_task:
            job_setup._end_current_task(arvapi)

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True, arvapi=None):
        """Queue one new task for every stream in the job's input collection.

        Does nothing unless the current task's 'sequence' equals
        `if_sequence`.  Otherwise the job's script_parameters['input'] is
        opened as a CollectionReader and one task with sequence
        `if_sequence + 1` is created per stream, with the stream's tokens()
        as its input.

        if_sequence -- sequence number the current task must have.
        and_end_task -- when true, mark the current task successful and
            exit the process after the tasks have been created.
        arvapi -- API client to use; api('v1') is used when None.  Accepted
            here for parity with one_task_per_input_file().
        """
        if if_sequence != current_task()['sequence']:
            return

        if arvapi is None:
            arvapi = api('v1')

        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input, api_client=arvapi)
        for s in cr.all_streams():
            job_setup._queue_task(arvapi, if_sequence + 1, s.tokens())
        if and_end_task:
            job_setup._end_current_task(arvapi)
138             exit(0)