#!/usr/bin/env perl
# -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-

=head1 NAME

crunch-job: Execute job steps, save snapshots as requested, collate output.

=head1 SYNOPSIS

Obtain job details from Arvados, run tasks on compute nodes (typically
invoked by scheduler on controller):

 crunch-job --job x-y-z --git-dir /path/to/repo/.git

Obtain job details from command line, run tasks on local machine
(typically invoked by application or developer on VM):

 crunch-job --job '{"script_version":"/path/to/working/tree","script":"scriptname",...}'

 crunch-job --job '{"repository":"https://github.com/curoverse/arvados.git","script_version":"master","script":"scriptname",...}'

=head1 OPTIONS

=over

=item --force-unlock

If the job is already locked, steal the lock and run it anyway.

=item --git-dir

Path to a .git directory (or a git URL) where the commit given in the
job's C<script_version> attribute is to be found. If this is I<not>
given, the job's C<repository> attribute will be used.

=item --job-api-token

Arvados API authorization token to use during the course of the job.

=item --no-clear-tmp

Do not clear per-job/task temporary directories during initial job
setup. This can speed up development and debugging when running jobs
locally.

=item --job

UUID of the job to run, or a JSON-encoded job resource without a
UUID. If the latter is given, a new job object will be created.

=back

=head1 RUNNING JOBS LOCALLY

crunch-job's log messages appear on stderr along with the job tasks'
stderr streams. The log is saved in Keep at each checkpoint and when
the job finishes.

If the job succeeds, the job's output locator is printed on stdout.

While the job is running, the following signals are accepted:

=over

=item control-C, SIGINT, SIGQUIT

Save a checkpoint, terminate any job tasks that are running, and stop.

=item SIGALRM

Save a checkpoint and continue.

=item SIGHUP

Refresh node allocation (i.e., check whether any nodes have been added
or unallocated) and attributes of the Job record that should affect
behavior (e.g., cancel job if cancelled_at becomes non-nil).

=back

=cut


use strict;
use POSIX ':sys_wait_h';
use POSIX qw(strftime);
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
use Cwd qw(realpath);
use Data::Dumper;
use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
use IO::Select;
use File::Temp;
use Fcntl ':flock';
use File::Path qw( make_path remove_tree );

# Exit codes: TASK_TEMPFAIL is compared against a task's exit value in
# reapchildren; the EX_* values are this program's own exit statuses.
use constant TASK_TEMPFAIL => 111;
use constant EX_TEMPFAIL => 75;
use constant EX_RETRY_UNLOCKED => 93;

# Choose the base temporary directory: an existing CRUNCH_TMP wins,
# otherwise it is derived from TMPDIR (which defaults to /tmp).
$ENV{"TMPDIR"} ||= "/tmp";
unless (defined $ENV{"CRUNCH_TMP"}) {
  $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
  if ($ENV{"USER"} ne "crunch" && $< != 0) {
    # use a tmp dir unique for my uid
    $ENV{"CRUNCH_TMP"} .= "-$<";
  }
}

# Create the tmp directory if it does not exist
if ( ! -d $ENV{"CRUNCH_TMP"} ) {
  make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
}

$ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
$ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
mkdir ($ENV{"JOB_WORK"});

# %proc maps a child pid to bookkeeping for that child (see the
# assignments to $proc{...} below).
my %proc;
my $force_unlock;
my $git_dir;
my $jobspec;
my $job_api_token;
my $no_clear_tmp;
my $resume_stash;
my $docker_bin = "docker.io";
my $docker_run_args = "";
# Besides the options described in the POD, --resume-stash,
# --docker-bin and --docker-run-args are accepted here.
GetOptions('force-unlock' => \$force_unlock,
           'git-dir=s' => \$git_dir,
           'job=s' => \$jobspec,
           'job-api-token=s' => \$job_api_token,
           'no-clear-tmp' => \$no_clear_tmp,
           'resume-stash=s' => \$resume_stash,
           'docker-bin=s' => \$docker_bin,
           'docker-run-args=s' => \$docker_run_args,
    );

if (defined $job_api_token) {
  $ENV{ARVADOS_API_TOKEN} = $job_api_token;
}

# We consider ourselves to be running under SLURM only when both of
# these variables are present in the environment.
my $have_slurm = exists $ENV{SLURM_JOB_ID} && exists $ENV{SLURM_NODELIST};

# SIGUSR1 / SIGUSR2 switch CRUNCH_DEBUG on / off while we are running.
$SIG{'USR1'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 1;
};
$SIG{'USR2'} = sub
{
  $main::ENV{CRUNCH_DEBUG} = 0;
};

my $arv = Arvados->new('apiVersion' => 'v1');

my $Job;
my $job_id;
my $dbh;
my $sth;
my @jobstep;

# $local_job is 0 when the job record is fetched by UUID; otherwise it
# holds the decoded job specification given on the command line.
my $local_job;
if ($jobspec =~ /^[-a-z\d]+$/)
{
  # $jobspec is an Arvados UUID, not a JSON job specification
  $Job = api_call("jobs/get", uuid => $jobspec);
  $local_job = 0;
}
else
{
  $local_job = JSON::decode_json($jobspec);
}


# Make sure our workers (our slurm nodes, localhost, or whatever) are
# at least able to run basic commands: they aren't down or severely
# misconfigured.
my $cmd = ['true'];
if (($Job || $local_job)->{docker_image_locator}) {
  $cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
     $cmd,
     {fork => 1});
if ($? != 0) {
  Log(undef, "Sanity check failed: ".exit_status_s($?));
  exit EX_TEMPFAIL;
}
Log(undef, "Sanity check OK");


my $User = api_call("users/current");

if (!$local_job) {
  if (!$force_unlock) {
    # Claim this job, and make sure nobody else does
    eval { api_call("jobs/lock", uuid => $Job->{uuid}); };
    if ($@) {
      Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
      exit EX_TEMPFAIL;
    };
  }
}
else
{
  # Job given as JSON on the command line: validate the required
  # attributes (unless resuming from a stash) and create the record.
  if (!$resume_stash)
  {
    map { croak ("No $_ specified") unless $local_job->{$_} }
    qw(script script_version script_parameters);
  }

  $local_job->{'is_locked_by_uuid'} = $User->{'uuid'};
  $local_job->{'started_at'} = gmtime;
  $local_job->{'state'} = 'Running';

  $Job = api_call("jobs/create", job => $local_job);
}
$job_id = $Job->{'uuid'};

my $keep_logfile = $job_id . '.log.txt';
log_writer_start($keep_logfile);

$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
# 0 means "no per-node cap" in the slot computation below.
my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};

