projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Disable chunking
[arvados.git]
/
crunch_scripts
/
run-command
diff --git
a/crunch_scripts/run-command
b/crunch_scripts/run-command
index 4708bd8aba235a32c95c464407266ce1f271c231..d9f575ba25875ba5f86377c254ff5af757dc21a0 100755
(executable)
--- a/
crunch_scripts/run-command
+++ b/
crunch_scripts/run-command
@@
-13,6
+13,8
@@
import signal
import stat
import copy
import traceback
import stat
import copy
import traceback
+import pprint
+import multiprocessing
os.umask(0077)
os.umask(0077)
@@
-26,6
+28,8
@@
os.mkdir("output")
os.chdir("output")
os.chdir("output")
+outdir = os.getcwd()
+
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
taskp = None
jobp = arvados.current_job()['script_parameters']
if len(arvados.current_task()['parameters']) > 0:
@@
-34,16
+38,19
@@
if len(arvados.current_task()['parameters']) > 0:
links = []
def sub_link(v):
links = []
def sub_link(v):
- r = os.path.
basename(v
)
- os.symlink(
os.path.join(os.environ['TASK_KEEPMOUNT'], v)
, r)
+ r = os.path.
join(outdir, os.path.basename(v)
)
+ os.symlink(
v
, r)
links.append(r)
return r
def sub_tmpdir(v):
return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
links.append(r)
return r
def sub_tmpdir(v):
return os.path.join(arvados.current_task().tmpdir, 'tmpdir')
+def sub_outdir(v):
+ return outdir
+
def sub_cores(v):
def sub_cores(v):
- return
os.environ['CRUNCH_NODE_SLOTS']
+ return
str(multiprocessing.cpu_count())
def sub_jobid(v):
return os.environ['JOB_UUID']
def sub_jobid(v):
return os.environ['JOB_UUID']
@@
-51,11
+58,16
@@
def sub_jobid(v):
def sub_taskid(v):
return os.environ['TASK_UUID']
def sub_taskid(v):
return os.environ['TASK_UUID']
+def sub_jobsrc(v):
+ return os.environ['CRUNCH_SRC']
+
subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
subst.default_subs["link "] = sub_link
subst.default_subs["task.tmpdir"] = sub_tmpdir
+subst.default_subs["task.outdir"] = sub_outdir
+subst.default_subs["job.srcdir"] = sub_jobsrc
subst.default_subs["node.cores"] = sub_cores
subst.default_subs["node.cores"] = sub_cores
-subst.default_subs["job.id"] = sub_jobid
-subst.default_subs["task.id"] = sub_taskid
+subst.default_subs["job.
uu
id"] = sub_jobid
+subst.default_subs["task.
uu
id"] = sub_taskid
rcode = 1
rcode = 1
@@
-84,7
+96,7
@@
def expand_item(p, c):
return r
elif isinstance(c, list):
return expand_list(p, c)
return r
elif isinstance(c, list):
return expand_list(p, c)
- elif isinstance(c, str):
+ elif isinstance(c, str)
or isinstance(c, unicode)
:
return [subst.do_substitution(p, c)]
return []
return [subst.do_substitution(p, c)]
return []
@@
-101,7
+113,7
@@
def get_items(p, value):
prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
if mode != None:
if stat.S_ISDIR(mode):
prefix = fn[len(os.environ['TASK_KEEPMOUNT'])+1:]
if mode != None:
if stat.S_ISDIR(mode):
- items = ["$(dir %s/%s)" % (prefix, l) for l in os.listdir(fn)]
+ items = ["$(dir %s/%s
/
)" % (prefix, l) for l in os.listdir(fn)]
elif stat.S_ISREG(mode):
with open(fn) as f:
items = [line for line in f]
elif stat.S_ISREG(mode):
with open(fn) as f:
items = [line for line in f]
@@
-147,7
+159,8
@@
try:
except Exception as e:
print("run-command: caught exception:")
traceback.print_exc(file=sys.stdout)
except Exception as e:
print("run-command: caught exception:")
traceback.print_exc(file=sys.stdout)
- print("run-command: parameters is %s" % p)
+ print("run-command: task parameters was:")
+ pprint.pprint(p)
sys.exit(1)
try:
sys.exit(1)
try: