Merge branch 'master' into 1977-provenance-report
[arvados.git] / sdk / python / arvados / __init__.py
1 import gflags
2 import httplib
3 import httplib2
4 import logging
5 import os
6 import pprint
7 import sys
8 import types
9 import subprocess
10 import json
11 import UserDict
12 import re
13 import hashlib
14 import string
15 import bz2
16 import zlib
17 import fcntl
18 import time
19 import threading
20
21 from api import *
22 from collection import *
23 from keep import *
24 from stream import *
25 import errors
26 import util
27
def task_set_output(self,s):
    # Record s as this task's output on the API server and mark the
    # task finished (success=True, progress complete).
    update_body = {
        'output': s,
        'success': True,
        'progress': 1.0,
    }
    api('v1').job_tasks().update(uuid=self['uuid'], body=update_body).execute()
35
# Process-wide cache: the job task record for this worker process.
_current_task = None

def current_task():
    # Return the current task record, fetching it from the API server
    # the first time and caching it for subsequent calls.
    global _current_task
    if _current_task:
        return _current_task
    task = UserDict.UserDict(
        api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute())
    # Attach a bound set_output() helper and the task's scratch dir.
    task.set_output = types.MethodType(task_set_output, task)
    task.tmpdir = os.environ['TASK_WORK']
    _current_task = task
    return task
47
# Process-wide cache: the job record for this worker process.
_current_job = None

def current_job():
    # Return the current job record, fetching it from the API server
    # the first time and caching it for subsequent calls.
    global _current_job
    if _current_job:
        return _current_job
    job = UserDict.UserDict(
        api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute())
    job.tmpdir = os.environ['JOB_WORK']
    _current_job = job
    return job
58
def getjobparam(*args):
    # Look up a script parameter of the current job; accepts the same
    # (key[, default]) arguments as dict.get().
    params = current_job()['script_parameters']
    return params.get(*args)
61
62 class JobTask(object):
63     def __init__(self, parameters=dict(), runtime_constraints=dict()):
64         print "init jobtask %s %s" % (parameters, runtime_constraints)
65
class job_setup:
    # Helpers for fanning a crunch job out into one task per input file
    # or per input stream. Both public helpers are meant to be called
    # from the sequence-`if_sequence` task: they queue the next-sequence
    # tasks and, optionally, mark the current task finished and exit.
    #
    # The two public methods previously duplicated the task-creation and
    # end-task code verbatim; that shared logic now lives in the private
    # helpers below.

    @staticmethod
    def _create_task(task_input, sequence):
        # Queue one new task in the current job whose 'input' script
        # parameter is task_input, at the given sequence number.
        new_task_attrs = {
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': sequence,
            'parameters': {
                'input':task_input
                }
            }
        api('v1').job_tasks().create(body=new_task_attrs).execute()

    @staticmethod
    def _finish_current_task():
        # Mark the current task successful and terminate this process.
        api('v1').job_tasks().update(uuid=current_task()['uuid'],
                                     body={'success':True}
                                     ).execute()
        exit(0)

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        # Create one new task (sequence if_sequence+1) for each file in
        # the job's input collection. No-op unless called from a task at
        # sequence if_sequence. If and_end_task, ends the current task.
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                job_setup._create_task(f.as_manifest(), if_sequence + 1)
        if and_end_task:
            job_setup._finish_current_task()

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        # Create one new task (sequence if_sequence+1) for each stream
        # in the job's input collection. No-op unless called from a task
        # at sequence if_sequence. If and_end_task, ends the current task.
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            job_setup._create_task(s.tokens(), if_sequence + 1)
        if and_end_task:
            job_setup._finish_current_task()
113
114