my $gem_versions = `gem list --quiet arvados-cli 2>/dev/null`;
if ($? == 0) {
  $gem_versions =~ s/^arvados-cli \(/ with arvados-cli Gem version(s) /;
  chomp($gem_versions);
  chop($gem_versions);  # Closing parentheses
} else {
  $gem_versions = "";
}
Log(undef,
    "running from " . ((-e $0) ? realpath($0) : "stdin") . $gem_versions);

Log (undef, "check slurm allocation");
my @slot;
my @node;
# Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
my @sinfo;
if (!$have_slurm)
{
  my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
  push @sinfo, "$localcpus localhost";
}
if (exists $ENV{SLURM_NODELIST})
{
  push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
}
# Each @sinfo entry is "<ncpus> <nodelist>". Expand bracketed ranges
# such as "name[1-3,5]" into individual node names, then create one
# slot per usable CPU on every node.
foreach (@sinfo)
{
  my ($ncpus, $slurm_nodelist) = split;
  $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;

  my @nodelist;
  while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
  {
    my $nodelist = $1;
    if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
    {
      my $ranges = $1;
      foreach (split (",", $ranges))
      {
        my ($a, $b);
        if (/(\d+)-(\d+)/)
        {
          $a = $1;
          $b = $2;
        }
        else
        {
          $a = $_;
          $b = $_;
        }
        push @nodelist, map {
          my $n = $nodelist;
          $n =~ s/\[[-,\d]+\]/$_/;
          $n;
        } ($a..$b);
      }
    }
    else
    {
      push @nodelist, $nodelist;
    }
  }
  foreach my $nodename (@nodelist)
  {
    Log (undef, "node $nodename - $ncpus slots");
    my $node = { name => $nodename,
                 ncpus => $ncpus,
                 # The number of consecutive times a task has been dispatched
                 # to this node and failed.
                 losing_streak => 0,
                 # The number of consecutive times that SLURM has reported
                 # a node failure since the last successful task.
                 fail_count => 0,
                 # Don't dispatch work to this node until this time
                 # (in seconds since the epoch) has passed.
                 hold_until => 0 };
    foreach my $cpu (1..$ncpus)
    {
      push @slot, { node => $node,
                    cpu => $cpu };
    }
  }
  push @node, @nodelist;
}



# Ensure that we get one jobstep running on each allocated node before
# we start overloading nodes with concurrent steps

@slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;


$Job->update_attributes(
  'tasks_summary' => { 'failed' => 0,
                       'todo' => 1,
                       'running' => 0,
                       'done' => 0 });

Log (undef, "start");
# Signal handlers only set flags; the main loops below poll the
# $main::please_* variables and act on them.
$SIG{'INT'} = sub { $main::please_freeze = 1; };
$SIG{'QUIT'} = sub { $main::please_freeze = 1; };
$SIG{'TERM'} = \&croak;
$SIG{'TSTP'} = sub { $main::please_freeze = 1; };
$SIG{'ALRM'} = sub { $main::please_info = 1; };
$SIG{'CONT'} = sub { $main::please_continue = 1; };
$SIG{'HUP'} = sub { $main::please_refresh = 1; };

$main::please_freeze = 0;
$main::please_info = 0;
$main::please_continue = 0;
$main::please_refresh = 0;
my $jobsteps_must_output_keys = 0;	# becomes 1 when any task outputs a key

# Export every NOCACHE*=value line found in the job's knobs to %ENV.
grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
$ENV{"CRUNCH_JOB_UUID"} = $job_id;
$ENV{"JOB_UUID"} = $job_id;


# @jobstep_todo and @jobstep_done hold indexes into @jobstep.
my @jobstep_todo = ();
my @jobstep_done = ();
my @jobstep_tomerge = ();
my $jobstep_tomerge_level = 0;
my $squeue_checked = 0;
my $latest_refresh = scalar time;



if (defined $Job->{thawedfromkey})
{
  thaw ($Job->{thawedfromkey});
}
else
{
  # Fresh start: create the initial task (sequence 0) and queue it.
  my $first_task = api_call("job_tasks/create", job_task => {
    'job_uuid' => $Job->{'uuid'},
    'sequence' => 0,
    'qsequence' => 0,
    'parameters' => {},
  });
  push @jobstep, { 'level' => 0,
                   'failures' => 0,
                   'arvados_task' => $first_task,
                 };
  push @jobstep_todo, 0;
}


if (!$have_slurm)
{
  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
}

my $build_script = handle_readall(\*DATA);
my $nodelist = join(",", @node);
my $git_tar_count = 0;

if (!defined $no_clear_tmp) {
  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
  Log (undef, "Clean work dirs");

  my $cleanpid = fork();
  if ($cleanpid == 0)
  {
    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
    # Then clean up work directories.
    # TODO: When #5036 is done and widely deployed, we can limit mount's
    # -t option to simply fuse.keep.
    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
    exit (1);
  }
  # Poll for the cleanup child while still honoring freeze requests.
  while (1)
  {
    last if $cleanpid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($cleanpid);
    select (undef, undef, undef, 0.1);
  }
  if ($?) {
    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
    exit(EX_RETRY_UNLOCKED);
  }
}

# If this job requires a Docker image, install that.
my ($docker_locator, $docker_stream, $docker_hash, $docker_limitmem, $dockeruserarg);
if ($docker_locator = $Job->{docker_image_locator}) {
  Log (undef, "Install docker image $docker_locator");
  ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
  if (!$docker_hash)
  {
    croak("No Docker image hash found from locator $docker_locator");
  }
  Log (undef, "docker image hash is $docker_hash");
  $docker_stream =~ s/^\.//;
  # NOTE(review): this shell script is a single-line literal here, so
  # `fi` is passed to `$docker_bin load` as an argument and the `if` is
  # never terminated. Presumably the literal originally spanned several
  # lines -- confirm against version control.
  my $docker_install_script = qq{ if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load fi };
  my $docker_pid = fork();
  if ($docker_pid == 0)
  {
    srun (["srun", "--nodelist=" . join(',', @node)],
          ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
    exit ($?);
  }
  while (1)
  {
    last if $docker_pid == waitpid (-1, WNOHANG);
    freeze_if_want_freeze ($docker_pid);
    select (undef, undef, undef, 0.1);
  }
  if ($? != 0)
  {
    Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
    exit(EX_RETRY_UNLOCKED);
  }

  # Determine whether this version of Docker supports memory+swap limits.
  srun(["srun", "--nodelist=" . $node[0]],
       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
       {fork => 1});
  $docker_limitmem = ($? == 0);

  # Find a non-root Docker user to use.
  # Tries the default user for the container, then 'crunch', then 'nobody',
  # testing for whether the actual user id is non-zero.  This defends against
  # mistakes but not malice, but we intend to harden the security in the future
  # so we don't want anyone getting used to their jobs running as root in their
  # Docker containers.
  my @tryusers = ("", "crunch", "nobody");
  foreach my $try_user (@tryusers) {
    my $try_user_arg;
    if ($try_user eq "") {
      Log(undef, "Checking if container default user is not UID 0");
      $try_user_arg = "";
    } else {
      Log(undef, "Checking if user '$try_user' is not UID 0");
      $try_user_arg = "--user=$try_user";
    }
    srun(["srun", "--nodelist=" . $node[0]],
         ["/bin/sh", "-ec",
          "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
          " test \$a -ne 0"],
         {fork => 1});
    if ($? == 0) {
      $dockeruserarg = $try_user_arg;
      if ($try_user eq "") {
        Log(undef, "Container will run with default user");
      } else {
        Log(undef, "Container will run with $dockeruserarg");
      }
      last;
    }
  }

  if (!defined $dockeruserarg) {
    croak("Could not find a user in container that is not UID 0 (tried default user, @tryusers) or there was a problem running 'id' in the container.");
  }

  if ($Job->{arvados_sdk_version}) {
    # The job also specifies an Arvados SDK version.  Add the SDKs to the
    # tar file for the build script to install.
    Log(undef, sprintf("Packing Arvados SDK version %s for installation",
                       $Job->{arvados_sdk_version}));
    add_git_archive("git", "--git-dir=$git_dir", "archive",
                    "--prefix=.arvados.sdk/",
                    $Job->{arvados_sdk_version}, "sdk");
  }
}

if (!defined $git_dir && $Job->{'script_version'} =~ m{^/}) {
  # If script_version looks like an absolute path, *and* the --git-dir
  # argument was not given -- which implies we were not invoked by
  # crunch-dispatch -- we will use the given path as a working
  # directory instead of resolving script_version to a git commit (or
  # doing anything else with git).
  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{'script_version'};
  $ENV{"CRUNCH_SRC"} = $Job->{'script_version'};
}
else {
  # Resolve the given script_version to a git commit sha1. Also, if
  # the repository is remote, clone it into our local filesystem: this
  # ensures "git archive" will work, and is necessary to reliably
  # resolve a symbolic script_version like "master^".
  $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";

  Log (undef, "Looking for version ".$Job->{script_version}." from repository ".$Job->{repository});

  $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};

  # If we're running under crunch-dispatch, it will have already
  # pulled the appropriate source tree into its own repository, and
  # given us that repo's path as $git_dir.
  #
  # If we're running a "local" job, we might have to fetch content
  # from a remote repository.
  #
  # (Currently crunch-dispatch gives a local path with --git-dir, but
  # we might as well accept URLs there too in case it changes its
  # mind.)
  my $repo = $git_dir || $Job->{'repository'};

  # Repository can be remote or local. If remote, we'll need to fetch it
  # to a local dir before doing `git log` et al.
  my $repo_location;

  if ($repo =~ m{://|^[^/]*:}) {
    # $repo is a git url we can clone, like git:// or https:// or
    # file:/// or [user@]host:repo.git. Note "user/name@host:foo" is
    # not recognized here because distinguishing that from a local
    # path is too fragile. If you really need something strange here,
    # use the ssh:// form.
    $repo_location = 'remote';
  } elsif ($repo =~ m{^\.*/}) {
    # $repo is a local path to a git index. We'll also resolve ../foo
    # to ../foo/.git if the latter is a directory. To help
    # disambiguate local paths from named hosted repositories, this
    # form must be given as ./ or ../ if it's a relative path.
    if (-d "$repo/.git") {
      $repo = "$repo/.git";
    }
    $repo_location = 'local';
  } else {
    # $repo is none of the above. It must be the name of a hosted
    # repository.
    my $arv_repo_list = api_call("repositories/list",
                                 'filters' => [['name','=',$repo]]);
    my @repos_found = @{$arv_repo_list->{'items'}};
    my $n_found = $arv_repo_list->{'serverResponse'}->{'items_available'};
    if ($n_found > 0) {
      Log(undef, "Repository '$repo' -> "
          . join(", ", map { $_->{'uuid'} } @repos_found));
    }
    if ($n_found != 1) {
      croak("Error: Found $n_found repositories with name '$repo'.");
    }
    $repo = $repos_found[0]->{'fetch_url'};
    $repo_location = 'remote';
  }
  Log(undef, "Using $repo_location repository '$repo'");
  $ENV{"CRUNCH_SRC_URL"} = $repo;

  # Resolve given script_version (we'll call that $treeish here) to a
  # commit sha1 ($commit).
  my $treeish = $Job->{'script_version'};
  my $commit;
  if ($repo_location eq 'remote') {
    # We minimize excess object-fetching by re-using the same bare
    # repository in CRUNCH_TMP/.git for multiple crunch-jobs -- we
    # just keep adding remotes to it as needed.
    my $local_repo = $ENV{'CRUNCH_TMP'}."/.git";
    my $gitcmd = "git --git-dir=\Q$local_repo\E";

    # Set up our local repo for caching remote objects, making
    # archives, etc.
    if (!-d $local_repo) {
      make_path($local_repo) or croak("Error: could not create $local_repo");
    }
    # This works (exits 0 and doesn't delete fetched objects) even
    # if $local_repo is already initialized:
    `$gitcmd init --bare`;
    if ($?) {
      croak("Error: $gitcmd init --bare exited ".exit_status_s($?));
    }

    # If $treeish looks like a hash (or abbrev hash) we look it up in
    # our local cache first, since that's cheaper. (We don't want to
    # do that with tags/branches though -- those change over time, so
    # they should always be resolved by the remote repo.)
    if ($treeish =~ /^[0-9a-f]{7,40}$/s) {
      # Hide stderr because it's normal for this to fail:
      my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E 2>/dev/null`;
      if ($? == 0 &&
          # Careful not to resolve a branch named abcdeff to commit 1234567:
          $sha1 =~ /^$treeish/ &&
          $sha1 =~ /^([0-9a-f]{40})$/s) {
        $commit = $1;
        Log(undef, "Commit $commit already present in $local_repo");
      }
    }

    if (!defined $commit) {
      # If $treeish isn't just a hash or abbrev hash, or isn't here
      # yet, we need to fetch the remote to resolve it correctly.

      # First, remove all local heads. This prevents a name that does
      # not exist on the remote from resolving to (or colliding with)
      # a previously fetched branch or tag (possibly from a different
      # remote).
      remove_tree("$local_repo/refs/heads", {keep_root => 1});

      Log(undef, "Fetching objects from $repo to $local_repo");
      `$gitcmd fetch --no-progress --tags ''\Q$repo\E \Q+refs/heads/*:refs/heads/*\E`;
      if ($?) {
        croak("Error: `$gitcmd fetch` exited ".exit_status_s($?));
      }
    }

    # Now that the data is all here, we will use our local repo for
    # the rest of our git activities.
    $repo = $local_repo;
  }

  my $gitcmd = "git --git-dir=\Q$repo\E";
  my $sha1 = `$gitcmd rev-list -n1 ''\Q$treeish\E`;
  unless ($? == 0 && $sha1 =~ /^([0-9a-f]{40})$/) {
    croak("`$gitcmd rev-list` exited "
          .exit_status_s($?)
          .", '$treeish' not found, giving up");
  }
  $commit = $1;
  Log(undef, "Version $treeish is commit $commit");

  if ($commit ne $Job->{'script_version'}) {
    # Record the real commit id in the database, frozentokey, logs,
    # etc. -- instead of an abbreviation or a branch name which can
    # become ambiguous or point to a different commit in the future.
    if (!$Job->update_attributes('script_version' => $commit)) {
      croak("Error: failed to update job's script_version attribute");
    }
  }

  $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
  add_git_archive("$gitcmd archive ''\Q$commit\E");
}

my $git_archive = combined_git_archive();
if (!defined $git_archive) {
  Log(undef, "Skip install phase (no git archive)");
  if ($have_slurm) {
    Log(undef, "Warning: This probably means workers have no source tree!");
  }
}
else {
  my $install_exited;
  # NOTE(review): $install_script_tries_left is not referenced again in
  # the visible code; the loop below hard-codes 3 attempts.
  my $install_script_tries_left = 3;
  for (my $attempts = 0; $attempts < 3; $attempts++) {
    Log(undef, "Run install script on all workers");

    my @srunargs = ("srun",
                    "--nodelist=$nodelist",
                    "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
    my @execargs = ("sh", "-c",
                    "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");

    $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
    my ($install_stderr_r, $install_stderr_w);
    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
    set_nonblocking($install_stderr_r);
    my $installpid = fork();
    if ($installpid == 0)
    {
      # Child: send both stdout and stderr into the pipe, then hand the
      # build script plus the git archive to srun.
      close($install_stderr_r);
      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
      open(STDOUT, ">&", $install_stderr_w);
      open(STDERR, ">&", $install_stderr_w);
      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
      exit (1);
    }
    close($install_stderr_w);
    # Tell freeze_if_want_freeze how to kill the child, otherwise the
    # "waitpid(installpid)" loop won't get interrupted by a freeze:
    $proc{$installpid} = {};
    my $stderr_buf = '';
    # Track whether anything appears on stderr other than slurm errors
    # ("srun: ...") and the "starting: ..." message printed by the
    # srun subroutine itself:
    my $stderr_anything_from_script = 0;
    my $match_our_own_errors = '^(srun: error: |starting: \[)';
    while ($installpid != waitpid(-1, WNOHANG)) {
      freeze_if_want_freeze ($installpid);
      # Wait up to 0.1 seconds for something to appear on stderr, then
      # do a non-blocking read.
      my $bits = fhbits($install_stderr_r);
      select ($bits, undef, $bits, 0.1);
      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
      {
        # Log every complete line; a trailing partial line stays in
        # $stderr_buf until more data arrives.
        while ($stderr_buf =~ /^(.*?)\n/) {
          my $line = $1;
          substr $stderr_buf, 0, 1+length($line), "";
          Log(undef, "stderr $line");
          if ($line !~ /$match_our_own_errors/) {
            $stderr_anything_from_script = 1;
          }
        }
      }
    }
    delete $proc{$installpid};
    $install_exited = $?;
    close($install_stderr_r);
    if (length($stderr_buf) > 0) {
      if ($stderr_buf !~ /$match_our_own_errors/) {
        $stderr_anything_from_script = 1;
      }
      Log(undef, "stderr $stderr_buf")
    }

    Log (undef, "Install script exited ".exit_status_s($install_exited));
    last if $install_exited == 0 || $main::please_freeze;
    # If the install script fails but doesn't print an error message,
    # the next thing anyone is likely to do is just run it again in
    # case it was a transient problem like "slurm communication fails
    # because the network isn't reliable enough". So we'll just do
    # that ourselves (up to 3 attempts in total). OTOH, if there is an
    # error message, the problem is more likely to have a real fix and
    # we should fail the job so the fixing process can start, instead
    # of doing 2 more attempts.
    last if $stderr_anything_from_script;
  }

  foreach my $tar_filename (map { tar_filename_n($_); } (1..$git_tar_count)) {
    unlink($tar_filename);
  }

  if ($install_exited != 0) {
    croak("Giving up");
  }
}

foreach (qw (script script_version script_parameters runtime_constraints))
{
  Log (undef,
       "$_ " .
       (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
}
foreach (split (/\n/, $Job->{knobs}))
{
  Log (undef, "knob " . $_);
}



$main::success = undef;



# Each pass from ONELEVEL down to "goto ONELEVEL" runs the todo tasks
# of the lowest remaining level.
ONELEVEL:

my $thisround_succeeded = 0;
my $thisround_failed = 0;
my $thisround_failed_multiple = 0;
my $working_slot_count = scalar(@slot);

@jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
                       or $a <=> $b } @jobstep_todo;
my $level = $jobstep[$jobstep_todo[0]]->{level};

my $initial_tasks_this_level = 0;
foreach my $id (@jobstep_todo) {
  $initial_tasks_this_level++ if ($jobstep[$id]->{level} == $level);
}

# If the number of tasks scheduled at this level #T is smaller than the number
# of slots available #S, only use the first #T slots, or the first slot on
# each node, whichever number is greater.
#
# When we dispatch tasks later, we'll allocate whole-node resources like RAM
# based on these numbers.  Using fewer slots makes more resources available
# to each individual task, which should normally be a better strategy when
# there are fewer of them running with less parallelism.
#
# Note that this calculation is not redone if the initial tasks at
# this level queue more tasks at the same level.  This may harm
# overall task throughput for that level.
my @freeslot;
if ($initial_tasks_this_level < @node) {
  @freeslot = (0..$#node);
} elsif ($initial_tasks_this_level < @slot) {
  @freeslot = (0..$initial_tasks_this_level - 1);
} else {
  @freeslot = (0..$#slot);
}
my $round_num_freeslots = scalar(@freeslot);

# For each node, record the highest cpu number among the slots in use
# this round (walking @freeslot from the end, first hit per node wins).
my %round_max_slots = ();
for (my $ii = $#freeslot; $ii >= 0; $ii--) {
  my $this_slot = $slot[$freeslot[$ii]];
  my $node_name = $this_slot->{node}->{name};
  $round_max_slots{$node_name} ||= $this_slot->{cpu};
  last if (scalar(keys(%round_max_slots)) >= @node);
}

Log(undef, "start level $level with $round_num_freeslots slots");
my @holdslot;
my %reader;
my $progress_is_dirty = 1;
my $progress_stats_updated = 0;

update_progress_stats();


THISROUND:
for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
  # Don't create new tasks if we already know the job's final result.
  last if defined($main::success);

  my $id = $jobstep_todo[$todo_ptr];
  my $Jobstep = $jobstep[$id];
  if ($Jobstep->{level} != $level)
  {
    next;
  }

  pipe $reader{$id}, "writer" or croak("pipe() failed: $!");
  set_nonblocking($reader{$id});

  my $childslot = $freeslot[0];
  my $childnode = $slot[$childslot]->{node};
  my $childslotname = join (".",
                            $slot[$childslot]->{node}->{name},
                            $slot[$childslot]->{cpu});

  my $childpid = fork();
  if ($childpid == 0)
  {
    # Child: restore default signal handling, redirect stdout/stderr
    # into the pipe, export the task environment, then hand off to srun.
    $SIG{'INT'} = 'DEFAULT';
    $SIG{'QUIT'} = 'DEFAULT';
    $SIG{'TERM'} = 'DEFAULT';

    foreach (values (%reader))
    {
      close($_);
    }
    fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
    open(STDOUT,">&writer");
    open(STDERR,">&writer");

    undef $dbh;
    undef $sth;

    delete $ENV{"GNUPGHOME"};
    $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
    $ENV{"TASK_QSEQUENCE"} = $id;
    $ENV{"TASK_SEQUENCE"} = $level;
    $ENV{"JOB_SCRIPT"} = $Job->{script};
    while (my ($param, $value) = each %{$Job->{script_parameters}}) {
      $param =~ tr/a-z/A-Z/;
      $ENV{"JOB_PARAMETER_$param"} = $value;
    }
    $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
    $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
    $ENV{"TASK_WORK"} = $ENV{"CRUNCH_TMP"}."/task/$childslotname";
    $ENV{"HOME"} = $ENV{"TASK_WORK"};
    $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
    $ENV{"CRUNCH_NODE_SLOTS"} = $round_max_slots{$ENV{TASK_SLOT_NODE}};
    $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};

    my $keep_mnt = $ENV{"TASK_WORK"}.".keep";

    $ENV{"GZIP"} = "-n";

    my @srunargs = (
      "srun",
      "--nodelist=".$childnode->{name},
      qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
      "--job-name=$job_id.$id.$$",
        );

    my $stdbuf = " stdbuf --output=0 --error=0 ";

    my $arv_file_cache = "";
    if (defined($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'})) {
      $arv_file_cache = "--file-cache=" . ($Job->{'runtime_constraints'}->{'keep_cache_mb_per_task'} * 1024 * 1024);
    }

    my $command =
        "if [ -e \Q$ENV{TASK_WORK}\E ]; then rm -rf \Q$ENV{TASK_WORK}\E; fi; "
        ."mkdir -p \Q$ENV{CRUNCH_TMP}\E \Q$ENV{JOB_WORK}\E \Q$ENV{TASK_WORK}\E \Q$keep_mnt\E "
        ."&& cd \Q$ENV{CRUNCH_TMP}\E "
        # These environment variables get used explicitly later in
        # $command.  No tool is expected to read these values directly.
        # NOTE(review): text appears to be missing from this copy of the
        # file between `{print $2}'` and `{arvados_sdk_version}) {` below:
        # the q{} literal is never closed and the `if` statements that
        # the following `} else {` branches belong to are not visible.
        # Restore this section from version control before relying on it.
        .q{&& MEM=$(awk '($1 == "MemTotal:"){print $2}' {arvados_sdk_version}) {
        $command .= $stdbuf;
        $command .= "perl - \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E";
      } else {
        $command .= "/bin/sh -c \'python -c " .
            '"from pkg_resources import get_distribution as get; print \"Using Arvados SDK version\", get(\"arvados-python-client\").version"' .
            ">&2 2>/dev/null; " .
            "mkdir -p \"$ENV{JOB_WORK}\" \"$ENV{TASK_WORK}\" && " .
            "if which stdbuf >/dev/null ; then " .
            " exec $stdbuf \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
            " else " .
            " exec \Q$ENV{CRUNCH_SRC}/crunch_scripts/$Job->{script}\E ;" .
            " fi\'";
      }
    } else {
      # Non-docker run
      $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
      $command .= $stdbuf;
      $command .= "perl - $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
    }

    my @execargs = ('bash', '-c', $command);
    srun (\@srunargs, \@execargs, undef, $build_script);
    # exec() failed, we assume nothing happened.
    die "srun() failed on build script\n";
  }
  close("writer");
  if (!defined $childpid)
  {
    close $reader{$id};
    delete $reader{$id};
    next;
  }
  shift @freeslot;
  $proc{$childpid} = { jobstep => $id,
                       time => time,
                       slot => $childslot,
                       jobstepname => "$job_id.$id.$childpid",
                     };
  croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
  $slot[$childslot]->{pid} = $childpid;

  Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
  Log ($id, "child $childpid started on $childslotname");
  $Jobstep->{starttime} = time;
  $Jobstep->{node} = $childnode->{name};
  $Jobstep->{slotindex} = $childslot;
  delete $Jobstep->{stderr};
  delete $Jobstep->{finishtime};
  delete $Jobstep->{tempfail};

  $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
  $Jobstep->{'arvados_task'}->save;

  # The task just dispatched leaves the todo list; step the index back
  # so the for loop's increment lands on the element that moved up.
  splice @jobstep_todo, $todo_ptr, 1;
  --$todo_ptr;

  $progress_is_dirty = 1;

  # Stay here while no slot is free, or while some of this round's slots
  # are busy and $todo_ptr has run past the end of the todo list.
  while (!@freeslot
         ||
         ($round_num_freeslots > @freeslot && $todo_ptr+1 > $#jobstep_todo))
  {
    last THISROUND if $main::please_freeze;
    if ($main::please_info)
    {
      $main::please_info = 0;
      freeze();
      create_output_collection();
      save_meta(1);
      update_progress_stats();
    }
    my $gotsome
        = readfrompipes ()
        + reapchildren ();
    if (!$gotsome || ($latest_refresh + 2 < scalar time))
    {
      check_refresh_wanted();
      check_squeue();
      update_progress_stats();
    }
    elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
    {
      update_progress_stats();
    }
    if (!$gotsome) {
      select (undef, undef, undef, 0.1);
    }
    $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
                                        $_->{node}->{hold_count} < 4 } @slot);
    if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
        ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
    {
      my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
          .($thisround_failed+$thisround_succeeded)
          .") -- giving up on this round";
      Log (undef, $message);
      last THISROUND;
    }

    # move slots from freeslot to holdslot (or back to freeslot) if necessary
    for (my $i=$#freeslot; $i>=0; $i--) {
      if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
        push @holdslot, (splice @freeslot, $i, 1);
      }
    }
    for (my $i=$#holdslot; $i>=0; $i--) {
      if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
        push @freeslot, (splice @holdslot, $i, 1);
      }
    }

    # give up if no nodes are succeeding
    if ($working_slot_count < 1) {
      Log(undef, "Every node has failed -- giving up");
      last THISROUND;
    }
  }
}


push @freeslot, splice @holdslot;
map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);


Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
while (%proc)
{
  if ($main::please_continue) {
    $main::please_continue = 0;
    goto THISROUND;
  }
  $main::please_info = 0, freeze(), create_output_collection(), save_meta(1) if $main::please_info;
  readfrompipes ();
  if (!reapchildren())
  {
    check_refresh_wanted();
    check_squeue();
    update_progress_stats();
    select (undef, undef, undef, 0.1);
    killem (keys %proc) if $main::please_freeze;
  }
}

update_progress_stats();
freeze_if_want_freeze();


# Decide the outcome of this round if nothing has settled it yet:
# success when nothing is left to do, a retryable exit when no slot is
# usable, failure when no task succeeded and either none or more than
# four failed. Otherwise $main::success stays undefined and we loop.
if (!defined $main::success)
{
  if (!@jobstep_todo) {
    $main::success = 1;
  } elsif ($working_slot_count < 1) {
    save_output_collection();
    save_meta();
    exit(EX_RETRY_UNLOCKED);
  } elsif ($thisround_succeeded == 0 &&
           ($thisround_failed == 0 || $thisround_failed > 4)) {
    my $message = "stop because $thisround_failed tasks failed and none succeeded";
    Log (undef, $message);
    $main::success = 0;
  }
}

goto ONELEVEL if !defined $main::success;


release_allocation();
freeze();
my $collated_output = save_output_collection();
Log (undef, "finish");

save_meta();

# The job is 'Complete' only if the tasks succeeded and an output
# collection was saved; the process exit status mirrors that.
my $final_state;
if ($collated_output && $main::success) {
  $final_state = 'Complete';
} else {
  $final_state = 'Failed';
}
$Job->update_attributes('state' => $final_state);

exit (($final_state eq 'Complete') ?
0 : 1);


# Publish the current task counts to the Job record and log them.
#
# Always records the time of the call in $progress_stats_updated.  Does
# nothing else unless $progress_is_dirty is set; otherwise stores the
# todo/done/running counts in the job's tasks_summary attribute, logs a
# one-line status message, and clears $progress_is_dirty.
sub update_progress_stats
{
  $progress_stats_updated = time;
  return if !$progress_is_dirty;
  my ($todo, $done, $running) = (scalar @jobstep_todo,
                                 scalar @jobstep_done,
                                 scalar keys(%proc));
  $Job->{'tasks_summary'} ||= {};
  $Job->{'tasks_summary'}->{'todo'} = $todo;
  $Job->{'tasks_summary'}->{'done'} = $done;
  $Job->{'tasks_summary'}->{'running'} = $running;
  $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
  Log (undef, "status: $done done, $running running, $todo todo");
  $progress_is_dirty = 0;
}


# Reap at most one exited child process without blocking.
#
# Returns 0 if no child was waiting to be reaped, 1 otherwise.  For a
# child that belongs to a task (i.e., is listed in %proc) this records
# the outcome on the jobstep and its arvados_task record, updates the
# node's failure bookkeeping, frees the slot, and -- if the task
# succeeded -- queues any new tasks that the task created.
sub reapchildren
{
  my $pid = waitpid (-1, WNOHANG);
  return 0 if $pid <= 0;

  # Capture the wait status immediately, before anything else has a
  # chance to run a subprocess and overwrite $?.
  my $childstatus = $?;

  if (!exists $proc{$pid}) {
    # waitpid(-1) can also hand us children that are not task processes.
    # Looking them up in %proc would autovivify an empty entry and
    # charge the exit to slot 0 / jobstep 0, so just report and move on.
    Log (undef, "reaped child $pid which is not a task process (exit "
         . exit_status_s($childstatus) . "); ignoring");
    return 1;
  }

  my $slotid = $proc{$pid}->{slot};
  my $node = ($slot[$slotid]->{node} ||= {});
  my $whatslot = ($node->{name} . "." . $slot[$slotid]->{cpu});
  my $jobstepid = $proc{$pid}->{jobstep};
  my $elapsed = time - $proc{$pid}->{time};
  my $Jobstep = $jobstep[$jobstepid];

  my $exitvalue = $childstatus >> 8;
  my $exitinfo = "exit ".exit_status_s($childstatus);
  $Jobstep->{'arvados_task'}->reload;
  my $task_success = $Jobstep->{'arvados_task'}->{success};

  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");

  if (!defined $task_success) {
    # task did not indicate one way or the other --> fail
    Log($jobstepid, sprintf(
          "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
          exit_status_s($childstatus)));
    $Jobstep->{'arvados_task'}->{success} = 0;
    $Jobstep->{'arvados_task'}->save;
    $task_success = 0;
  }

  if (!$task_success)
  {
    my $temporary_fail;
    $temporary_fail ||= $Jobstep->{tempfail};
    $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);

    ++$thisround_failed;
    ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;

    # Check for signs of a failed or misconfigured node
    if (++$node->{losing_streak} >= 2 + $node->{ncpus}) {
      # Don't count this against jobstep failure thresholds if this
      # node is already suspected faulty and srun exited quickly
      if ($node->{hold_until} && $elapsed < 5) {
        Log ($jobstepid, "blaming failure on suspect node " . $node->{name});
        $temporary_fail ||= 1;
      }
      ban_node_by_slot($slotid);
    }

    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
                             ++$Jobstep->{'failures'},
                             $temporary_fail ? 'temporary' : 'permanent',
                             $elapsed));

    if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
      # Give up on this task, and the whole job
      $main::success = 0;
    }
    # Put this task back on the todo queue
    push @jobstep_todo, $jobstepid;
    $Job->{'tasks_summary'}->{'failed'}++;
  }
  else
  {
    ++$thisround_succeeded;
    # A success clears any suspicion about the node.
    $node->{losing_streak} = 0;
    $node->{hold_until} = 0;
    $node->{fail_count} = 0;
    push @jobstep_done, $jobstepid;
    Log ($jobstepid, "success in $elapsed seconds");
  }
  $Jobstep->{exitcode} = $childstatus;
  $Jobstep->{finishtime} = time;
  $Jobstep->{'arvados_task'}->{finished_at} =
      strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
  $Jobstep->{'arvados_task'}->save;
  process_stderr ($jobstepid, $task_success);
  Log ($jobstepid, sprintf("task output (%d bytes): %s",
                           length($Jobstep->{'arvados_task'}->{output}),
                           $Jobstep->{'arvados_task'}->{output}));

  # Release the stderr pipe and the slot this child was using.
  close $reader{$jobstepid};
  delete $reader{$jobstepid};
  delete $slot[$slotid]->{pid};
  push @freeslot, $slotid;
  delete $proc{$pid};

  if ($task_success) {
    # Load new tasks
    my $newtask_list = [];
    my $newtask_results;
    do {
      # Page through the results, using the number of items fetched so
      # far as the offset, until a page comes back empty.
      $newtask_results = api_call(
        "job_tasks/list",
        'where' => {
          'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
        },
        'order' => 'qsequence',
        'offset' => scalar(@$newtask_list),
      );
      push(@$newtask_list, @{$newtask_results->{items}});
    } while (@{$newtask_results->{items}});
    foreach my $arvados_task (@$newtask_list) {
      my $jobstep = {
        'level' => $arvados_task->{'sequence'},
        'failures' => 0,
        'arvados_task' => $arvados_task
      };
      push @jobstep, $jobstep;
      push @jobstep_todo, $#jobstep;
    }
  }

  $progress_is_dirty = 1;
  1;
}
# If the refresh trigger file ($ENV{CRUNCH_REFRESH_TRIGGER}) has been
# modified since the last refresh, re-read the job's cancellation
# attributes and state from the API server.  If the job is no longer
# "Running", mark the job unsuccessful and request a freeze.
sub check_refresh_wanted
{
  my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
  # $stat[9] is the file's mtime.
  if (@stat && $stat[9] > $latest_refresh) {
    $latest_refresh = scalar time;
    my $Job2 = api_call("jobs/get", uuid => $jobspec);
    for my $attr ('cancelled_at',
                  'cancelled_by_user_uuid',
                  'cancelled_by_client_uuid',
                  'state') {
      $Job->{$attr} = $Job2->{$attr};
    }
    if ($Job->{'state'} ne "Running") {
      if ($Job->{'state'} eq "Cancelled") {
        Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
      } else {
        Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
      }
      $main::success = 0;
      $main::please_freeze = 1;
    }
  }
}


# Detect srun children whose task has disappeared from the slurm queue
# and kill them once their grace period has expired.  Rate-limited to
# one check every 15 seconds.
sub check_squeue
{
  my $last_squeue_check = $squeue_checked;

  # Do not call `squeue` or check the kill list more than once every
  # 15 seconds.
  return if $last_squeue_check > time - 15;
  $squeue_checked = time;

  # Look for children from which we haven't received stderr data since
  # the last squeue check. If no such children exist, all procs are
  # alive and there's no need to even look at squeue.
  #
  # As long as the crunchstat poll interval (10s) is shorter than the
  # squeue check interval (15s) this should make the squeue check an
  # infrequent event.
  my $silent_procs = 0;
  for my $procinfo (values %proc)
  {
    my $jobstep = $jobstep[$procinfo->{jobstep}];
    if ($jobstep->{stderr_at} < $last_squeue_check)
    {
      $silent_procs++;
    }
  }
  return if $silent_procs == 0;

  # use killem() on procs whose killtime is reached
  while (my ($pid, $procinfo) = each %proc)
  {
    my $jobstep = $jobstep[$procinfo->{jobstep}];
    if (exists $procinfo->{killtime}
        && $procinfo->{killtime} <= time
        && $jobstep->{stderr_at} < $last_squeue_check)
    {
      my $sincewhen = "";
      if ($jobstep->{stderr_at}) {
        $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
      }
      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
      killem ($pid);
    }
  }

  if (!$have_slurm)
  {
    # here is an opportunity to check for mysterious problems with local procs
    return;
  }

  # Get a list of steps still running.  Note: squeue(1) says --steps
  # selects a format (which we override anyway) and allows us to
  # specify which steps we're interested in (which we don't).
  # Importantly, it also changes the meaning of %j from "job name" to
  # "step name" and (although this isn't mentioned explicitly in the
  # docs) switches from "one line per job" mode to "one line per step"
  # mode. Without it, we'd just get a list of one job, instead of a
  # list of N steps.
  my @squeue = `squeue --jobs=\Q$ENV{SLURM_JOB_ID}\E --steps --format='%j' --noheader`;
  if ($? != 0)
  {
    Log(undef, "warning: squeue exit status $? ($!)");
    return;
  }
  # chomp (not chop): strip only a trailing newline, so a final line
  # without one does not lose the last character of the step name.
  chomp @squeue;

  # which of my jobsteps are running, according to squeue?
  my %ok;
  for my $jobstepname (@squeue)
  {
    $ok{$jobstepname} = 1;
  }

  # Check for child procs >60s old and not mentioned by squeue.
  while (my ($pid, $procinfo) = each %proc)
  {
    if ($procinfo->{time} < time - 60
        && $procinfo->{jobstepname}
        && !exists $ok{$procinfo->{jobstepname}}
        && !exists $procinfo->{killtime})
    {
      # According to slurm, this task has ended (successfully or not)
      # -- but our srun child hasn't exited. First we must wait (30
      # seconds) in case this is just a race between communication
      # channels. Then, if our srun child process still hasn't
      # terminated, we'll conclude some slurm communication
      # error/delay has caused the task to die without notifying srun,
      # and we'll kill srun ourselves.
      $procinfo->{killtime} = time + 30;
      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
    }
  }
}


# When running under slurm, cancel our own job allocation.
sub release_allocation
{
  if ($have_slurm)
  {
    Log (undef, "release job allocation");
    # List form: run scancel directly instead of through a shell.
    system ("scancel", $ENV{SLURM_JOB_ID});
  }
}


# Do one sysread on each task's stderr pipe and append whatever arrives
# to that jobstep's stderr buffer, logging complete lines as they
# appear.  Returns 1 if any pipe yielded data, 0 otherwise.
sub readfrompipes
{
  my $gotsome = 0;
  foreach my $job (keys %reader)
  {
    my $buf;
    if (0 < sysread ($reader{$job}, $buf, 65536))
    {
      print STDERR $buf if $ENV{CRUNCH_DEBUG};
      $jobstep[$job]->{stderr_at} = time;
      $jobstep[$job]->{stderr} .= $buf;

      # Consume everything up to the last \n
      preprocess_stderr ($job);

      if (length ($jobstep[$job]->{stderr}) > 16384)
      {
        # If we get a lot of stderr without a newline, chop off the
        # front to avoid letting our buffer grow indefinitely.
        substr ($jobstep[$job]->{stderr},
                0, length($jobstep[$job]->{stderr}) - 8192) = "";
      }
      $gotsome = 1;
    }
  }
  return $gotsome;
}


# Remove each complete line from the jobstep's stderr buffer, log it,
# and react to known srun / Keep error messages by requesting a freeze,
# flagging the task as a temporary failure, and/or backing off the node.
sub preprocess_stderr
{
  my $job = shift;

  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
    my $line = $1;
    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
    Log ($job, "stderr $line");
    if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
      # whoa.
      $main::please_freeze = 1;
    }
    elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
      my $job_slot_index = $jobstep[$job]->{slotindex};
      $slot[$job_slot_index]->{node}->{fail_count}++;
      $jobstep[$job]->{tempfail} = 1;
      ban_node_by_slot($job_slot_index);
    }
    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
      $jobstep[$job]->{tempfail} = 1;
      ban_node_by_slot($jobstep[$job]->{slotindex});
    }
    elsif ($line =~ /\bKeep(Read|Write|Request)Error:/) {
      $jobstep[$job]->{tempfail} = 1;
    }
  }
}


# Flush a finished task's stderr buffer to the log: complete lines via
# preprocess_stderr, then whatever unterminated text remains.  The
# second argument ($task_success) is accepted but not used.
sub process_stderr
{
  my $job = shift;
  my $task_success = shift;
  preprocess_stderr ($job);

  for my $line (split ("\n", $jobstep[$job]->{stderr})) {
    Log ($job, "stderr $line");
  }
}


# Fetch the data for the given locator by running arv-get, and return
# it as a string.  Returns undef if arv-get cannot be started, a read
# fails, or arv-get exits non-zero.
sub fetch_block
{
  my $hash = shift;
  my $keep;
  if (!open($keep, "-|", "arv-get", "--retries", retry_count(), $hash)) {
    Log(undef, "fetch_block run error from arv-get $hash: $!");
    return undef;
  }
  my $output_block = "";
  while (1) {
    my $buf;
    my $bytes = sysread($keep, $buf, 1024 * 1024);
    if (!defined $bytes) {
      Log(undef, "fetch_block read error from arv-get: $!");
      $output_block = undef;
      last;
    } elsif ($bytes == 0) {
      # sysread returns 0 at the end of the pipe.
      last;
    } else {
      # some bytes were read into buf.
      $output_block .= $buf;
    }
  }
  close $keep;
  if ($?) {
    Log(undef, "fetch_block arv-get exited " . exit_status_s($?));
    $output_block = undef;
  }
  return $output_block;
}


# Create a collection by concatenating the output of all tasks (each
# task's output is either a manifest fragment, a locator for a
# manifest fragment stored in Keep, or nothing at all). Return the
# portable_data_hash of the new collection.
#
# Returns undef if the helper process exits non-zero or does not
# produce output within 120 seconds.
sub create_output_collection
{
  Log (undef, "collate");

  # The helper reads the whole manifest on stdin, creates the
  # collection, and prints its portable_data_hash.
  my ($child_out, $child_in);
  my $pid = open2($child_out, $child_in, 'python', '-c', q{
import arvados
import sys
print (arvados.api("v1").collections().
       create(body={"manifest_text": sys.stdin.read()}).
       execute(num_retries=int(sys.argv[1]))["portable_data_hash"])
}, retry_count());

  my $task_idx = -1;
  my $manifest_size = 0;
  for my $Jobstep (@jobstep)
  {
    ++$task_idx;
    my $output = $Jobstep->{'arvados_task'}->{output};
    next if (!defined($output));
    my $next_write;
    if ($output =~ /^[0-9a-f]{32}(\+\S+)*$/) {
      # The output is a locator: the manifest fragment is in Keep.
      $next_write = fetch_block($output);
    } else {
      $next_write = $output;
    }
    if (defined($next_write)) {
      if (!defined(syswrite($child_in, $next_write))) {
        # There's been an error writing.  Stop the loop.
        # We'll log details about the exit code later.
        last;
      } else {
        $manifest_size += length($next_write);
      }
    } else {
      my $uuid = $Jobstep->{'arvados_task'}->{'uuid'};
      Log (undef, "Error retrieving '$output' output by task $task_idx ($uuid)");
      $main::success = 0;
    }
  }
  close($child_in);
  Log(undef, "collated output manifest text to send to API server is $manifest_size bytes with access tokens");

  my $joboutput;
  my $s = IO::Select->new($child_out);
  if ($s->can_read(120)) {
    sysread($child_out, $joboutput, 1024 * 1024);
    waitpid($pid, 0);
    if ($?) {
      Log(undef, "output collection creation exited " . exit_status_s($?));
      $joboutput = undef;
    }
    else {
      chomp($joboutput);
    }
  } else {
    Log (undef, "timed out while creating output collection");
    foreach my $signal (2, 2, 2, 15, 15, 9) {
      kill($signal, $pid);
      # waitpid returns 0 only while the child is still running.  Stop
      # as soon as it has been reaped (returns the pid) or is gone
      # (returns -1), so we never signal a pid we no longer own.
      last if waitpid($pid, WNOHANG) != 0;
      sleep(1);
    }
  }
  close($child_out);

  return $joboutput;
}

# Calls create_output_collection, logs the result, and returns it.
# If that was successful, save that as the output in the job record.
sub save_output_collection {
  my $collated_output = create_output_collection();

  if (!$collated_output) {
    Log(undef, "Failed to write output collection");
  }
  else {
    Log(undef, "job output $collated_output");
    $Job->update_attributes('output' => $collated_output);
  }
  return $collated_output;
}


# Signal each of the given pids, escalating over repeated calls:
# SIGINT first, SIGTERM once SIGINT was sent more than 4 seconds ago,
# SIGKILL once SIGTERM was sent more than 4 seconds ago.  Each signal
# is sent at most once per process (SIGINT is delivered twice in that
# one go); the send time is recorded in $proc{$pid}->{"sent_$sig"}.
sub killem
{
  foreach my $pid (@_)
  {
    my $procinfo = ($proc{$pid} ||= {});
    # Take one time snapshot and nest the checks, so that SIGKILL is
    # only considered after SIGTERM has actually been sent.  Testing
    # "sent_$sig" twice in a row could otherwise skip SIGTERM if the
    # clock ticked between the two tests.
    my $now = time;
    my $sig = 2;		# SIGINT first
    if (exists $procinfo->{"sent_2"} &&
        $now - $procinfo->{"sent_2"} > 4)
    {
      $sig = 15;		# SIGTERM if SIGINT doesn't work
      if (exists $procinfo->{"sent_15"} &&
          $now - $procinfo->{"sent_15"} > 4)
      {
        $sig = 9;		# SIGKILL if SIGTERM doesn't work
      }
    }
    if (!exists $procinfo->{"sent_$sig"})
    {
      Log ($procinfo->{jobstep}, "sending 2x signal $sig to pid $pid");
      kill $sig, $pid;
      select (undef, undef, undef, 0.1);
      if ($sig == 2)
      {
        kill $sig, $pid;	   # srun wants two SIGINT to really interrupt
      }
      $procinfo->{"sent_$sig"} = time;
      $procinfo->{"killedafter"} = time - $procinfo->{"time"};
    }
  }
}


# Build a bit vector with one bit set for the file descriptor number
# of each of the given filehandles.
sub fhbits
{
  my($bits);
  for (@_) {
    vec($bits,fileno($_),1) = 1;
  }
  $bits;
}


# Send log output to Keep via arv-put.
#
# $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
# $log_pipe_out_buf is a string containing all output read from arv-put so far.
# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
# $log_pipe_pid is the pid of the arv-put subprocess.
#
# The only functions that should access these variables directly are:
#
# log_writer_start($logfilename)
#   Starts an arv-put pipe, reading data on stdin and writing it to
#   a $logfilename file in an output collection.
#
# log_writer_read_output([$timeout])
#   Read output from $log_pipe_out and append it to $log_pipe_out_buf.
#   Passes $timeout to the select() call, with a default of 0.01.
#   Returns the result of the last read() call on $log_pipe_out, or
#   -1 if read() wasn't called because select() timed out.
#   Only other log_writer_* functions should need to call this.
#
# log_writer_send($txt)
#   Writes $txt to the output log collection.
#
# log_writer_finish()
#   Closes the arv-put pipe and returns the output that it produces.
#
# log_writer_is_active()
#   Returns a true value if there is currently a live arv-put
#   process, false otherwise.
#
my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
    $log_pipe_pid);

# Launch arv-put and set up the buffer and selector used to collect
# what it prints.
sub log_writer_start($)
{
  my ($logfilename) = @_;
  my @arv_put_cmd = ('arv-put',
                     '--stream',
                     '--retries', '3',
                     '--filename', $logfilename,
                     '-');
  $log_pipe_pid = open2($log_pipe_out, $log_pipe_in, @arv_put_cmd);
  $log_pipe_out_buf = "";
  $log_pipe_out_select = IO::Select->new($log_pipe_out);
}

# Drain whatever arv-put has printed so far into $log_pipe_out_buf.
# Keeps reading while the handle is readable within the timeout and the
# previous read returned a true value.
sub log_writer_read_output {
  my $timeout = shift || 0.01;
  my $last_read = -1;
  while ($last_read && $log_pipe_out_select->can_read($timeout)) {
    # Append at the current end of the buffer.
    my $append_at = length($log_pipe_out_buf);
    $last_read = read($log_pipe_out, $log_pipe_out_buf, 65536, $append_at);
  }
  if (!defined($last_read)) {
    Log(undef, "error reading log manifest from arv-put: $!");
  }
  return $last_read;
}

# Feed text to arv-put, then pick up any output it has produced.
sub log_writer_send($) {
  my ($txt) = @_;
  print $log_pipe_in $txt;
  log_writer_read_output();
}

# Close arv-put's stdin, read its remaining output (waiting up to 120
# seconds), and reap it.  Returns the collected output, or undef if the
# read timed out, stopped before EOF, or arv-put exited non-zero.
# Returns nothing if no arv-put process is active.
sub log_writer_finish()
{
  return unless $log_pipe_pid;

  close($log_pipe_in);

  my $logger_failed = 0;
  my $read_result = log_writer_read_output(120);
  if ($read_result == -1) {
    $logger_failed = -1;
    Log (undef, "timed out reading from 'arv-put'");
  } elsif ($read_result != 0) {
    $logger_failed = -2;
    Log(undef, "failed to read arv-put log manifest to EOF");
  }

  waitpid($log_pipe_pid, 0);
  if ($?) {
    $logger_failed ||= $?;
    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
  }

  close($log_pipe_out);
  my $arv_put_output = $logger_failed ? undef : $log_pipe_out_buf;

  # Reset all state so that log_writer_is_active() reports false.
  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
    $log_pipe_out_select = undef;

  return $arv_put_output;
}

sub log_writer_is_active() {
  return $log_pipe_pid;
}

# Write one log line per line of $logmessage to STDERR and, when the
# log writer is active, to the log collection.  Each line is prefixed
# with the job id, our pid and the jobstep id; characters outside the
# range space..'~' are written as backslash-octal escapes.
sub Log				# ($jobstep_id, $logmessage)
{
  my ($jobstep_id, $logmessage) = @_;

  # Multi-line messages are logged one line at a time.
  if ($logmessage =~ /\n/) {
    for my $line (split (/\n/, $logmessage)) {
      Log ($jobstep_id, $line);
    }
    return;
  }

  # Make STDERR unbuffered without changing the selected handle.
  my $fh = select STDERR; $|=1; select $fh;

  my $message = sprintf ("%s %d %s %s", $job_id, $$, $jobstep_id, $logmessage);
  $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
  $message .= "\n";

  my $stderr_is_tty = -t STDERR;
  my $datetime;
  if (log_writer_is_active() || $stderr_is_tty) {
    my @gmtime = gmtime;
    $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
			 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
  }
  # The timestamp is only shown on STDERR when it is a terminal.
  print STDERR ($stderr_is_tty ? ($datetime." ".$message) : $message);

  if (log_writer_is_active()) {
    log_writer_send($datetime . " " . $message);
  }
}


# Fatal-error exit path: log the message with the caller's file and
# line, collate output if tasks are still queued, mark the job
# finished/failed, save the log, and die.
sub croak
{
  my ($package, $file, $line) = caller;
  my $message = "@_ at $file line $line\n";
  Log (undef, $message);
  if (@jobstep_todo) {
    freeze();
    create_output_collection();
  }
  cleanup();
  save_meta();
  die;
}


# Record the end of the job on the Job record: set finished_at if the
# job was cancelled, otherwise set its state to Failed.
sub cleanup
{
  return unless $Job;
  if ($Job->{'state'} eq 'Cancelled') {
    $Job->update_attributes('finished_at' => scalar gmtime);
  } else {
    $Job->update_attributes('state' => 'Failed');
  }
}


# Finish the log writer and store the resulting manifest (appended to
# the job's previous log collection, if any) as a new collection whose
# portable_data_hash becomes the job's 'log' attribute.
sub save_meta
{
  my $justcheckpoint = shift; # false if this will be the last meta saved
  return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
  return unless log_writer_is_active();

  my $log_manifest = log_writer_finish();
  return unless defined($log_manifest);

  if ($Job->{log}) {
    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
    $log_manifest = $prev_log_coll->{manifest_text} . $log_manifest;
  }

  my %new_collection = (
    manifest_text => $log_manifest,
    owner_uuid => $Job->{owner_uuid},
    name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
  );
  my $log_coll = api_call(
    "collections/create", ensure_unique_name => 1,
    collection => \%new_collection);
  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
}


# If a freeze has been requested, release the allocation, kill and reap
# the given pids (if any), collate output, finalize the job record and
# log, and exit 1.  Otherwise do nothing.
sub freeze_if_want_freeze
{
  return if !$main::please_freeze;

  release_allocation();
  if (@_)
  {
    # kill some srun procs before freeze+stop
    for my $pid (@_) {
      $proc{$pid} = {};
    }
    while (%proc)
    {
      killem (keys %proc);
      select (undef, undef, undef, 0.1);
      my $died;
      while (($died = waitpid (-1, WNOHANG)) > 0)
      {
        delete $proc{$died};
      }
    }
  }
  freeze();
  create_output_collection();
  cleanup();
  save_meta();
  exit 1;
}


sub freeze
{
  Log (undef, "Freeze not implemented");
  return;
}


sub thaw
{
  croak ("Thaw not implemented");
}


# Escape backslashes and newlines so that the string fits on one line.
sub freezequote
{
  my ($s) = @_;
  $s =~ s/\\/\\\\/g;
  $s =~ s/\n/\\n/g;
  return $s;
}


# Inverse of freezequote: "\n" becomes a newline, and any other
# backslash-escaped character stands for itself.
sub freezeunquote
{
  my ($s) = @_;
  $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
  return $s;
}


# Run a command, prefixed with the srun arguments when slurm is in use.
#
# With $opts->{fork} set, runs the command with system() and returns
# its result.  Otherwise replaces the current process with exec().  If
# $stdin is defined, a forked child feeds that string to the command's
# standard input.
sub srun
{
  my ($srunargs, $execargs) = (shift, shift);
  my $opts = shift || {};
  my $stdin = shift;
  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;

  # Render the command on one line for logging, masking token values.
  $Data::Dumper::Terse = 1;
  $Data::Dumper::Indent = 0;
  my $show_cmd = Dumper($args);
  $show_cmd =~ s/(TOKEN\\*=)[^\s\']+/${1}[...]/g;
  $show_cmd =~ s/\n/ /g;
  if ($opts->{fork}) {
    Log(undef, "starting: $show_cmd");
  } else {
    # This is a child process: parent is in charge of reading our
    # stderr and copying it to Log() if needed.
    warn "starting: $show_cmd\n";
  }

  if (defined $stdin) {
    # Fork a child whose stdout is connected to our STDIN.
    my $child = open STDIN, "-|";
    defined $child or die "no fork: $!";
    if ($child == 0) {
      print $stdin or die $!;
      close STDOUT or die $!;
      exit 0;
    }
  }

  return system (@$args) if $opts->{fork};

  exec @$args;
  # Only reached if exec failed.
  warn "ENV size is ".length(join(" ",%ENV));
  die "exec failed: $!: @$args";
}


sub ban_node_by_slot {
  # Don't start any new jobsteps on this node for 60 seconds
  my ($slotid) = @_;
  my $node = $slot[$slotid]->{node};
  $node->{hold_until} = 60 + scalar time;
  $node->{hold_count}++;
  Log (undef, "backing off node " . $node->{name} . " for 60 seconds");
}

# Take an exclusive, non-blocking lock on $lockfile, or croak.  The
# lock is held on the global filehandle L.
sub must_lock_now
{
  my ($lockfile, $error_message) = @_;
  open L, ">", $lockfile or croak("$lockfile: $!");
  if (!flock L, LOCK_EX|LOCK_NB) {
    croak("Can't lock $lockfile: $error_message\n");
  }
}

sub find_docker_image {
  # Given a Keep locator, check to see if it contains a Docker image.
  # If so, return its stream name and Docker hash.
  # If not, return undef for both values.
  my ($locator) = @_;
  my ($streamname, $filename);
  my $image = api_call("collections/get", uuid => $locator);
  if ($image) {
    foreach my $line (split(/\n/, $image->{manifest_text})) {
      my @tokens = split(/\s+/, $line);
      next if (!@tokens);
      $streamname = shift(@tokens);
      # File tokens have the form "position:size:name".
      foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
        if (defined($filename)) {
          return (undef, undef);  # More than one file in the Collection.
        } else {
          $filename = (split(/:/, $filedata, 3))[2];
        }
      }
    }
  }
  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
    return ($streamname, $1);
  } else {
    return (undef, undef);
  }
}

sub retry_count {
  # Calculate the number of times an operation should be retried,
  # assuming exponential backoff, and that we're willing to retry as
  # long as tasks have been running.  Enforce a minimum of 3 retries.
  my ($starttime, $endtime, $timediff, $retries);
  if (@jobstep) {
    $starttime = $jobstep[0]->{starttime};
    $endtime = $jobstep[-1]->{finishtime};
  }
  if (!defined($starttime)) {
    $timediff = 0;
  } elsif (!defined($endtime)) {
    $timediff = time - $starttime;
  } else {
    $timediff = ($endtime - $starttime) - (time - $endtime);
  }
  if ($timediff > 0) {
    $retries = int(log($timediff) / log(2));
  } else {
    $retries = 1;  # Use the minimum.
  }
  return ($retries > 3) ? $retries : 3;
}

sub retry_op {
  # Pass in two function references.
  # The first will be called with the remaining arguments.
  # If it dies, retry it with exponential backoff until it succeeds,
  # or until the current retry_count is exhausted.  After each failure
  # that can be retried, the second function will be called with
  # the current try count (0-based), next try time, and error message.
  my $operation = shift;
  my $retry_callback = shift;
  my $retries = retry_count();
  foreach my $try_count (0..$retries) {
    my $next_try = time + (2 ** $try_count);
    my $result = eval { $operation->(@_); };
    if (!$@) {
      return $result;
    } elsif ($try_count < $retries) {
      $retry_callback->($try_count, $next_try, $@);
      my $sleep_time = $next_try - time;
      sleep($sleep_time) if ($sleep_time > 0);
    }
  }
  # Ensure the error message ends in a newline, so Perl doesn't add
  # retry_op's line number to it.
  chomp($@);
  die($@ . "\n");
}

sub api_call {
  # Pass in a /-separated API method name, and arguments for it.
  # This function will call that method, retrying as needed until
  # the current retry_count is exhausted, with a log on the first failure.
  my $method_name = shift;
  my $log_api_retry = sub {
    my ($try_count, $next_try_at, $errmsg) = @_;
    # Tidy the error: drop the "at <script> line N" suffix and
    # turn every whitespace character into a plain space.
    $errmsg =~ s/\s*\bat \Q$0\E line \d+\.?\s*//;
    $errmsg =~ s/\s/ /g;
    $errmsg =~ s/\s+$//;
    my $retry_msg;
    if ($next_try_at < time) {
      $retry_msg = "Retrying.";
    } else {
      my $next_try_fmt = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($next_try_at);
      $retry_msg = "Retrying at $next_try_fmt.";
    }
    Log(undef, "API method $method_name failed: $errmsg. $retry_msg");
  };
  # Walk down the client object one path component at a time.
  my $method = $arv;
  foreach my $key (split(/\//, $method_name)) {
    $method = $method->{$key};
  }
  return retry_op(sub { $method->execute(@_); }, $log_api_retry, @_);
}

sub exit_status_s {
  # Given a $?, return a human-readable exit code string like "0" or
  # "1" or "0 with signal 1" or "1 with signal 11".
  my ($exitcode) = @_;
  my $signal = $exitcode & 0x7f;
  my $s = $exitcode >> 8;
  if ($signal) {
    $s .= " with signal " . $signal;
  }
  if ($exitcode & 0x80) {
    $s .= " with core dump";
  }
  return $s;
}

sub handle_readall {
  # Pass in a glob reference to a file handle.
  # Read all its contents and return them as a string.
  my $fh_glob_ref = shift;
  local $/ = undef;
  return <$fh_glob_ref>;
}

# Path of the Nth saved git tar archive for this job.
sub tar_filename_n {
  my ($n) = @_;
  return sprintf("%s/git.%s.%d.tar", $ENV{CRUNCH_TMP}, $job_id, $n);
}

sub add_git_archive {
  # Pass in a git archive command as a string or list, a la system().
  # This method will save its output to be included in the archive sent to the
  # build script.
  my $git_input;
  $git_tar_count++;
  if (!open(GIT_ARCHIVE, ">", tar_filename_n($git_tar_count))) {
    croak("Failed to save git archive: $!");
  }
  # The command's stdout goes straight into the archive file.
  my $git_pid = open2(">&GIT_ARCHIVE", $git_input, @_);
  close($git_input);
  waitpid($git_pid, 0);
  close(GIT_ARCHIVE);
  if ($?) {
    croak("Failed to save git archive: git exited " . exit_status_s($?));
  }
}

sub combined_git_archive {
  # Combine all saved tar archives into a single archive, then return its
  # contents in a string.  Return undef if no archives have been saved.
  if ($git_tar_count < 1) {
    return undef;
  }
  my $base_tar_name = tar_filename_n(1);
  # Append archives 2..N onto the first one.
  foreach my $n (2..$git_tar_count) {
    my $tar_to_append = tar_filename_n($n);
    my $tar_exit = system("tar", "-Af", $base_tar_name, $tar_to_append);
    if ($tar_exit != 0) {
      croak("Error preparing build archive: tar -A exited " .
            exit_status_s($tar_exit));
    }
  }
  if (!open(GIT_TAR, "<", $base_tar_name)) {
    croak("Could not open build archive: $!");
  }
  my $tar_contents = handle_readall(\*GIT_TAR);
  close(GIT_TAR);
  return $tar_contents;
}

# Add O_NONBLOCK to the file status flags of the given handle.
sub set_nonblocking {
  my ($fh) = @_;
  my $flags = fcntl ($fh, F_GETFL, 0) or croak ($!);
  fcntl ($fh, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
}

__DATA__
#!/usr/bin/env perl
#
# This is crunch-job's internal dispatch script. crunch-job running on the API
# server invokes this script on individual compute nodes, or localhost if we're
# running a job locally. It gets called in two modes:
#
# * No arguments: Installation mode. Read a tar archive from the DATA
#   file handle; it includes the Crunch script's source code, and
#   maybe SDKs as well. Those should be installed in the proper
#   locations.
#   This runs outside of any Docker container, so don't try to
#   introspect Crunch's runtime environment.
#
# * With arguments: Crunch script run mode. This script should set up the
#   environment, then run the command specified in the arguments. This runs
#   inside any Docker container.

use Fcntl ':flock';
use File::Path qw( make_path remove_tree );
use POSIX qw(getcwd);

use constant TASK_TEMPFAIL => 111;

# Map SDK subdirectories to the path environments they belong to.
my %SDK_ENVVARS = ("perl/lib" => "PERLLIB", "ruby/lib" => "RUBYLIB");

# All configuration comes from the environment.
my $destdir = $ENV{"CRUNCH_SRC"};
my $archive_hash = $ENV{"CRUNCH_GIT_ARCHIVE_HASH"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $install_dir = $ENV{"CRUNCH_INSTALL"} || (getcwd() . "/opt");
my $job_work = $ENV{"JOB_WORK"};
my $task_work = $ENV{"TASK_WORK"};

# Keep duplicates of the original stdout/stderr: installation mode
# redirects both later on.
open(STDOUT_ORIG, ">&", STDOUT);
open(STDERR_ORIG, ">&", STDERR);

# Create whichever work directories have been configured.
for my $dir (grep { $_ } ($destdir, $job_work, $task_work)) {
  make_path $dir;
  -e $dir or die "Failed to create temporary directory ($dir): $!";
}

# Start every task with an empty task work directory.
if ($task_work) {
  remove_tree($task_work, {keep_root => 1});
}

### Crunch script run mode
if (@ARGV) {
  # We want to do routine logging during task 0 only.  This gives the user
  # the information they need, but avoids repeating the information for every
  # task.
  my $Log = ($ENV{TASK_SEQUENCE} eq "0")
    ? sub {
        my $msg = shift;
        printf STDERR_ORIG "[Crunch] $msg\n", @_;
      }
    : sub { };

  my $python_src = "$install_dir/python";
  my $venv_dir = "$job_work/.arvados.venv";
  my $venv_built = -e "$venv_dir/bin/activate";
  # Build the virtualenv once per job work directory, if the Python SDK
  # source was installed and virtualenv is available.
  if ((!$venv_built) and (-d $python_src) and can_run("virtualenv")) {
    shell_or_die(undef, "virtualenv", "--quiet", "--system-site-packages",
                 "--python=python2.7", $venv_dir);
    shell_or_die(TASK_TEMPFAIL, "$venv_dir/bin/pip", "--quiet", "install", "-I", $python_src);
    $venv_built = 1;
    $Log->("Built Python SDK virtualenv");
  }

  my @pysdk_version_cmd = ("python", "-c",
    "from pkg_resources import get_distribution as get; print get('arvados-python-client').version");
  if ($venv_built) {
    $Log->("Running in Python SDK virtualenv");
    @pysdk_version_cmd = ();
    # Wrap the task command in a shell that activates the virtualenv
    # first.
    my $orig_argv = join(" ", map { quotemeta($_); } @ARGV);
    @ARGV = ("/bin/sh", "-ec",
             ". \Q$venv_dir/bin/activate\E; exec $orig_argv");
  } elsif (-d $python_src) {
    $Log->("Warning: virtualenv not found inside Docker container default " .
           "\$PATH. Can't install Python SDK.");
  }

  # Without a virtualenv, report which SDK version plain python sees.
  if (@pysdk_version_cmd) {
    open(my $pysdk_version_pipe, "-|", @pysdk_version_cmd);
    my $pysdk_version = <$pysdk_version_pipe>;
    close($pysdk_version_pipe);
    if ($? == 0) {
      chomp($pysdk_version);
      $Log->("Using Arvados SDK version $pysdk_version");
    } else {
      # A lot could've gone wrong here, but pretty much all of it means that
      # Python won't be able to load the Arvados SDK.
      $Log->("Warning: Arvados SDK not found");
    }
  }

  # Prepend each installed SDK directory to its path variable.
  while (my ($sdk_dir, $sdk_envkey) = each(%SDK_ENVVARS)) {
    my $sdk_path = "$install_dir/$sdk_dir";
    next if !-d $sdk_path;
    if ($ENV{$sdk_envkey}) {
      $ENV{$sdk_envkey} = "$sdk_path:" . $ENV{$sdk_envkey};
    } else {
      $ENV{$sdk_envkey} = $sdk_path;
    }
    $Log->("Arvados SDK added to %s", $sdk_envkey);
  }

  exec(@ARGV);
  die "Cannot exec `@ARGV`: $!";
}

### Installation mode
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX;
if (readlink ("$destdir.archive_hash") eq $archive_hash && -d $destdir) {
  # This exact git archive (source + arvados sdk) is already installed
  # here, so there's no need to reinstall it.

  # We must consume our DATA section, though: otherwise the process
  # feeding it to us will get SIGPIPE.
  my $buf;
  while (read(DATA, $buf, 65536)) { }

  exit(0);
}

unlink "$destdir.archive_hash";
mkdir $destdir;

do {
  # Ignore SIGPIPE: we check retval of close() instead. See perlipc(1).
  local $SIG{PIPE} = "IGNORE";
  warn "Extracting archive: $archive_hash\n";
  # --ignore-zeros is necessary sometimes: depending on how much NUL
  # padding tar -A put on our combined archive (which in turn depends
  # on the length of the component archives) tar without
  # --ignore-zeros will exit before consuming stdin and cause close()
  # to fail on the resulting SIGPIPE.
  if (!open(TARX, "|-", "tar", "--ignore-zeros", "-xC", $destdir)) {
    die "Error launching 'tar -xC $destdir': $!";
  }
  # If we send too much data to tar in one write (> 4-5 MiB), it stops, and we
  # get SIGPIPE.  We must feed it data incrementally.
  my $tar_input;
  while (read(DATA, $tar_input, 65536)) {
    print TARX $tar_input;
  }
  if(!close(TARX)) {
    die "'tar -xC $destdir' exited $?: $!";
  }
};

mkdir $install_dir;

# Move any bundled SDKs out of the source tree into the install
# directory.
my $sdk_root = "$destdir/.arvados.sdk/sdk";
if (-d $sdk_root) {
  my @sdk_langs = ("python",
                   map { (split /\//, $_, 2)[0]; } keys(%SDK_ENVVARS));
  foreach my $sdk_lang (@sdk_langs) {
    next if !-d "$sdk_root/$sdk_lang";
    if (!rename("$sdk_root/$sdk_lang", "$install_dir/$sdk_lang")) {
      die "Failed to install $sdk_lang SDK: $!";
    }
  }
}

my $python_dir = "$install_dir/python";
if ((-d $python_dir) and can_run("python2.7")) {
  # Only stderr of the egg_info run is captured.
  open(my $egg_info_pipe, "-|",
       "python2.7 \Q$python_dir/setup.py\E egg_info 2>&1 >/dev/null");
  my @egg_info_errors = <$egg_info_pipe>;
  close($egg_info_pipe);

  if ($?) {
    if (@egg_info_errors and (($egg_info_errors[-1] =~ /\bgit\b/) or ($egg_info_errors[-1] =~ /\[Errno 2\]/))) {
      # egg_info apparently failed because it couldn't ask git for a build tag.
      # Specify no build tag.
      open(my $pysdk_cfg, ">>", "$python_dir/setup.cfg");
      print $pysdk_cfg "\n[egg_info]\ntag_build =\n";
      close($pysdk_cfg);
    } else {
      my $egg_info_exit = $? >> 8;
      foreach my $errline (@egg_info_errors) {
        warn $errline;
      }
      warn "python setup.py egg_info failed: exit $egg_info_exit";
      exit ($egg_info_exit || 1);
    }
  }
}

# Hide messages from the install script (unless it fails: shell_or_die
# will show $destdir.log in that case).
open(STDOUT, ">>", "$destdir.log");
open(STDERR, ">&", STDOUT);

# Run the first install script that is present: the one shipped in the
# extracted source tree, or one of the two in the current directory.
if (-e "$destdir/crunch_scripts/install") {
  shell_or_die (undef, "$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
  # Old version
  shell_or_die (undef, "./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
  shell_or_die (undef, "./install.sh", $install_dir);
}

if ($archive_hash) {
  # Record the installed archive hash as a symlink target.  Create the
  # link under a temporary name and rename it into place, so the marker
  # only appears once it is complete.
  unlink "$destdir.archive_hash.new";
  symlink ($archive_hash, "$destdir.archive_hash.new") or die "$destdir.archive_hash.new: $!";
  rename ("$destdir.archive_hash.new", "$destdir.archive_hash") or die "$destdir.archive_hash: $!";
}

close L;

# Return true if `which` finds the named command.
sub can_run {
  my $command_name = shift;
  open(my $which, "-|", "which", $command_name);
  # Discard the output; only the exit status matters.
  while (<$which>) { }
  close($which);
  return ($? == 0);
}

# Run a command with system().  If it fails, restore the original
# STDERR, print the install log and a diagnostic there, and exit with
# $exitcode (the first argument) if that is defined, otherwise with the
# command's own exit code (or 1 if that is 0).
sub shell_or_die
{
  my $exitcode = shift;

  if ($ENV{"DEBUG"}) {
    print STDERR "@_\n";
  }
  if (system (@_) != 0) {
    # Save these before running anything else that could change them.
    my $err = $!;
    my $code = $?;
    my $exitstatus = sprintf("exit %d signal %d", $code >> 8, $code & 0x7f);
    open(STDERR, ">&", \*STDERR_ORIG);
    # Quote the path: it comes from the environment and must reach cat
    # as a single word, whatever characters it contains.
    system ("cat \Q$destdir.log\E >&2");
    warn "@_ failed ($err): $exitstatus";
    if (defined($exitcode)) {
      exit $exitcode;
    }
    else {
      exit (($code >> 8) || 1);
    }
  }
}

__DATA__