#!/usr/bin/perl # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*- =head1 NAME crunch-job: Execute job steps, save snapshots as requested, collate output. =head1 SYNOPSIS Obtain job details from Arvados, run tasks on compute nodes (typically invoked by scheduler on controller): crunch-job --job x-y-z --git-dir /path/to/repo/.git Obtain job details from command line, run tasks on local machine (typically invoked by application or developer on VM): crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}' crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}' =head1 OPTIONS =over =item --force-unlock If the job is already locked, steal the lock and run it anyway. =item --git-dir Path to a .git directory (or a git URL) where the commit given in the job's C attribute is to be found. If this is I given, the job's C attribute will be used. =item --job-api-token Arvados API authorization token to use during the course of the job. =item --no-clear-tmp Do not clear per-job/task temporary directories during initial job setup. This can speed up development and debugging when running jobs locally. =item --job UUID of the job to run, or a JSON-encoded job resource without a UUID. If the latter is given, a new job object will be created. =back =head1 RUNNING JOBS LOCALLY crunch-job's log messages appear on stderr along with the job tasks' stderr streams. The log is saved in Keep at each checkpoint and when the job finishes. If the job succeeds, the job's output locator is printed on stdout. While the job is running, the following signals are accepted: =over =item control-C, SIGINT, SIGQUIT Save a checkpoint, terminate any job tasks that are running, and stop. =item SIGALRM Save a checkpoint and continue. =item SIGHUP Refresh node allocation (i.e., check whether any nodes have been added or unallocated) and attributes of the Job record that should affect behavior (e.g., cancel job if cancelled_at becomes non-nil). =back =cut use strict; use POSIX ':sys_wait_h'; use POSIX qw(strftime); use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Arvados; use Data::Dumper; use Digest::MD5 qw(md5_hex); use Getopt::Long; use IPC::Open2; use IO::Select; use File::Temp; use Fcntl ':flock'; use File::Path qw( make_path remove_tree ); use constant EX_TEMPFAIL => 75; $ENV{"TMPDIR"} ||= "/tmp"; unless (defined $ENV{"CRUNCH_TMP"}) { $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job"; if ($ENV{"USER"} ne "crunch" && $< != 0) { # use a tmp dir unique for my uid $ENV{"CRUNCH_TMP"} .= "-$<"; } } # Create the tmp directory if it does not exist if ( ! -d $ENV{"CRUNCH_TMP"} ) { make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"}; } $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work"; $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated mkdir ($ENV{"JOB_WORK"}); my $force_unlock; my $git_dir; my $jobspec; my $job_api_token; my $no_clear_tmp; my $resume_stash; GetOptions('force-unlock' => \$force_unlock, 'git-dir=s' => \$git_dir, 'job=s' => \$jobspec, 'job-api-token=s' => \$job_api_token, 'no-clear-tmp' => \$no_clear_tmp, 'resume-stash=s' => \$resume_stash, ); if (defined $job_api_token) { $ENV{ARVADOS_API_TOKEN} = $job_api_token; } my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST}; my $local_job = 0; $SIG{'USR1'} = sub { $main::ENV{CRUNCH_DEBUG} = 1; }; $SIG{'USR2'} = sub { $main::ENV{CRUNCH_DEBUG} = 0; }; my $arv = Arvados->new('apiVersion' => 'v1'); my $Job; my $job_id; my $dbh; my $sth; my @jobstep; my $User = retry_op(sub { $arv->{'users'}->{'current'}->execute; }); if ($jobspec =~ /^[-a-z\d]+$/) { # $jobspec is an Arvados UUID, not a JSON job specification $Job = retry_op(sub { $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); }); if (!$force_unlock) { # Claim this job, and make sure nobody else does eval { retry_op(sub { # lock() sets is_locked_by_uuid and changes state to Running. $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'}) }); }; if ($@) { Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL); exit EX_TEMPFAIL; }; } } else { $Job = JSON::decode_json($jobspec); if (!$resume_stash) { map { croak ("No $_ specified") unless $Job->{$_} } qw(script script_version script_parameters); } $Job->{'is_locked_by_uuid'} = $User->{'uuid'}; $Job->{'started_at'} = gmtime; $Job->{'state'} = 'Running'; $Job = retry_op(sub { $arv->{'jobs'}->{'create'}->execute('job' => $Job); }); } $job_id = $Job->{'uuid'}; my $keep_logfile = $job_id . '.log.txt'; log_writer_start($keep_logfile); $Job->{'runtime_constraints'} ||= {}; $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0; my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'}; Log (undef, "check slurm allocation"); my @slot; my @node; # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)") my @sinfo; if (!$have_slurm) { my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1; push @sinfo, "$localcpus localhost"; } if (exists $ENV{SLURM_NODELIST}) { push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`; } foreach (@sinfo) { my ($ncpus, $slurm_nodelist) = split; $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus; my @nodelist; while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//) { my $nodelist = $1; if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/) { my $ranges = $1; foreach (split (",", $ranges)) { my ($a, $b); if (/(\d+)-(\d+)/) { $a = $1; $b = $2; } else { $a = $_; $b = $_; } push @nodelist, map { my $n = $nodelist; $n =~ s/\[[-,\d]+\]/$_/; $n; } ($a..$b); } } else { push @nodelist, $nodelist; } } foreach my $nodename (@nodelist) { Log (undef, "node $nodename - $ncpus slots"); my $node = { name => $nodename, ncpus => $ncpus, losing_streak => 0, hold_until => 0 }; foreach my $cpu (1..$ncpus) { push @slot, { node => $node, cpu => $cpu }; } } push @node, @nodelist; } # Ensure that we get one jobstep running on each allocated node before # we start overloading nodes with concurrent steps @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot; $Job->update_attributes( 'tasks_summary' => { 'failed' => 0, 'todo' => 1, 'running' => 0, 'done' => 0 }); Log (undef, "start"); $SIG{'INT'} = sub { $main::please_freeze = 1; }; $SIG{'QUIT'} = sub { $main::please_freeze = 1; }; $SIG{'TERM'} = \&croak; $SIG{'TSTP'} = sub { $main::please_freeze = 1; }; $SIG{'ALRM'} = sub { $main::please_info = 1; }; $SIG{'CONT'} = sub { $main::please_continue = 1; }; $SIG{'HUP'} = sub { $main::please_refresh = 1; }; $main::please_freeze = 0; $main::please_info = 0; $main::please_continue = 0; $main::please_refresh = 0; my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs}); $ENV{"CRUNCH_JOB_UUID"} = $job_id; $ENV{"JOB_UUID"} = $job_id; my @jobstep_todo = (); my @jobstep_done = (); my @jobstep_tomerge = (); my $jobstep_tomerge_level = 0; my $squeue_checked; my $squeue_kill_checked; my $output_in_keep = 0; my $latest_refresh = scalar time; if (defined $Job->{thawedfromkey}) { thaw ($Job->{thawedfromkey}); } else { my $first_task = retry_op(sub { $arv->{'job_tasks'}->{'create'}->execute('job_task' => { 'job_uuid' => $Job->{'uuid'}, 'sequence' => 0, 'qsequence' => 0, 'parameters' => {}, }); }); push @jobstep, { 'level' => 0, 'failures' => 0, 'arvados_task' => $first_task, }; push @jobstep_todo, 0; } if (!$have_slurm) { must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); } my $build_script; do { local $/ = undef; $build_script = ; }; my $nodelist = join(",", @node); if (!defined $no_clear_tmp) { # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* Log (undef, "Clean work dirs"); my $cleanpid = fork(); if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep $CRUNCH_TMP/task/*.keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src*']); exit (1); } while (1) { last if $cleanpid == waitpid (-1, WNOHANG); freeze_if_want_freeze ($cleanpid); select (undef, undef, undef, 0.1); } Log (undef, "Cleanup command exited ".exit_status_s($?)); } my $git_archive; if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) { # If script_version looks like an absolute path, *and* the --git-dir # argument was not given -- which implies we were not invoked by # crunch-dispatch -- we will use the given path as a working # directory instead of resolving script_version to a git commit (or # doing anything else with git). $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'}; $ENV{"CRUNCH_SRC"} = $Job->{'script_version'}; } else { # Resolve the given script_version to a git commit sha1. Also, if # the repository is remote, clone it into our local filesystem: this # ensures "git archive" will work, and is necessary to reliably # resolve a symbolic script_version like "master^". $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository}); $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; # If we're running under crunch-dispatch, it will have already # pulled the appropriate source tree into its own repository, and # given us that repo's path as $git_dir. # # If we're running a "local" job, we might have to fetch content # from a remote repository. # # (Currently crunch-dispatch gives a local path with --git-dir, but # we might as well accept URLs there too in case it changes its # mind.) my $repo = $git_dir || $Job->{'repository'}; # Repository can be remote or local. If remote, we'll need to fetch it # to a local dir before doing `git log` et al. my $repo_location; if ($repo =~ m{://|^[^/]*:}) { # $repo is a git url we can clone, like git:// or https:// or # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is # not recognized here because distinguishing that from a local # path is too fragile. If you really need something strange here, # use the ssh:// form. $repo_location = 'remote'; } elsif ($repo =~ m{^\.*/}) { # $repo is a local path to a git index. We'll also resolve ../foo # to ../foo/.git if the latter is a directory. To help # disambiguate local paths from named hosted repositories, this # form must be given as ./ or ../ if it's a relative path. if (-d "$repo/.git") { $repo = "$repo/.git"; } $repo_location = 'local'; } else { # $repo is none of the above. It must be the name of a hosted # repository. my $arv_repo_list = retry_op(sub { $arv->{'repositories'}->{'list'}->execute( 'filters' => [['name','=',$repo]]); }); my @repos_found = @{$arv_repo_list->{'items'}}; my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'}; if ($n_found > 0) { Log(undef, "Repository '$repo' -> " . join(", ", map { $_->{'uuid'} } @repos_found)); } if ($n_found != 1) { croak("Error: Found $n_found repositories with name '$repo'."); } $repo = $repos_found[0]->{'fetch_url'}; $repo_location = 'remote'; } Log(undef, "Using $repo_location repository '$repo'"); $ENV{"CRUNCH_SRC_URL"} = $repo; # Resolve given script_version (we'll call that $treeish here) to a # commit sha1 ($commit). my $treeish = $Job->{'script_version'}; my $commit; if ($repo_location eq 'remote') { # We minimize excess object-fetching by re-using the same bare # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we # just keep adding remotes to it as needed. my $local_repo = $ENV{'CRUNCH_TMP'}."/.git"; my $gitcmd = "git --git-dir=\Q$local_repo\E"; # Set up our local repo for caching remote objects, making # archives, etc. if (!-d $local_repo) { make_path($local_repo) or croak("Error: could not create $local_repo"); } # This works (exits 0 and doesn't delete fetched objects) even # if $local_repo is already initialized: `$gitcmd init --bare`; if ($?) { croak("Error: $gitcmd init --bare exited ".exit_status_s($?)); } # If $treeish looks like a hash (or abbrev hash) we look it up in # our local cache first, since that's cheaper. (We don't want to # do that with tags/branches though -- those change over time, so # they should always be resolved by the remote repo.) if ($treeish =~ /^[0-9a-f]{7,40}$/s) { # Hide stderr because it's normal for this to fail: my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`; if ($? == 0 && # Careful not to resolve a branch named abcdeff to commit 1234567: $sha1 =~ /^$treeish/ && $sha1 =~ /^([0-9a-f]{40})$/s) { $commit = $1; Log(undef, "Commit $commit already present in $local_repo"); } } if (!defined $commit) { # If $treeish isn't just a hash or abbrev hash, or isn't here # yet, we need to fetch the remote to resolve it correctly. # First, remove all local heads. This prevents a name that does # not exist on the remote from resolving to (or colliding with) # a previously fetched branch or tag (possibly from a different # remote). remove_tree("$local_repo/refs/heads", {keep_root => 1}); Log(undef, "Fetching objects from $repo to $local_repo"); `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`; if ($?) { croak("Error: `$gitcmd fetch` exited ".exit_status_s($?)); } } # Now that the data is all here, we will use our local repo for # the rest of our git activities. $repo = $local_repo; } my $gitcmd = "git --git-dir=\Q$repo\E"; my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`; unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) { croak("`$gitcmd rev-list` exited " .exit_status_s($?) .", '$treeish' not found. Giving up."); } $commit = $1; Log(undef, "Version $treeish is commit $commit"); if ($commit ne $Job->{'script_version'}) { # Record the real commit id in the database, frozentokey, logs, # etc. -- instead of an abbreviation or a branch name which can # become ambiguous or point to a different commit in the future. if (!$Job->update_attributes('script_version' => $commit)) { croak("Error: failed to update job's script_version attribute"); } } $ENV{"CRUNCH_SRC_COMMIT"} = $commit; $git_archive = `$gitcmd archive ''\Q$commit\E`; if ($?) { croak("Error: $gitcmd archive exited ".exit_status_s($?)); } } if (!defined $git_archive) { Log(undef, "Skip install phase (no git archive)"); if ($have_slurm) { Log(undef, "Warning: This probably means workers have no source tree!"); } } else { Log(undef, "Run install script on all workers"); my @srunargs = ("srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}, "--job-name=$job_id"); my @execargs = ("sh", "-c", "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -"); my $installpid = fork(); if ($installpid == 0) { srun (\@srunargs, \@execargs, {}, $build_script . $git_archive); exit (1); } while (1) { last if $installpid == waitpid (-1, WNOHANG); freeze_if_want_freeze ($installpid); select (undef, undef, undef, 0.1); } Log (undef, "Install script exited ".exit_status_s($?)); } if (!$have_slurm) { # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above) must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here."); } # If this job requires a Docker image, install that. my $docker_bin = "/usr/bin/docker.io"; my ($docker_locator, $docker_stream, $docker_hash); if ($docker_locator = $Job->{docker_image_locator}) { ($docker_stream, $docker_hash) = find_docker_image($docker_locator); if (!$docker_hash) { croak("No Docker image hash found from locator $docker_locator"); } $docker_stream =~ s/^\.//; my $docker_install_script = qq{ if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load fi }; my $docker_pid = fork(); if ($docker_pid == 0) { srun (["srun", "--nodelist=" . join(',', @node)], ["/bin/sh", "-ec", $docker_install_script]); exit ($?); } while (1) { last if $docker_pid == waitpid (-1, WNOHANG); freeze_if_want_freeze ($docker_pid); select (undef, undef, undef, 0.1); } if ($? != 0) { croak("Installing Docker image from $docker_locator exited " .exit_status_s($?)); } } foreach (qw (script script_version script_parameters runtime_constraints)) { Log (undef, "$_ " . (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_})); } foreach (split (/\n/, $Job->{knobs})) { Log (undef, "knob " . $_); } $main::success = undef; ONELEVEL: my $thisround_succeeded = 0; my $thisround_failed = 0; my $thisround_failed_multiple = 0; @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level} or $a <=> $b } @jobstep_todo; my $level = $jobstep[$jobstep_todo[0]]->{level}; Log (undef, "start level $level"); my %proc; my @freeslot = (0..$#slot); my @holdslot; my %reader; my $progress_is_dirty = 1; my $progress_stats_updated = 0; update_progress_stats(); THISROUND: for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) { my $id = $jobstep_todo[$todo_ptr]; my $Jobstep = $jobstep[$id]; if ($Jobstep->{level} != $level) { next; } pipe $reader{$id}, "writer" or croak ($!); my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!); fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!); my $childslot = $freeslot[0]; my $childnode = $slot[$childslot]->{node}; my $childslotname = join (".", $slot[$childslot]->{node}->{name}, $slot[$childslot]->{cpu}); my $childpid = fork(); if ($childpid == 0) { $SIG{'INT'} = 'DEFAULT'; $SIG{'QUIT'} = 'DEFAULT'; $SIG{'TERM'} = 'DEFAULT'; foreach (values (%reader)) { close($_); } fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec open(STDOUT,">&writer"); open(STDERR,">&writer"); undef $dbh; undef $sth; delete $ENV{"GNUPGHOME"}; $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'}; $ENV{"TASK_QSEQUENCE"} = $id; $ENV{"TASK_SEQUENCE"} = $level; $ENV{"JOB_SCRIPT"} = $Job->{script}; while (my ($param, $value) = each %{$Job->{script_parameters}}) { $param =~ tr/a-z/A-Z/; $ENV{"JOB_PARAMETER_$param"} = $value; } $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname"; $ENV{"HOME"} = $ENV{"TASK_WORK"}; $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; $ENV{"GZIP"} = "-n"; my @srunargs = ( "srun", "--nodelist=".$childnode->{name}, qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'}, "--job-name=$job_id.$id.$$", ); my $build_script_to_send = ""; my $command = "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " ."&& cd $ENV{CRUNCH_TMP} "; if ($build_script) { $build_script_to_send = $build_script; $command .= "&& perl -"; } $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec "; if ($docker_hash) { my $cidfile = "$ENV{CRUNCH_TMP}/$ENV{TASK_UUID}.cid"; $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$cidfile -poll=10000 "; $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --attach=stdin -i --user=crunch --cidfile=$cidfile --sig-proxy "; # Dynamically configure the container to use the host system as its # DNS server. Get the host's global addresses from the ip command, # and turn them into docker --dns options using gawk. $command .= q{$(ip -o address show scope global | gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') }; # The source tree and $destdir directory (which we have # installed on the worker host) are available in the container, # under the same path. $command .= "--volume=\Q$ENV{CRUNCH_SRC}:$ENV{CRUNCH_SRC}:ro\E "; $command .= "--volume=\Q$ENV{CRUNCH_INSTALL}:$ENV{CRUNCH_INSTALL}:ro\E "; # Currently, we make arv-mount's mount point appear at /keep # inside the container (instead of using the same path as the # host like we do with CRUNCH_SRC and CRUNCH_INSTALL). However, # crunch scripts and utilities must not rely on this. They must # use $TASK_KEEPMOUNT. $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E "; $ENV{TASK_KEEPMOUNT} = "/keep"; # TASK_WORK is a plain docker data volume: it starts out empty, # is writable, and persists until no containers use it any # more. We don't use --volumes-from to share it with other # containers: it is only accessible to this task, and it goes # away when this task stops. $command .= "--volume=\Q$ENV{TASK_WORK}\E "; # JOB_WORK is also a plain docker data volume for now. TODO: # Share a single JOB_WORK volume across all task containers on a # given worker node, and delete it when the job ends (and, in # case that doesn't work, when the next job starts). $command .= "--volume=\Q$ENV{JOB_WORK}\E "; while (my ($env_key, $env_val) = each %ENV) { if ($env_key =~ /^(ARVADOS|CRUNCH|JOB|TASK)_/) { $command .= "--env=\Q$env_key=$env_val\E "; } } $command .= "--env=\QHOME=$ENV{HOME}\E "; $command .= "\Q$docker_hash\E "; $command .= "stdbuf --output=0 --error=0 "; $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } else { # Non-docker run $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 "; $command .= "stdbuf --output=0 --error=0 "; $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; } my @execargs = ('bash', '-c', $command); srun (\@srunargs, \@execargs, undef, $build_script_to_send); # exec() failed, we assume nothing happened. die "srun() failed on build script\n"; } close("writer"); if (!defined $childpid) { close $reader{$id}; delete $reader{$id}; next; } shift @freeslot; $proc{$childpid} = { jobstep => $id, time => time, slot => $childslot, jobstepname => "$job_id.$id.$childpid", }; croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid}; $slot[$childslot]->{pid} = $childpid; Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'}); Log ($id, "child $childpid started on $childslotname"); $Jobstep->{starttime} = time; $Jobstep->{node} = $childnode->{name}; $Jobstep->{slotindex} = $childslot; delete $Jobstep->{stderr}; delete $Jobstep->{finishtime}; $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime}); $Jobstep->{'arvados_task'}->save; splice @jobstep_todo, $todo_ptr, 1; --$todo_ptr; $progress_is_dirty = 1; while (!@freeslot || (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { last THISROUND if $main::please_freeze; if ($main::please_info) { $main::please_info = 0; freeze(); collate_output(); save_meta(1); update_progress_stats(); } my $gotsome = readfrompipes () + reapchildren (); if (!$gotsome) { check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); } elsif (time - $progress_stats_updated >= 30) { update_progress_stats(); } if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) || ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded)) { my $message = "Repeated failure rate too high ($thisround_failed_multiple/" .($thisround_failed+$thisround_succeeded) .") -- giving up on this round"; Log (undef, $message); last THISROUND; } # move slots from freeslot to holdslot (or back to freeslot) if necessary for (my $i=$#freeslot; $i>=0; $i--) { if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) { push @holdslot, (splice @freeslot, $i, 1); } } for (my $i=$#holdslot; $i>=0; $i--) { if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) { push @freeslot, (splice @holdslot, $i, 1); } } # give up if no nodes are succeeding if (!grep { $_->{node}->{losing_streak} == 0 && $_->{node}->{hold_count} < 4 } @slot) { my $message = "Every node has failed -- giving up on this round"; Log (undef, $message); last THISROUND; } } } push @freeslot, splice @holdslot; map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot); Log (undef, "wait for last ".(scalar keys %proc)." children to finish"); while (%proc) { if ($main::please_continue) { $main::please_continue = 0; goto THISROUND; } $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info; readfrompipes (); if (!reapchildren()) { check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); killem (keys %proc) if $main::please_freeze; } } update_progress_stats(); freeze_if_want_freeze(); if (!defined $main::success) { if (@jobstep_todo && $thisround_succeeded == 0 && ($thisround_failed == 0 || $thisround_failed > 4)) { my $message = "stop because $thisround_failed tasks failed and none succeeded"; Log (undef, $message); $main::success = 0; } if (!@jobstep_todo) { $main::success = 1; } } goto ONELEVEL if !defined $main::success; release_allocation(); freeze(); my $collated_output = &collate_output(); if (!$collated_output) { Log(undef, "output undef"); } else { eval { open(my $orig_manifest, '-|', 'arv-get', $collated_output) or die "failed to get collated manifest: $!"; my $orig_manifest_text = ''; while (my $manifest_line = <$orig_manifest>) { $orig_manifest_text .= $manifest_line; } my $output = retry_op(sub { $arv->{'collections'}->{'create'}->execute( 'collection' => {'manifest_text' => $orig_manifest_text}); }); Log(undef, "output uuid " . $output->{uuid}); Log(undef, "output hash " . $output->{portable_data_hash}); $Job->update_attributes('output' => $output->{portable_data_hash}); }; if ($@) { Log (undef, "Failed to register output manifest: $@"); } } Log (undef, "finish"); save_meta(); my $final_state; if ($collated_output && $main::success) { $final_state = 'Complete'; } else { $final_state = 'Failed'; } $Job->update_attributes('state' => $final_state); exit (($final_state eq 'Complete') ? 0 : 1); sub update_progress_stats { $progress_stats_updated = time; return if !$progress_is_dirty; my ($todo, $done, $running) = (scalar @jobstep_todo, scalar @jobstep_done, scalar @slot - scalar @freeslot - scalar @holdslot); $Job->{'tasks_summary'} ||= {}; $Job->{'tasks_summary'}->{'todo'} = $todo; $Job->{'tasks_summary'}->{'done'} = $done; $Job->{'tasks_summary'}->{'running'} = $running; $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'}); Log (undef, "status: $done done, $running running, $todo todo"); $progress_is_dirty = 0; } sub reapchildren { my $pid = waitpid (-1, WNOHANG); return 0 if $pid <= 0; my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name} . "." . $slot[$proc{$pid}->{slot}]->{cpu}); my $jobstepid = $proc{$pid}->{jobstep}; my $elapsed = time - $proc{$pid}->{time}; my $Jobstep = $jobstep[$jobstepid]; my $childstatus = $?; my $exitvalue = $childstatus >> 8; my $exitinfo = "exit ".exit_status_s($childstatus); $Jobstep->{'arvados_task'}->reload; my $task_success = $Jobstep->{'arvados_task'}->{success}; Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success"); if (!defined $task_success) { # task did not indicate one way or the other --> fail $Jobstep->{'arvados_task'}->{success} = 0; $Jobstep->{'arvados_task'}->save; $task_success = 0; } if (!$task_success) { my $temporary_fail; $temporary_fail ||= $Jobstep->{node_fail}; $temporary_fail ||= ($exitvalue == 111); ++$thisround_failed; ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1; # Check for signs of a failed or misconfigured node if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >= 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) { # Don't count this against jobstep failure thresholds if this # node is already suspected faulty and srun exited quickly if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} && $elapsed < 5) { Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name}); $temporary_fail ||= 1; } ban_node_by_slot($proc{$pid}->{slot}); } Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds', ++$Jobstep->{'failures'}, $temporary_fail ? 'temporary ' : 'permanent', $elapsed)); if (!$temporary_fail || $Jobstep->{'failures'} >= 3) { # Give up on this task, and the whole job $main::success = 0; $main::please_freeze = 1; } # Put this task back on the todo queue push @jobstep_todo, $jobstepid; $Job->{'tasks_summary'}->{'failed'}++; } else { ++$thisround_succeeded; $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0; $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0; push @jobstep_done, $jobstepid; Log ($jobstepid, "success in $elapsed seconds"); } $Jobstep->{exitcode} = $childstatus; $Jobstep->{finishtime} = time; $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime}); $Jobstep->{'arvados_task'}->save; process_stderr ($jobstepid, $task_success); Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output}); close $reader{$jobstepid}; delete $reader{$jobstepid}; delete $slot[$proc{$pid}->{slot}]->{pid}; push @freeslot, $proc{$pid}->{slot}; delete $proc{$pid}; if ($task_success) { # Load new tasks my $newtask_list = []; my $newtask_results; do { $newtask_results = retry_op(sub { $arv->{'job_tasks'}->{'list'}->execute( 'where' => { 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid} }, 'order' => 'qsequence', 'offset' => scalar(@$newtask_list), ); }); push(@$newtask_list, @{$newtask_results->{items}}); } while (@{$newtask_results->{items}}); foreach my $arvados_task (@$newtask_list) { my $jobstep = { 'level' => $arvados_task->{'sequence'}, 'failures' => 0, 'arvados_task' => $arvados_task }; push @jobstep, $jobstep; push @jobstep_todo, $#jobstep; } } $progress_is_dirty = 1; 1; } sub check_refresh_wanted { my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; if (@stat && $stat[9] > $latest_refresh) { $latest_refresh = scalar time; my $Job2 = retry_op(sub { $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); }); for my $attr ('cancelled_at', 'cancelled_by_user_uuid', 'cancelled_by_client_uuid', 'state') { $Job->{$attr} = $Job2->{$attr}; } if ($Job->{'state'} ne "Running") { if ($Job->{'state'} eq "Cancelled") { Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'}); } else { Log (undef, "Job state unexpectedly changed to " . $Job->{'state'}); } $main::success = 0; $main::please_freeze = 1; } } } sub check_squeue { # return if the kill list was checked <4 seconds ago if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4) { return; } $squeue_kill_checked = time; # use killem() on procs whose killtime is reached for (keys %proc) { if (exists $proc{$_}->{killtime} && $proc{$_}->{killtime} <= time) { killem ($_); } } # return if the squeue was checked <60 seconds ago if (defined $squeue_checked && $squeue_checked > time - 60) { return; } $squeue_checked = time; if (!$have_slurm) { # here is an opportunity to check for mysterious problems with local procs return; } # get a list of steps still running my @squeue = `squeue -s -h -o '%i %j' && echo ok`; chop @squeue; if ($squeue[-1] ne "ok") { return; } pop @squeue; # which of my jobsteps are running, according to squeue? my %ok; foreach (@squeue) { if (/^(\d+)\.(\d+) (\S+)/) { if ($1 eq $ENV{SLURM_JOBID}) { $ok{$3} = 1; } } } # which of my active child procs (>60s old) were not mentioned by squeue? foreach (keys %proc) { if ($proc{$_}->{time} < time - 60 && !exists $ok{$proc{$_}->{jobstepname}} && !exists $proc{$_}->{killtime}) { # kill this proc if it hasn't exited in 30 seconds $proc{$_}->{killtime} = time + 30; } } } sub release_allocation { if ($have_slurm) { Log (undef, "release job allocation"); system "scancel $ENV{SLURM_JOBID}"; } } sub readfrompipes { my $gotsome = 0; foreach my $job (keys %reader) { my $buf; while (0 < sysread ($reader{$job}, $buf, 8192)) { print STDERR $buf if $ENV{CRUNCH_DEBUG}; $jobstep[$job]->{stderr} .= $buf; preprocess_stderr ($job); if (length ($jobstep[$job]->{stderr}) > 16384) { substr ($jobstep[$job]->{stderr}, 0, 8192) = ""; } $gotsome = 1; } } return $gotsome; } sub preprocess_stderr { my $job = shift; while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) { my $line = $1; substr $jobstep[$job]->{stderr}, 0, 1+length($line), ""; Log ($job, "stderr $line"); if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) { # whoa. $main::please_freeze = 1; } elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) { $jobstep[$job]->{node_fail} = 1; ban_node_by_slot($jobstep[$job]->{slotindex}); } } } sub process_stderr { my $job = shift; my $task_success = shift; preprocess_stderr ($job); map { Log ($job, "stderr $_"); } split ("\n", $jobstep[$job]->{stderr}); } sub fetch_block { my $hash = shift; my ($keep, $child_out, $output_block); my $cmd = "arv-get \Q$hash\E"; open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!"; $output_block = ''; while (1) { my $buf; my $bytes = sysread($keep, $buf, 1024 * 1024); if (!defined $bytes) { die "reading from arv-get: $!"; } elsif ($bytes == 0) { # sysread returns 0 at the end of the pipe. last; } else { # some bytes were read into buf. $output_block .= $buf; } } close $keep; return $output_block; } sub collate_output { Log (undef, "collate"); my ($child_out, $child_in); my $pid = open2($child_out, $child_in, 'arv-put', '--raw', '--retries', retry_count()); my $joboutput; for (@jobstep) { next if (!exists $_->{'arvados_task'}->{'output'} || !$_->{'arvados_task'}->{'success'}); my $output = $_->{'arvados_task'}->{output}; if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/) { $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/; print $child_in $output; } elsif (@jobstep == 1) { $joboutput = $output; last; } elsif (defined (my $outblock = fetch_block ($output))) { $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/; print $child_in $outblock; } else { Log (undef, "XXX fetch_block($output) failed XXX"); $main::success = 0; } } $child_in->close; if (!defined $joboutput) { my $s = IO::Select->new($child_out); if ($s->can_read(120)) { sysread($child_out, $joboutput, 64 * 1024 * 1024); chomp($joboutput); # TODO: Ensure exit status == 0. } else { Log (undef, "timed out reading from 'arv-put'"); } } # TODO: kill $pid instead of waiting, now that we've decided to # ignore further output. waitpid($pid, 0); return $joboutput; } sub killem { foreach (@_) { my $sig = 2; # SIGINT first if (exists $proc{$_}->{"sent_$sig"} && time - $proc{$_}->{"sent_$sig"} > 4) { $sig = 15; # SIGTERM if SIGINT doesn't work } if (exists $proc{$_}->{"sent_$sig"} && time - $proc{$_}->{"sent_$sig"} > 4) { $sig = 9; # SIGKILL if SIGTERM doesn't work } if (!exists $proc{$_}->{"sent_$sig"}) { Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_"); kill $sig, $_; select (undef, undef, undef, 0.1); if ($sig == 2) { kill $sig, $_; # srun wants two SIGINT to really interrupt } $proc{$_}->{"sent_$sig"} = time; $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"}; } } } sub fhbits { my($bits); for (@_) { vec($bits,fileno($_),1) = 1; } $bits; } # Send log output to Keep via arv-put. # # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe. # $log_pipe_pid is the pid of the arv-put subprocess. # # The only functions that should access these variables directly are: # # log_writer_start($logfilename) # Starts an arv-put pipe, reading data on stdin and writing it to # a $logfilename file in an output collection. # # log_writer_send($txt) # Writes $txt to the output log collection. # # log_writer_finish() # Closes the arv-put pipe and returns the output that it produces. # # log_writer_is_active() # Returns a true value if there is currently a live arv-put # process, false otherwise. # my ($log_pipe_in, $log_pipe_out, $log_pipe_pid); sub log_writer_start($) { my $logfilename = shift; $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, 'arv-put', '--portable-data-hash', '--retries', '3', '--filename', $logfilename, '-'); } sub log_writer_send($) { my $txt = shift; print $log_pipe_in $txt; } sub log_writer_finish() { return unless $log_pipe_pid; close($log_pipe_in); my $arv_put_output; my $s = IO::Select->new($log_pipe_out); if ($s->can_read(120)) { sysread($log_pipe_out, $arv_put_output, 1024); chomp($arv_put_output); } else { Log (undef, "timed out reading from 'arv-put'"); } waitpid($log_pipe_pid, 0); $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef; if ($?) { Log("log_writer_finish: arv-put exited ".exit_status_s($?)) } return $arv_put_output; } sub log_writer_is_active() { return $log_pipe_pid; } sub Log # ($jobstep_id, $logmessage) { if ($_[1] =~ /\n/) { for my $line (split (/\n/, $_[1])) { Log ($_[0], $line); } return; } my $fh = select STDERR; $|=1; select $fh; my $message = sprintf ("%s %d %s %s", $job_id, $$, @_); $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge; $message .= "\n"; my $datetime; if (log_writer_is_active() || -t STDERR) { my @gmtime = gmtime; $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]); } print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message); if (log_writer_is_active()) { log_writer_send($datetime . " " . $message); } } sub croak { my ($package, $file, $line) = caller; my $message = "@_ at $file line $line\n"; Log (undef, $message); freeze() if @jobstep_todo; collate_output() if @jobstep_todo; cleanup(); save_meta(); die; } sub cleanup { return unless $Job; if ($Job->{'state'} eq 'Cancelled') { $Job->update_attributes('finished_at' => scalar gmtime); } else { $Job->update_attributes('state' => 'Failed'); } } sub save_meta { my $justcheckpoint = shift; # false if this will be the last meta saved return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm return unless log_writer_is_active(); my $loglocator = log_writer_finish(); Log (undef, "log manifest is $loglocator"); $Job->{'log'} = $loglocator; $Job->update_attributes('log', $loglocator); } sub freeze_if_want_freeze { if ($main::please_freeze) { release_allocation(); if (@_) { # kill some srun procs before freeze+stop map { $proc{$_} = {} } @_; while (%proc) { killem (keys %proc); select (undef, undef, undef, 0.1); my $died; while (($died = waitpid (-1, WNOHANG)) > 0) { delete $proc{$died}; } } } freeze(); collate_output(); cleanup(); save_meta(); exit 1; } } sub freeze { Log (undef, "Freeze not implemented"); return; } sub thaw { croak ("Thaw not implemented"); } sub freezequote { my $s = shift; $s =~ s/\\/\\\\/g; $s =~ s/\n/\\n/g; return $s; } sub freezeunquote { my $s = shift; $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge; return $s; } sub srun { my $srunargs = shift; my $execargs = shift; my $opts = shift || {}; my $stdin = shift; my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs; $Data::Dumper::Terse = 1; $Data::Dumper::Indent = 0; my $show_cmd = Dumper($args); $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g; $show_cmd =~ s/\n/ /g; warn "starting: $show_cmd\n"; if (defined $stdin) { my $child = open STDIN, "-|"; defined $child or die "no fork: $!"; if ($child == 0) { print $stdin or die $!; close STDOUT or die $!; exit 0; } } return system (@$args) if $opts->{fork}; exec @$args; warn "ENV size is ".length(join(" ",%ENV)); die "exec failed: $!: @$args"; } sub ban_node_by_slot { # Don't start any new jobsteps on this node for 60 seconds my $slotid = shift; $slot[$slotid]->{node}->{hold_until} = 60 + scalar time; $slot[$slotid]->{node}->{hold_count}++; Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds"); } sub must_lock_now { my ($lockfile, $error_message) = @_; open L, ">", $lockfile or croak("$lockfile: $!"); if (!flock L, LOCK_EX|LOCK_NB) { croak("Can't lock $lockfile: $error_message\n"); } } sub find_docker_image { # Given a Keep locator, check to see if it contains a Docker image. # If so, return its stream name and Docker hash. # If not, return undef for both values. my $locator = shift; my ($streamname, $filename); my $image = retry_op(sub { $arv->{collections}->{get}->execute(uuid => $locator); }); if ($image) { foreach my $line (split(/\n/, $image->{manifest_text})) { my @tokens = split(/\s+/, $line); next if (!@tokens); $streamname = shift(@tokens); foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) { if (defined($filename)) { return (undef, undef); # More than one file in the Collection. } else { $filename = (split(/:/, $filedata, 3))[2]; } } } } if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) { return ($streamname, $1); } else { return (undef, undef); } } sub retry_count { # Calculate the number of times an operation should be retried, # assuming exponential backoff, and that we're willing to retry as # long as tasks have been running. Enforce a minimum of 3 retries. my ($starttime, $endtime, $timediff, $retries); if (@jobstep) { $starttime = $jobstep[0]->{starttime}; $endtime = $jobstep[-1]->{finishtime}; } if (!defined($starttime)) { $timediff = 0; } elsif (!defined($endtime)) { $timediff = time - $starttime; } else { $timediff = ($endtime - $starttime) - (time - $endtime); } if ($timediff > 0) { $retries = int(log($timediff) / log(2)); } else { $retries = 1; # Use the minimum. } return ($retries > 3) ? $retries : 3; } sub retry_op { # Given a function reference, call it with the remaining arguments. If # it dies, retry it with exponential backoff until it succeeds, or until # the current retry_count is exhausted. my $operation = shift; my $retries = retry_count(); foreach my $try_count (0..$retries) { my $next_try = time + (2 ** $try_count); my $result = eval { $operation->(@_); }; if (!$@) { return $result; } elsif ($try_count < $retries) { my $sleep_time = $next_try - time; sleep($sleep_time) if ($sleep_time > 0); } } # Ensure the error message ends in a newline, so Perl doesn't add # retry_op's line number to it. chomp($@); die($@ . "\n"); } sub exit_status_s { # Given a $?, return a human-readable exit code string like "0" or # "1" or "0 with signal 1" or "1 with signal 11". my $exitcode = shift; my $s = $exitcode >> 8; if ($exitcode & 0x7f) { $s .= " with signal " . ($exitcode & 0x7f); } if ($exitcode & 0x80) { $s .= " with core dump"; } return $s; } __DATA__ #!/usr/bin/perl # checkout-and-build use Fcntl ':flock'; use File::Path qw( make_path remove_tree ); my $destdir = $ENV{"CRUNCH_SRC"}; my $commit = $ENV{"CRUNCH_SRC_COMMIT"}; my $repo = $ENV{"CRUNCH_SRC_URL"}; my $task_work = $ENV{"TASK_WORK"}; for my $dir ($destdir, $task_work) { if ($dir) { make_path $dir; -e $dir or die "Failed to create temporary directory ($dir): $!"; } } if ($task_work) { remove_tree($task_work, {keep_root => 1}); } open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; if (readlink ("$destdir.commit") eq $commit && -d $destdir) { if (@ARGV) { exec(@ARGV); die "Cannot exec `@ARGV`: $!"; } else { exit 0; } } unlink "$destdir.commit"; open STDERR_ORIG, ">&STDERR"; open STDOUT, ">", "$destdir.log"; open STDERR, ">&STDOUT"; mkdir $destdir; my @git_archive_data = ; if (@git_archive_data) { open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; print TARX @git_archive_data; if(!close(TARX)) { die "'tar -C $destdir -xf -' exited $?: $!"; } } my $pwd; chomp ($pwd = `pwd`); my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt"; mkdir $install_dir; for my $src_path ("$destdir/arvados/sdk/python") { if (-d $src_path) { shell_or_die ("virtualenv", $install_dir); shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install"); } } if (-e "$destdir/crunch_scripts/install") { shell_or_die ("$destdir/crunch_scripts/install", $install_dir); } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") { # Old version shell_or_die ("./tests/autotests.sh", $install_dir); } elsif (-e "./install.sh") { shell_or_die ("./install.sh", $install_dir); } if ($commit) { unlink "$destdir.commit.new"; symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!"; rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!"; } close L; if (@ARGV) { exec(@ARGV); die "Cannot exec `@ARGV`: $!"; } else { exit 0; } sub shell_or_die { if ($ENV{"DEBUG"}) { print STDERR "@_\n"; } if (system (@_) != 0) { my $err = $!; my $exitstatus = sprintf("exit %d signal %d", $? >> 8, $? & 0x7f); open STDERR, ">&STDERR_ORIG"; system ("cat $destdir.log >&2"); die "@_ failed ($err): $exitstatus"; } } __DATA__