5016: PollClient.run_forever() polls self.stop() in order to have a chance to process...
[arvados.git] / sdk / python / arvados / __init__.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from api import *
22 from collection import *
23 from keep import *
24 from stream import *
25 import errors
26 import util
27
# Configure logging for the 'arvados' logger hierarchy; all Arvados code
# should log underneath it.  Records are sent to a StreamHandler using a
# timestamped "<time> <logger>[<pid>] <level>: <message>" layout.
logger = logging.getLogger('arvados')
log_handler = logging.StreamHandler()
log_handler.setFormatter(logging.Formatter(
    fmt='%(asctime)s %(name)s[%(process)d] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(log_handler)
# The threshold follows the user's configuration: DEBUG when the
# ARVADOS_DEBUG setting is truthy, WARNING otherwise.
# NOTE(review): `config` is not imported by name in this file; presumably it
# arrives through one of the star imports above -- confirm.
if config.get('ARVADOS_DEBUG'):
    logger.setLevel(logging.DEBUG)
else:
    logger.setLevel(logging.WARNING)
38
def task_set_output(self, s):
    """Store `s` as the output of the job task `self` and flag it successful.

    Issues a single job_tasks update through the 'v1' API client for the
    record identified by self['uuid'], setting 'output' to `s`, 'success'
    to True and 'progress' to 1.0.

    `self` only needs to support item lookup of 'uuid'; current_task()
    binds this function as the `set_output` method of the object it returns.
    """
    job_tasks = api('v1').job_tasks()
    finished_state = {
        'output': s,
        'success': True,
        'progress': 1.0,
    }
    job_tasks.update(uuid=self['uuid'], body=finished_state).execute()
46
# Module-level cache for current_task(); None until the first successful fetch.
_current_task = None
def current_task():
    """Return the job task record this process is running, fetching it once.

    On the first call the record named by the TASK_UUID environment variable
    is retrieved through the 'v1' API client and wrapped in a UserDict that
    is given two extras:

      * set_output -- task_set_output bound to the wrapper as a method
      * tmpdir     -- the value of the TASK_WORK environment variable

    The wrapper is cached in the module-level _current_task, and later calls
    return that same object as long as it is truthy.

    Raises KeyError if TASK_UUID or TASK_WORK is not set in the environment.
    """
    global _current_task
    if not _current_task:
        record = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
        task = UserDict.UserDict(record)
        task.set_output = types.MethodType(task_set_output, task)
        task.tmpdir = os.environ['TASK_WORK']
        _current_task = task
    return _current_task
58
# Module-level cache for current_job(); None until the first successful fetch.
_current_job = None
def current_job():
    """Return the job record this process belongs to, fetching it once.

    On the first call the record named by the JOB_UUID environment variable
    is retrieved through the 'v1' API client and wrapped in a UserDict whose
    `tmpdir` attribute is set to the value of the JOB_WORK environment
    variable.  The wrapper is cached in the module-level _current_job, and
    later calls return that same object as long as it is truthy.

    Raises KeyError if JOB_UUID or JOB_WORK is not set in the environment.
    """
    global _current_job
    if not _current_job:
        record = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
        job = UserDict.UserDict(record)
        job.tmpdir = os.environ['JOB_WORK']
        _current_job = job
    return _current_job
69
def getjobparam(*args):
    """Look up an entry in the current job's 'script_parameters'.

    All positional arguments are forwarded unchanged to the .get() method of
    current_job()['script_parameters'], and its result is returned.
    """
    script_parameters = current_job()['script_parameters']
    return script_parameters.get(*args)
72
def get_job_param_mount(*args):
    """Return a job script parameter joined onto the TASK_KEEPMOUNT directory.

    The positional arguments are forwarded to the .get() method of
    current_job()['script_parameters']; the value obtained is joined with
    os.path.join onto the TASK_KEEPMOUNT environment variable.

    Raises KeyError if TASK_KEEPMOUNT is not set in the environment.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_job()['script_parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
75
def get_task_param_mount(*args):
    """Return a task parameter joined onto the TASK_KEEPMOUNT directory.

    The positional arguments are forwarded to the .get() method of
    current_task()['parameters']; the value obtained is joined with
    os.path.join onto the TASK_KEEPMOUNT environment variable.

    Raises KeyError if TASK_KEEPMOUNT is not set in the environment.
    """
    keep_mount = os.environ['TASK_KEEPMOUNT']
    param_value = current_task()['parameters'].get(*args)
    return os.path.join(keep_mount, param_value)
78
class JobTask(object):
    """Placeholder job task object; constructing one only prints its arguments."""

    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the parameters and runtime constraints given for the task.

        parameters          -- mapping of task parameters; an empty dict
                               when omitted or None
        runtime_constraints -- mapping of runtime constraints; an empty
                               dict when omitted or None
        """
        # Build fresh dicts per call instead of using `dict()` as a default
        # value: a default is evaluated once at definition time and would be
        # shared by every instance created without arguments.
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        # Single-argument parenthesised form: prints the same text under the
        # Python 2 print statement and the Python 3 print function.
        print("init jobtask %s %s" % (parameters, runtime_constraints))
82
class job_setup:
    """Static helpers that let a job's task fan its input out into new tasks.

    Each public helper acts only when the current task's 'sequence' equals
    `if_sequence`; otherwise it returns immediately and does nothing.
    """

    @staticmethod
    def _queue_task(sequence, task_input):
        """Create one job task for the current job with the given input."""
        new_task_attrs = {
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': sequence,
            'parameters': {
                'input': task_input
                }
            }
        api('v1').job_tasks().create(body=new_task_attrs).execute()

    @staticmethod
    def _end_current_task():
        """Mark the current task successful, then exit the process with 0."""
        api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                     body={'success': True}
                                     ).execute()
        # sys.exit rather than the bare exit() builtin: exit() is injected by
        # the `site` module for interactive use and is not guaranteed to
        # exist (e.g. under `python -S`).
        sys.exit(0)

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True, input_as_path=False):
        """Queue one new task for every file in the job's input collection.

        The job's script_parameters['input'] is opened with CollectionReader
        and normalized; a task with sequence `if_sequence + 1` is then
        created for each file of each stream.

        if_sequence   -- only act when the current task's 'sequence' equals
                         this value
        and_end_task  -- when true, mark the current task successful and
                         exit the process with status 0 once all tasks have
                         been created (raises SystemExit)
        input_as_path -- when true, each new task's 'input' parameter is
                         os.path.join(job input, stream name, file name);
                         otherwise it is the file's as_manifest() value
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        cr.normalize()
        for s in cr.all_streams():
            for f in s.all_files():
                if input_as_path:
                    task_input = os.path.join(job_input, s.name(), f.name())
                else:
                    task_input = f.as_manifest()
                job_setup._queue_task(if_sequence + 1, task_input)
        if and_end_task:
            job_setup._end_current_task()

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        """Queue one new task for every stream in the job's input collection.

        The job's script_parameters['input'] is opened with CollectionReader
        (without normalizing); a task with sequence `if_sequence + 1` is
        created for each stream, with the stream's tokens() as its 'input'
        parameter.

        if_sequence  -- only act when the current task's 'sequence' equals
                        this value
        and_end_task -- when true, mark the current task successful and exit
                        the process with status 0 once all tasks have been
                        created (raises SystemExit)
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            job_setup._queue_task(if_sequence + 1, s.tokens())
        if and_end_task:
            job_setup._end_current_task()
134
135