From 0e21fa5e4df50ca201474fed35c4055945beceaf Mon Sep 17 00:00:00 2001 From: Tom Clegg Date: Thu, 11 Jul 2013 03:43:07 +0000 Subject: [PATCH] add util functions, fix up tmp dirs --- sdk/python/arvados.py | 113 ++++++++++++++++++++++++++++++++++++- services/crunch/crunch-job | 16 ++++-- 2 files changed, 122 insertions(+), 7 deletions(-) diff --git a/sdk/python/arvados.py b/sdk/python/arvados.py index f017b1e06e..a1c7de9984 100644 --- a/sdk/python/arvados.py +++ b/sdk/python/arvados.py @@ -13,6 +13,7 @@ import hashlib import string import bz2 import zlib +import fcntl from apiclient import errors from apiclient.discovery import build @@ -63,7 +64,7 @@ def current_task(): t = service.job_tasks().get(uuid=os.environ['TASK_UUID']).execute() t = UserDict.UserDict(t) t.set_output = types.MethodType(task_set_output, t) - t.tmpdir = os.environ['TASK_TMPDIR'] + t.tmpdir = os.environ['TASK_WORK'] _current_task = t return t @@ -74,7 +75,7 @@ def current_job(): return _current_job t = service.jobs().get(uuid=os.environ['JOB_UUID']).execute() t = UserDict.UserDict(t) - t.tmpdir = os.environ['CRUNCH_WORK'] + t.tmpdir = os.environ['JOB_WORK'] _current_job = t return t @@ -135,6 +136,114 @@ class util: cwd=path) return path + @staticmethod + def tarball_extract(tarball, path): + """Retrieve a tarball from Keep and extract it to a local + directory. Return the absolute path where the tarball was + extracted. If the top level of the tarball contained just one + file or directory, return the absolute path of that single + item. + + tarball -- collection locator + path -- where to extract the tarball: absolute, or relative to job tmp + """ + if not re.search('^/', path): + path = os.path.join(current_job().tmpdir, path) + lockfile = open(path + '.lock', 'w') + fcntl.flock(lockfile, fcntl.LOCK_EX) + try: + os.stat(path) + except OSError: + os.mkdir(path) + already_have_it = False + try: + if os.readlink(os.path.join(path, '.locator')) == tarball: + already_have_it = True + except OSError: + pass + if not already_have_it: + + # emulate "rm -f" (i.e., if the file does not exist, we win) + try: + os.unlink(os.path.join(path, '.locator')) + except OSError: + if os.path.exists(os.path.join(path, '.locator')): + os.unlink(os.path.join(path, '.locator')) + + for f in CollectionReader(tarball).all_files(): + decompress_flag = '' + if re.search('\.(tbz|tar.bz2)$', f.name()): + decompress_flag = 'j' + elif re.search('\.(tgz|tar.gz)$', f.name()): + decompress_flag = 'z' + p = subprocess.Popen(["tar", + "-C", path, + ("-x%sf" % decompress_flag), + "-"], + stdout=None, + stdin=subprocess.PIPE, stderr=sys.stderr, + shell=False, close_fds=True) + while True: + buf = f.read(2**20) + if len(buf) == 0: + break + p.stdin.write(buf) + p.stdin.close() + p.wait() + if p.returncode != 0: + lockfile.close() + raise Exception("tar exited %d" % p.returncode) + os.symlink(tarball, os.path.join(path, '.locator')) + tld_extracts = filter(lambda f: f != '.locator', os.listdir(path)) + lockfile.close() + if len(tld_extracts) == 1: + return os.path.join(path, tld_extracts[0]) + return path + + @staticmethod + def collection_extract(collection, path, files=[]): + """Retrieve a collection from Keep and extract it to a local + directory. Return the absolute path where the collection was + extracted. + + collection -- collection locator + path -- where to extract: absolute, or relative to job tmp + """ + if not re.search('^/', path): + path = os.path.join(current_job().tmpdir, path) + lockfile = open(path + '.lock', 'w') + fcntl.flock(lockfile, fcntl.LOCK_EX) + try: + os.stat(path) + except OSError: + os.mkdir(path) + already_have_it = False + try: + if os.readlink(os.path.join(path, '.locator')) == collection: + already_have_it = True + except OSError: + pass + if not already_have_it: + # emulate "rm -f" (i.e., if the file does not exist, we win) + try: + os.unlink(os.path.join(path, '.locator')) + except OSError: + if os.path.exists(os.path.join(path, '.locator')): + os.unlink(os.path.join(path, '.locator')) + + for f in CollectionReader(collection).all_files(): + if files == [] or f.name() in files: + outfile = open(os.path.join(path, f.name()), 'w') + while True: + buf = f.read(2**20) + if len(buf) == 0: + break + outfile.write(buf) + outfile.close() + os.symlink(collection, os.path.join(path, '.locator')) + lockfile.close() + return path + class DataReader: def __init__(self, data_locator): self.data_locator = data_locator diff --git a/services/crunch/crunch-job b/services/crunch/crunch-job index 4ab942b487..6c0031c617 100755 --- a/services/crunch/crunch-job +++ b/services/crunch/crunch-job @@ -76,8 +76,13 @@ use IPC::System::Simple qw(capturex); $ENV{"TMPDIR"} ||= "/tmp"; $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job"; -$ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work"; -mkdir ($ENV{"CRUNCH_TMP"}); +if ($ENV{"USER"} ne "crunch" && $< != 0) { + # use a tmp dir unique for my uid + $ENV{"CRUNCH_TMP"} .= "-$<"; +} +$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work"; +$ENV{"CRUNCH_TMP"} = $ENV{"JOB_WORK"}; # deprecated +mkdir ($ENV{"JOB_WORK"}); my $force_unlock; my $git_dir; @@ -340,7 +345,7 @@ else if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']); + ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']); exit (1); } while (1) @@ -544,7 +549,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; - $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu}; + $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu}; + $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; $ENV{"GZIP"} = "-n"; @@ -558,7 +564,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my @execargs = qw(sh); my $build_script_to_send = ""; my $command = - "mkdir -p $ENV{CRUNCH_WORK} $ENV{CRUNCH_TMP} " + "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} " ."&& cd $ENV{CRUNCH_TMP} "; if ($build_script) { -- 2.30.2