+def sub_outdir(v):
+    """Substitution handler registered under the "task.outdir" key.
+
+    The argument ``v`` is accepted but never read (presumably it is
+    supplied by the ``subst`` module's calling convention -- confirm).
+
+    Returns the module-level ``outdir`` value, which is defined
+    elsewhere in this file.
+    """
+    return outdir
+
+def sub_cores(v):
+    """Substitution handler registered under the "node.cores" key.
+
+    The argument ``v`` is accepted but never read.
+
+    Returns the value of ``multiprocessing.cpu_count()`` rendered as a
+    string.
+    """
+    core_count = multiprocessing.cpu_count()
+    return str(core_count)
+
+def sub_jobid(v):
+    """Substitution handler registered under the "job.uuid" key.
+
+    The argument ``v`` is accepted but never read.
+
+    Returns the value of the ``JOB_UUID`` environment variable and
+    raises ``KeyError`` when that variable is not set.
+    """
+    job_uuid = os.environ['JOB_UUID']
+    return job_uuid
+
+def sub_taskid(v):
+    """Substitution handler registered under the "task.uuid" key.
+
+    The argument ``v`` is accepted but never read.
+
+    Returns the value of the ``TASK_UUID`` environment variable and
+    raises ``KeyError`` when that variable is not set.
+    """
+    task_uuid = os.environ['TASK_UUID']
+    return task_uuid
+
+def sub_jobsrc(v):
+    """Substitution handler registered under the "job.srcdir" key.
+
+    The argument ``v`` is accepted but never read.
+
+    Returns the value of the ``CRUNCH_SRC`` environment variable and
+    raises ``KeyError`` when that variable is not set.
+    """
+    source_dir = os.environ['CRUNCH_SRC']
+    return source_dir
+
+# Register the handlers above in subst.default_subs, keyed by the
+# substitution name each one serves.
+# NOTE(review): sub_tmpdir is not defined in the visible part of this
+# file; it must be in scope before this line runs.
+subst.default_subs["task.tmpdir"] = sub_tmpdir
+subst.default_subs["task.outdir"] = sub_outdir
+subst.default_subs["job.srcdir"] = sub_jobsrc
+subst.default_subs["node.cores"] = sub_cores
+subst.default_subs["job.uuid"] = sub_jobid
+subst.default_subs["task.uuid"] = sub_taskid
+
+class SigHandler(object):
+    """Forward a signal to a process-like object and remember it.
+
+    ``sig`` holds the number of the last signal forwarded through
+    ``send_signal``; it is ``None`` until the first successful forward.
+    """
+
+    def __init__(self):
+        # No signal has been forwarded yet.
+        self.sig = None
+
+    def send_signal(self, sp, signum):
+        """Call ``sp.send_signal(signum)`` and then record ``signum``.
+
+        The signal is recorded only after the forward call returns, so
+        ``sig`` is left untouched if ``sp.send_signal`` raises.
+        """
+        sp.send_signal(signum)
+        self.sig = signum
+
+def expand_item(p, c):
+    """Expand one command-template element into a flat list of strings.
+
+    ``p`` is the parameter mapping used for substitution and ``c`` is
+    the element to expand:
+
+    * a dict holding both "foreach" and "command": the "foreach" value
+      names a key of ``p``; for every item produced by ``get_items``
+      for ``p[var]``, the "command" list is expanded against a shallow
+      copy of ``p`` in which that key is bound to the item, and all
+      results are concatenated;
+    * a list: expanded element by element through ``expand_list``;
+    * a string: run through ``subst.do_substitution`` and returned as
+      a one-element list.
+
+    Anything else, including a dict lacking either key, yields ``[]``.
+    """
+    if isinstance(c, dict):
+        if "foreach" not in c or "command" not in c:
+            return []
+        var = c["foreach"]
+        expanded = []
+        for item in get_items(p, p[var]):
+            # Shallow copy so the caller's mapping is never rebound.
+            scoped = copy.copy(p)
+            scoped[var] = item
+            expanded += expand_list(scoped, c["command"])
+        return expanded
+
+    if isinstance(c, list):
+        return expand_list(p, c)
+
+    # Keep the two checks separate: `unicode` is only looked up when
+    # the value is not already a `str`.
+    if isinstance(c, str) or isinstance(c, unicode):
+        return [subst.do_substitution(p, c)]
+
+    return []
+
+def expand_list(p, l):
+    """Expand every element of ``l`` and concatenate the results.
+
+    Each element is passed to ``expand_item`` together with the
+    parameter mapping ``p``; the lists it returns are joined, in
+    order, into a single new list.
+    """
+    flattened = []
+    for arg in l:
+        flattened.extend(expand_item(p, arg))
+    return flattened
+
+def get_items(p, value):
+    """Return the list of items a "foreach" should iterate over.
+
+    ``p`` is the parameter mapping used for substitution.
+
+    * If ``value`` is a list, it is expanded through ``expand_list``.
+    * Otherwise ``value`` is run through ``subst.do_substitution`` and
+      the result is treated as a filesystem path:
+
+      - a directory yields one ``"$(dir <prefix>/<entry>/)"`` string
+        per directory entry, where ``<prefix>`` is the path with the
+        leading ``TASK_KEEPMOUNT`` portion (plus one separator
+        character) sliced off;
+      - a regular file yields its lines exactly as read;
+      - any other file type yields an empty list.
+
+    Raises ``OSError`` if the path cannot be stat'ed and ``KeyError``
+    if ``TASK_KEEPMOUNT`` is not set in the environment.
+    """
+    if isinstance(value, list):
+        return expand_list(p, value)
+
+    fn = subst.do_substitution(p, value)
+    # os.stat raises on failure, so `mode` is always an integer here;
+    # the former `mode is not None` test could never be false.
+    mode = os.stat(fn).st_mode
+    prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
+
+    if stat.S_ISDIR(mode):
+        return ["$(dir %s/%s/)" % (prefix, name) for name in os.listdir(fn)]
+
+    if stat.S_ISREG(mode):
+        with open(fn) as f:
+            # NOTE(review): lines keep their trailing newline, as in
+            # the original -- confirm that callers expect this.
+            return list(f)
+
+    # Neither a directory nor a regular file (FIFO, socket, device...).
+    # This used to fall through to `return items` with `items` unbound
+    # and crash with UnboundLocalError; report "no items" instead.
+    return []
+
+# Module-level placeholders for stdin/stdout names and file objects,
+# all initialised to None.
+# NOTE(review): presumably filled in later when input/output
+# redirection is configured -- that code is not visible here; confirm.
+stdoutname = None
+stdoutfile = None
+stdinname = None
+stdinfile = None