import gflags
import httplib
import httplib2
import logging
import os
import pprint
import sys
import types
import subprocess
import json
import UserDict
import re
import hashlib
import string
import bz2
import zlib
import fcntl
import time
import threading

EMPTY_BLOCK_LOCATOR = 'd41d8cd98f00b204e9800998ecf8427e+0'

from api import *
from stream import *
from collection import *
from keep import *
import util


class errors:
    """Namespace grouping the exception types exposed by this module."""

    class SyntaxError(Exception):
        pass

    class AssertionError(Exception):
        pass

    class NotFoundError(Exception):
        pass

    class CommandFailedError(Exception):
        pass

    class KeepWriteError(Exception):
        pass

    class NotImplementedError(Exception):
        pass


def task_set_output(self, s):
    """Record `s` as the output of the task `self`.

    Updates the job task whose uuid is ``self['uuid']``, setting its
    output to `s`, its success flag to True and its progress to 1.0.
    """
    api('v1').job_tasks().update(
        uuid=self['uuid'],
        body={
            'output': s,
            'success': True,
            'progress': 1.0,
        }).execute()


_current_task = None


def current_task():
    """Return the job task named by the TASK_UUID environment variable.

    The record is fetched once and cached at module level.  It is wrapped
    in a UserDict so that a bound ``set_output`` method and a ``tmpdir``
    attribute (taken from the TASK_WORK environment variable) can be
    attached to it.
    """
    global _current_task
    # Compare against None rather than testing truthiness: a UserDict that
    # wraps an empty mapping is falsy and would be re-fetched on every call.
    if _current_task is not None:
        return _current_task
    t = api('v1').job_tasks().get(uuid=os.environ['TASK_UUID']).execute()
    t = UserDict.UserDict(t)
    t.set_output = types.MethodType(task_set_output, t)
    t.tmpdir = os.environ['TASK_WORK']
    _current_task = t
    return t


_current_job = None


def current_job():
    """Return the job named by the JOB_UUID environment variable.

    The record is fetched once and cached at module level.  It is wrapped
    in a UserDict carrying a ``tmpdir`` attribute taken from the JOB_WORK
    environment variable.
    """
    global _current_job
    # Same reasoning as current_task(): only None means "not fetched yet".
    if _current_job is not None:
        return _current_job
    t = api('v1').jobs().get(uuid=os.environ['JOB_UUID']).execute()
    t = UserDict.UserDict(t)
    t.tmpdir = os.environ['JOB_WORK']
    _current_job = t
    return t


def getjobparam(*args):
    """Forward `args` to ``.get()`` on the current job's script_parameters."""
    return current_job()['script_parameters'].get(*args)


class JobTask(object):
    def __init__(self, parameters=None, runtime_constraints=None):
        """Print the given parameters and runtime constraints.

        Both arguments default to an empty dict; None is used as the
        sentinel so that no dict instance is shared between calls.
        """
        if parameters is None:
            parameters = {}
        if runtime_constraints is None:
            runtime_constraints = {}
        # Parenthesised single-argument form: prints the same text under
        # the Python 2 print statement.
        print("init jobtask %s %s" % (parameters, runtime_constraints))


class job_setup:
    """Helpers that queue one new job task per unit of the job's input."""

    @staticmethod
    def _queue_task(task_input, sequence):
        """Create a job task for `task_input` with the given `sequence`."""
        new_task_attrs = {
            'job_uuid': current_job()['uuid'],
            'created_by_job_task_uuid': current_task()['uuid'],
            'sequence': sequence,
            'parameters': {
                'input': task_input
            }
        }
        api('v1').job_tasks().create(body=new_task_attrs).execute()

    @staticmethod
    def _finish_current_task():
        """Mark the current task successful and exit with status 0."""
        api('v1').job_tasks().update(
            uuid=current_task()['uuid'],
            body={'success': True}
        ).execute()
        # sys.exit instead of the site-provided exit(), which is meant for
        # interactive use and is missing when the site module is not loaded.
        sys.exit(0)

    @staticmethod
    def one_task_per_input_file(if_sequence=0, and_end_task=True):
        """Queue one task per file in the job's 'input' collection.

        Does nothing unless the current task's sequence equals
        `if_sequence`.  Each new task gets sequence ``if_sequence + 1`` and
        the file's manifest as its 'input' parameter.

        NOTE(review): indentation was lost in the copy of this file that
        was reviewed; ending the task and exiting only when `and_end_task`
        is true is the assumed structure -- confirm.
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            for f in s.all_files():
                job_setup._queue_task(f.as_manifest(), if_sequence + 1)
        if and_end_task:
            job_setup._finish_current_task()

    @staticmethod
    def one_task_per_input_stream(if_sequence=0, and_end_task=True):
        """Queue one task per stream in the job's 'input' collection.

        Does nothing unless the current task's sequence equals
        `if_sequence`.  Each new task gets sequence ``if_sequence + 1`` and
        the stream's tokens as its 'input' parameter.

        NOTE(review): indentation was lost in the copy of this file that
        was reviewed; ending the task and exiting only when `and_end_task`
        is true is the assumed structure -- confirm.
        """
        if if_sequence != current_task()['sequence']:
            return
        job_input = current_job()['script_parameters']['input']
        cr = CollectionReader(job_input)
        for s in cr.all_streams():
            job_setup._queue_task(s.tokens(), if_sequence + 1)
        if and_end_task:
            job_setup._finish_current_task()