2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
88 use constant EX_TEMPFAIL => 75;
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
134 $main::ENV{CRUNCH_DEBUG} = 1;
138 $main::ENV{CRUNCH_DEBUG} = 0;
143 my $arv = Arvados->new('apiVersion' => 'v1');
146 my $User = $arv->{'users'}->{'current'}->execute;
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway) we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'success'} ne undef) {
165 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
168 if ($Job->{'running'}) {
169 Log(undef, "Job 'running' flag is already set");
172 if ($Job->{'started_at'}) {
173 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
180 $Job = JSON::decode_json($jobspec);
184 map { croak ("No $_ specified") unless $Job->{$_} }
185 qw(script script_version script_parameters);
188 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
189 $Job->{'started_at'} = gmtime;
191 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
195 $job_id = $Job->{'uuid'};
197 my $keep_logfile = $job_id . '.log.txt';
198 $local_logfile = File::Temp->new();
200 $Job->{'runtime_constraints'} ||= {};
201 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
202 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
205 Log (undef, "check slurm allocation");
208 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
212 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
213 push @sinfo, "$localcpus localhost";
215 if (exists $ENV{SLURM_NODELIST})
217 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
221 my ($ncpus, $slurm_nodelist) = split;
222 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
225 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
228 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
231 foreach (split (",", $ranges))
244 push @nodelist, map {
246 $n =~ s/\[[-,\d]+\]/$_/;
253 push @nodelist, $nodelist;
256 foreach my $nodename (@nodelist)
258 Log (undef, "node $nodename - $ncpus slots");
259 my $node = { name => $nodename,
263 foreach my $cpu (1..$ncpus)
265 push @slot, { node => $node,
269 push @node, @nodelist;
274 # Ensure that we get one jobstep running on each allocated node before
275 # we start overloading nodes with concurrent steps
277 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
284 # Claim this job, and make sure nobody else does.
# Compare UUIDs with the string operator 'eq'.  The numeric '==' used
# previously coerces both UUID strings to 0, so the comparison always
# succeeded even when another crunch-job process held the lock, defeating
# the whole check.
285 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
286 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
287 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
290 $Job->update_attributes('started_at' => scalar gmtime,
293 'tasks_summary' => { 'failed' => 0,
300 Log (undef, "start");
301 $SIG{'INT'} = sub { $main::please_freeze = 1; };
302 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
303 $SIG{'TERM'} = \&croak;
304 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
305 $SIG{'ALRM'} = sub { $main::please_info = 1; };
306 $SIG{'CONT'} = sub { $main::please_continue = 1; };
307 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
309 $main::please_freeze = 0;
310 $main::please_info = 0;
311 $main::please_continue = 0;
312 $main::please_refresh = 0;
313 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
315 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
316 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
317 $ENV{"JOB_UUID"} = $job_id;
321 my @jobstep_todo = ();
322 my @jobstep_done = ();
323 my @jobstep_tomerge = ();
324 my $jobstep_tomerge_level = 0;
326 my $squeue_kill_checked;
327 my $output_in_keep = 0;
328 my $latest_refresh = scalar time;
332 if (defined $Job->{thawedfromkey})
334 thaw ($Job->{thawedfromkey});
338 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
339 'job_uuid' => $Job->{'uuid'},
344 push @jobstep, { 'level' => 0,
346 'arvados_task' => $first_task,
348 push @jobstep_todo, 0;
354 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
361 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
363 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
366 if (!defined $no_clear_tmp) {
367 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
368 system($clear_tmp_cmd) == 0
369 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
371 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
372 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
374 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
375 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
376 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
378 or croak ("setup.py in $src_path failed: exit ".($?>>8));
386 $build_script = <DATA>;
388 Log (undef, "Install revision ".$Job->{script_version});
389 my $nodelist = join(",", @node);
391 if (!defined $no_clear_tmp) {
392 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
394 my $cleanpid = fork();
397 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
398 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
403 last if $cleanpid == waitpid (-1, WNOHANG);
404 freeze_if_want_freeze ($cleanpid);
405 select (undef, undef, undef, 0.1);
407 Log (undef, "Clean-work-dir exited $?");
410 # Install requested code version
413 my @srunargs = ("srun",
414 "--nodelist=$nodelist",
415 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
417 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
418 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
422 my $treeish = $Job->{'script_version'};
424 # If we're running under crunch-dispatch, it will have pulled the
425 # appropriate source tree into its own repository, and given us that
426 # repo's path as $git_dir. If we're running a "local" job, and a
427 # script_version was specified, it's up to the user to provide the
428 # full path to a local repository in Job->{repository}.
430 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
431 # git-archive --remote where appropriate.
433 # TODO: Accept a locally-hosted Arvados repository by name or
434 # UUID. Use arvados.v1.repositories.list or .get to figure out the
435 # appropriate fetch-url.
436 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
438 $ENV{"CRUNCH_SRC_URL"} = $repo;
440 if (-d "$repo/.git") {
441 # We were given a working directory, but we are only interested in
443 $repo = "$repo/.git";
446 # If this looks like a subversion r#, look for it in git-svn commit messages
448 if ($treeish =~ m{^\d{1,4}$}) {
449 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
451 Log(undef, "git Subversion search exited $?");
452 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
454 Log(undef, "Using commit $commit for Subversion revision $treeish");
458 # If that didn't work, try asking git to look it up as a tree-ish.
460 if (!defined $commit) {
461 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
463 Log(undef, "git rev-list exited $? with result '$found'");
464 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
466 Log(undef, "Using commit $commit for tree-ish $treeish");
467 if ($commit ne $treeish) {
468 # Make sure we record the real commit id in the database,
469 # frozentokey, logs, etc. -- instead of an abbreviation or a
470 # branch name which can become ambiguous or point to a
471 # different commit in the future.
472 $Job->{'script_version'} = $commit;
474 $Job->update_attributes('script_version' => $commit) or
475 croak("Error while updating job");
480 if (defined $commit) {
481 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
482 @execargs = ("sh", "-c",
483 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
484 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
485 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
488 croak ("could not figure out commit id for $treeish");
491 # Note: this section is almost certainly unnecessary if we're
492 # running tasks in docker containers.
493 my $installpid = fork();
494 if ($installpid == 0)
496 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
501 last if $installpid == waitpid (-1, WNOHANG);
502 freeze_if_want_freeze ($installpid);
503 select (undef, undef, undef, 0.1);
505 Log (undef, "Install exited $?");
510 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
511 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
514 # If this job requires a Docker image, install that.
515 my $docker_bin = "/usr/bin/docker.io";
516 my ($docker_locator, $docker_stream, $docker_hash);
517 if ($docker_locator = $Job->{docker_image_locator}) {
518 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
521 croak("No Docker image hash found from locator $docker_locator");
523 $docker_stream =~ s/^\.//;
524 my $docker_install_script = qq{
525 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
526 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
529 my $docker_pid = fork();
530 if ($docker_pid == 0)
532 srun (["srun", "--nodelist=" . join(',', @node)],
533 ["/bin/sh", "-ec", $docker_install_script]);
538 last if $docker_pid == waitpid (-1, WNOHANG);
539 freeze_if_want_freeze ($docker_pid);
540 select (undef, undef, undef, 0.1);
544 croak("Installing Docker image from $docker_locator returned exit code $?");
548 foreach (qw (script script_version script_parameters runtime_constraints))
552 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
554 foreach (split (/\n/, $Job->{knobs}))
556 Log (undef, "knob " . $_);
561 $main::success = undef;
567 my $thisround_succeeded = 0;
568 my $thisround_failed = 0;
569 my $thisround_failed_multiple = 0;
571 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
572 or $a <=> $b } @jobstep_todo;
573 my $level = $jobstep[$jobstep_todo[0]]->{level};
574 Log (undef, "start level $level");
579 my @freeslot = (0..$#slot);
582 my $progress_is_dirty = 1;
583 my $progress_stats_updated = 0;
585 update_progress_stats();
590 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
592 my $id = $jobstep_todo[$todo_ptr];
593 my $Jobstep = $jobstep[$id];
594 if ($Jobstep->{level} != $level)
599 pipe $reader{$id}, "writer" or croak ($!);
600 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
601 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
603 my $childslot = $freeslot[0];
604 my $childnode = $slot[$childslot]->{node};
605 my $childslotname = join (".",
606 $slot[$childslot]->{node}->{name},
607 $slot[$childslot]->{cpu});
608 my $childpid = fork();
611 $SIG{'INT'} = 'DEFAULT';
612 $SIG{'QUIT'} = 'DEFAULT';
613 $SIG{'TERM'} = 'DEFAULT';
615 foreach (values (%reader))
619 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
620 open(STDOUT,">&writer");
621 open(STDERR,">&writer");
626 delete $ENV{"GNUPGHOME"};
627 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
628 $ENV{"TASK_QSEQUENCE"} = $id;
629 $ENV{"TASK_SEQUENCE"} = $level;
630 $ENV{"JOB_SCRIPT"} = $Job->{script};
631 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
632 $param =~ tr/a-z/A-Z/;
633 $ENV{"JOB_PARAMETER_$param"} = $value;
635 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
636 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
637 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
638 $ENV{"HOME"} = $ENV{"TASK_WORK"};
639 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
640 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
641 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
642 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
648 "--nodelist=".$childnode->{name},
649 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
650 "--job-name=$job_id.$id.$$",
652 my $build_script_to_send = "";
654 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
655 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
656 ."&& cd $ENV{CRUNCH_TMP} ";
659 $build_script_to_send = $build_script;
663 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
666 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
667 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
668 # Dynamically configure the container to use the host system as its
669 # DNS server. Get the host's global addresses from the ip command,
670 # and turn them into docker --dns options using gawk.
672 q{$(ip -o address show scope global |
673 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
674 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
675 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
676 $command .= "--env=\QHOME=/home/crunch\E ";
677 while (my ($env_key, $env_val) = each %ENV)
679 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
680 if ($env_key eq "TASK_WORK") {
681 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
683 elsif ($env_key eq "TASK_KEEPMOUNT") {
684 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
687 $command .= "--env=\Q$env_key=$env_val\E ";
691 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
692 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
693 $command .= "\Q$docker_hash\E ";
694 $command .= "stdbuf --output=0 --error=0 ";
695 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
698 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
699 $command .= "stdbuf --output=0 --error=0 ";
700 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
703 my @execargs = ('bash', '-c', $command);
704 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
705 # exec() failed, we assume nothing happened.
706 Log(undef, "srun() failed on build script");
710 if (!defined $childpid)
717 $proc{$childpid} = { jobstep => $id,
720 jobstepname => "$job_id.$id.$childpid",
722 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
723 $slot[$childslot]->{pid} = $childpid;
725 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
726 Log ($id, "child $childpid started on $childslotname");
727 $Jobstep->{starttime} = time;
728 $Jobstep->{node} = $childnode->{name};
729 $Jobstep->{slotindex} = $childslot;
730 delete $Jobstep->{stderr};
731 delete $Jobstep->{finishtime};
733 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
734 $Jobstep->{'arvados_task'}->save;
736 splice @jobstep_todo, $todo_ptr, 1;
739 $progress_is_dirty = 1;
743 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
745 last THISROUND if $main::please_freeze;
746 if ($main::please_info)
748 $main::please_info = 0;
752 update_progress_stats();
759 check_refresh_wanted();
761 update_progress_stats();
762 select (undef, undef, undef, 0.1);
764 elsif (time - $progress_stats_updated >= 30)
766 update_progress_stats();
768 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
769 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
771 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
772 .($thisround_failed+$thisround_succeeded)
773 .") -- giving up on this round";
774 Log (undef, $message);
778 # move slots from freeslot to holdslot (or back to freeslot) if necessary
779 for (my $i=$#freeslot; $i>=0; $i--) {
780 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
781 push @holdslot, (splice @freeslot, $i, 1);
784 for (my $i=$#holdslot; $i>=0; $i--) {
785 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
786 push @freeslot, (splice @holdslot, $i, 1);
790 # give up if no nodes are succeeding
791 if (!grep { $_->{node}->{losing_streak} == 0 &&
792 $_->{node}->{hold_count} < 4 } @slot) {
793 my $message = "Every node has failed -- giving up on this round";
794 Log (undef, $message);
801 push @freeslot, splice @holdslot;
802 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
805 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
808 if ($main::please_continue) {
809 $main::please_continue = 0;
812 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
816 check_refresh_wanted();
818 update_progress_stats();
819 select (undef, undef, undef, 0.1);
820 killem (keys %proc) if $main::please_freeze;
824 update_progress_stats();
825 freeze_if_want_freeze();
828 if (!defined $main::success)
831 $thisround_succeeded == 0 &&
832 ($thisround_failed == 0 || $thisround_failed > 4))
834 my $message = "stop because $thisround_failed tasks failed and none succeeded";
835 Log (undef, $message);
844 goto ONELEVEL if !defined $main::success;
847 release_allocation();
849 my $collated_output = &collate_output();
851 if (!$collated_output) {
852 Log(undef, "output undef");
856 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
857 or die "failed to get collated manifest: $!";
858 my $orig_manifest_text = '';
859 while (my $manifest_line = <$orig_manifest>) {
860 $orig_manifest_text .= $manifest_line;
862 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
863 'manifest_text' => $orig_manifest_text,
865 Log(undef, "output uuid " . $output->{uuid});
866 Log(undef, "output hash " . $output->{portable_data_hash});
867 $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
870 Log (undef, "Failed to register output manifest: $@");
874 Log (undef, "finish");
879 $Job->update_attributes('running' => 0,
880 'success' => $collated_output && $main::success,
881 'finished_at' => scalar gmtime)
# Follow the Unix convention: exit status 0 signals success to the parent
# (crunch-dispatch or a shell), non-zero signals failure.  The previous
# "? 1 : 0" was inverted and reported every successful job as failed.
884 exit ($Job->{'success'} ? 0 : 1);
888 sub update_progress_stats
# Push the current todo/done/running task counts into the Job record and
# log a one-line status summary.  Cheap no-op when nothing has changed
# ($progress_is_dirty is cleared at the end and set by the schedulers).
890 $progress_stats_updated = time;
891 return if !$progress_is_dirty;
# "running" is inferred from slot accounting: total slots minus free and
# held slots.
892 my ($todo, $done, $running) = (scalar @jobstep_todo,
893 scalar @jobstep_done,
894 scalar @slot - scalar @freeslot - scalar @holdslot);
895 $Job->{'tasks_summary'} ||= {};
896 $Job->{'tasks_summary'}->{'todo'} = $todo;
897 $Job->{'tasks_summary'}->{'done'} = $done;
898 $Job->{'tasks_summary'}->{'running'} = $running;
# Persist the summary to the API server.
900 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
902 Log (undef, "status: $done done, $running running, $todo todo");
903 $progress_is_dirty = 0;
910 my $pid = waitpid (-1, WNOHANG);
911 return 0 if $pid <= 0;
913 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
915 . $slot[$proc{$pid}->{slot}]->{cpu});
916 my $jobstepid = $proc{$pid}->{jobstep};
917 my $elapsed = time - $proc{$pid}->{time};
918 my $Jobstep = $jobstep[$jobstepid];
920 my $childstatus = $?;
921 my $exitvalue = $childstatus >> 8;
922 my $exitinfo = sprintf("exit %d signal %d%s",
925 ($childstatus & 128 ? ' core dump' : ''));
926 $Jobstep->{'arvados_task'}->reload;
927 my $task_success = $Jobstep->{'arvados_task'}->{success};
929 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
931 if (!defined $task_success) {
932 # task did not indicate one way or the other --> fail
933 $Jobstep->{'arvados_task'}->{success} = 0;
934 $Jobstep->{'arvados_task'}->save;
941 $temporary_fail ||= $Jobstep->{node_fail};
942 $temporary_fail ||= ($exitvalue == 111);
945 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
947 # Check for signs of a failed or misconfigured node
948 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
949 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
950 # Don't count this against jobstep failure thresholds if this
951 # node is already suspected faulty and srun exited quickly
952 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
954 Log ($jobstepid, "blaming failure on suspect node " .
955 $slot[$proc{$pid}->{slot}]->{node}->{name});
956 $temporary_fail ||= 1;
958 ban_node_by_slot($proc{$pid}->{slot});
961 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
962 ++$Jobstep->{'failures'},
963 $temporary_fail ? 'temporary ' : 'permanent',
966 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
967 # Give up on this task, and the whole job
969 $main::please_freeze = 1;
972 # Put this task back on the todo queue
973 push @jobstep_todo, $jobstepid;
975 $Job->{'tasks_summary'}->{'failed'}++;
979 ++$thisround_succeeded;
980 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
981 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
982 push @jobstep_done, $jobstepid;
983 Log ($jobstepid, "success in $elapsed seconds");
985 $Jobstep->{exitcode} = $childstatus;
986 $Jobstep->{finishtime} = time;
987 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
988 $Jobstep->{'arvados_task'}->save;
989 process_stderr ($jobstepid, $task_success);
990 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
992 close $reader{$jobstepid};
993 delete $reader{$jobstepid};
994 delete $slot[$proc{$pid}->{slot}]->{pid};
995 push @freeslot, $proc{$pid}->{slot};
1000 my $newtask_list = [];
1001 my $newtask_results;
1003 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1005 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1007 'order' => 'qsequence',
1008 'offset' => scalar(@$newtask_list),
1010 push(@$newtask_list, @{$newtask_results->{items}});
1011 } while (@{$newtask_results->{items}});
1012 foreach my $arvados_task (@$newtask_list) {
1014 'level' => $arvados_task->{'sequence'},
1016 'arvados_task' => $arvados_task
1018 push @jobstep, $jobstep;
1019 push @jobstep_todo, $#jobstep;
1023 $progress_is_dirty = 1;
1027 sub check_refresh_wanted
# Re-read cancellation state from the API server when the external
# refresh-trigger file has been touched since our last refresh.
# stat[9] is the file's mtime.
1029 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1030 if (@stat && $stat[9] > $latest_refresh) {
1031 $latest_refresh = scalar time;
# Only jobs backed by a real API record can be cancelled remotely.
1032 if ($job_has_uuid) {
1033 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Copy just the cancellation attributes; leave everything else local.
1034 for my $attr ('cancelled_at',
1035 'cancelled_by_user_uuid',
1036 'cancelled_by_client_uuid') {
1037 $Job->{$attr} = $Job2->{$attr};
1039 if ($Job->{'cancelled_at'}) {
1040 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
1041 " by user " . $Job->{cancelled_by_user_uuid});
# A cancelled job is handled like a freeze request: checkpoint and stop.
1043 $main::please_freeze = 1;
1051 # return if the kill list was checked <4 seconds ago
1052 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1056 $squeue_kill_checked = time;
1058 # use killem() on procs whose killtime is reached
1061 if (exists $proc{$_}->{killtime}
1062 && $proc{$_}->{killtime} <= time)
1068 # return if the squeue was checked <60 seconds ago
1069 if (defined $squeue_checked && $squeue_checked > time - 60)
1073 $squeue_checked = time;
1077 # here is an opportunity to check for mysterious problems with local procs
1081 # get a list of steps still running
1082 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1084 if ($squeue[-1] ne "ok")
1090 # which of my jobsteps are running, according to squeue?
1094 if (/^(\d+)\.(\d+) (\S+)/)
1096 if ($1 eq $ENV{SLURM_JOBID})
1103 # which of my active child procs (>60s old) were not mentioned by squeue?
1104 foreach (keys %proc)
1106 if ($proc{$_}->{time} < time - 60
1107 && !exists $ok{$proc{$_}->{jobstepname}}
1108 && !exists $proc{$_}->{killtime})
1110 # kill this proc if it hasn't exited in 30 seconds
1111 $proc{$_}->{killtime} = time + 30;
1117 sub release_allocation
# Give the SLURM node allocation back to the scheduler by cancelling our
# own job id.  (Visible only when running under SLURM; the surrounding
# guard lines are not shown in this excerpt.)
1121 Log (undef, "release job allocation");
1122 system "scancel $ENV{SLURM_JOBID}";
1130 foreach my $job (keys %reader)
1133 while (0 < sysread ($reader{$job}, $buf, 8192))
1135 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1136 $jobstep[$job]->{stderr} .= $buf;
1137 preprocess_stderr ($job);
1138 if (length ($jobstep[$job]->{stderr}) > 16384)
1140 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1149 sub preprocess_stderr
# Consume complete lines from the jobstep's buffered stderr, log each one,
# and react to known srun error patterns (expired allocation, node
# failure).  Partial lines stay in the buffer for the next call.
1153 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line (including its trailing newline) from the buffer.
1155 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1156 Log ($job, "stderr $line");
# Losing the SLURM allocation is fatal for the whole run: checkpoint.
1157 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1159 $main::please_freeze = 1;
# A single-node failure: mark this jobstep as a node failure (so the
# retry logic treats it as temporary) and stop scheduling on that node.
1161 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1162 $jobstep[$job]->{node_fail} = 1;
1163 ban_node_by_slot($jobstep[$job]->{slotindex});
1172 my $task_success = shift;
1173 preprocess_stderr ($job);
1176 Log ($job, "stderr $_");
1177 } split ("\n", $jobstep[$job]->{stderr});
# Body of fetch_block (sub header not visible in this excerpt): stream a
# Keep block/collection via "arv-get" and return its contents as one
# scalar.  NOTE(review): $hash is interpolated with \Q...\E, so shell
# metacharacters are escaped before the pipe-open.
1183 my ($keep, $child_out, $output_block);
1185 my $cmd = "arv-get \Q$hash\E";
1186 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read in 1 MiB chunks until EOF.
1190 my $bytes = sysread($keep, $buf, 1024 * 1024);
1191 if (!defined $bytes) {
1192 die "reading from arv-get: $!";
1193 } elsif ($bytes == 0) {
1194 # sysread returns 0 at the end of the pipe.
1197 # some bytes were read into buf.
1198 $output_block .= $buf;
1202 return $output_block;
1207 Log (undef, "collate");
1209 my ($child_out, $child_in);
1210 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1211 '--retries', put_retry_count());
1215 next if (!exists $_->{'arvados_task'}->{'output'} ||
1216 !$_->{'arvados_task'}->{'success'});
1217 my $output = $_->{'arvados_task'}->{output};
1218 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1220 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1221 print $child_in $output;
1223 elsif (@jobstep == 1)
1225 $joboutput = $output;
1228 elsif (defined (my $outblock = fetch_block ($output)))
1230 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1231 print $child_in $outblock;
1235 Log (undef, "XXX fetch_block($output) failed XXX");
1241 if (!defined $joboutput) {
1242 my $s = IO::Select->new($child_out);
1243 if ($s->can_read(120)) {
1244 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1247 Log (undef, "timed out reading from 'arv-put'");
1260 my $sig = 2; # SIGINT first
1261 if (exists $proc{$_}->{"sent_$sig"} &&
1262 time - $proc{$_}->{"sent_$sig"} > 4)
1264 $sig = 15; # SIGTERM if SIGINT doesn't work
1266 if (exists $proc{$_}->{"sent_$sig"} &&
1267 time - $proc{$_}->{"sent_$sig"} > 4)
1269 $sig = 9; # SIGKILL if SIGTERM doesn't work
1271 if (!exists $proc{$_}->{"sent_$sig"})
1273 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1275 select (undef, undef, undef, 0.1);
1278 kill $sig, $_; # srun wants two SIGINT to really interrupt
1280 $proc{$_}->{"sent_$sig"} = time;
1281 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1291 vec($bits,fileno($_),1) = 1;
1297 sub Log # ($jobstep_id, $logmessage)
# Emit one log line tagged with job id and pid.  Multi-line messages are
# split and logged one line at a time (recursive call not visible in this
# excerpt).
1299 if ($_[1] =~ /\n/) {
1300 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered, then restore the previously selected handle.
1305 my $fh = select STDERR; $|=1; select $fh;
1306 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable bytes as \NNN octal so the log stays one physical
# line per message.
1307 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Prepend a UTC timestamp when writing to the logfile or an interactive
# terminal.
1310 if ($local_logfile || -t STDERR) {
1311 my @gmtime = gmtime;
1312 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1313 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1315 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# Mirror every message into the temp logfile that save_meta() uploads.
1317 if ($local_logfile) {
1318 print $local_logfile $datetime . " " . $message;
1325 my ($package, $file, $line) = caller;
1326 my $message = "@_ at $file line $line\n";
1327 Log (undef, $message);
1328 freeze() if @jobstep_todo;
1329 collate_output() if @jobstep_todo;
1331 save_meta() if $local_logfile;
1338 return if !$job_has_uuid;
1339 $Job->update_attributes('running' => 0,
1341 'finished_at' => scalar gmtime);
1347 my $justcheckpoint = shift; # false if this will be the last meta saved
1348 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1350 $local_logfile->flush;
1351 my $retry_count = put_retry_count();
1352 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1353 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1354 my $loglocator = `$cmd`;
1355 die "system $cmd failed: $?" if $?;
1358 $local_logfile = undef; # the temp file is automatically deleted
1359 Log (undef, "log manifest is $loglocator");
1360 $Job->{'log'} = $loglocator;
1361 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1365 sub freeze_if_want_freeze
# If a freeze was requested (SIGINT/SIGQUIT/cancellation), release the
# node allocation, kill the given child procs, and reap them before the
# caller checkpoints and exits.
1367 if ($main::please_freeze)
1369 release_allocation();
1372 # kill some srun procs before freeze+stop
# Register the pids passed by the caller so killem() below sees them.
1373 map { $proc{$_} = {} } @_;
1376 killem (keys %proc);
1377 select (undef, undef, undef, 0.1);
# Reap every child that has already exited; non-blocking.
1379 while (($died = waitpid (-1, WNOHANG)) > 0)
1381 delete $proc{$died};
1396 Log (undef, "Freeze not implemented");
1403 croak ("Thaw not implemented");
1419 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Body of srun() (header/closing lines not all visible in this excerpt):
# run $execargs, wrapped in $srunargs when we actually have a SLURM
# allocation; otherwise run the command locally.  Optional $stdin is fed
# to the child via a fork that prints it on our STDIN.
1426 my $srunargs = shift;
1427 my $execargs = shift;
1428 my $opts = shift || {};
# Skip the srun wrapper entirely when not running under SLURM.
1430 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Quote args containing spaces so the debug line is copy-pasteable.
1431 print STDERR (join (" ",
1432 map { / / ? "'$_'" : $_ }
1435 if $ENV{CRUNCH_DEBUG};
1437 if (defined $stdin) {
# Reopen our STDIN from a forked child that writes $stdin and exits.
1438 my $child = open STDIN, "-|";
1439 defined $child or die "no fork: $!";
1441 print $stdin or die $!;
1442 close STDOUT or die $!;
# With {fork=>1} run synchronously via system(); otherwise exec below
# replaces this process (so reaching the die means exec failed).
1447 return system (@$args) if $opts->{fork};
1450 warn "ENV size is ".length(join(" ",%ENV));
1451 die "exec failed: $!: @$args";
1455 sub ban_node_by_slot {
1456 # Don't start any new jobsteps on this node for 60 seconds
# hold_until is checked by the scheduler loop when moving slots between
# @freeslot and @holdslot; hold_count feeds the give-up heuristics.
1458 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1459 $slot[$slotid]->{node}->{hold_count}++;
1460 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Body of must_lock_now (sub header not visible in this excerpt): take an
# exclusive non-blocking flock on $lockfile, croaking with the caller's
# message if another process already holds it.
# NOTE(review): bareword global handle L and a truncating ">" open --
# works because the lock (not the content) is what matters, but a lexical
# handle would be the modern idiom; left as-is since the surrounding sub
# lines are not visible here.
1465 my ($lockfile, $error_message) = @_;
1466 open L, ">", $lockfile or croak("$lockfile: $!");
1467 if (!flock L, LOCK_EX|LOCK_NB) {
1468 croak("Can't lock $lockfile: $error_message\n");
1472 sub find_docker_image {
1473 # Given a Keep locator, check to see if it contains a Docker image.
1474 # If so, return its stream name and Docker hash.
1475 # If not, return undef for both values.
1476 my $locator = shift;
1477 my ($streamname, $filename);
1478 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
# Walk the manifest: each line is "<stream> <blocks...> <file tokens...>".
1479 foreach my $line (split(/\n/, $image->{manifest_text})) {
1480 my @tokens = split(/\s+/, $line);
1482 $streamname = shift(@tokens);
# File tokens look like "pos:size:name"; keep only the name part.
1483 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
1484 if (defined($filename)) {
1485 return (undef, undef); # More than one file in the Collection.
1487 $filename = (split(/:/, $filedata, 3))[2];
# A Docker image collection holds exactly one file named <64-hex>.tar;
# the hex part is the Docker image hash.
1492 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1493 return ($streamname, $1);
1495 return (undef, undef);
1499 sub put_retry_count {
1500 # Calculate a --retries argument for arv-put that will have it try
1501 # approximately as long as this Job has been running.
# Optional argument: the time to measure against (defaults to now).
1502 my $stoptime = shift || time;
1503 my $starttime = $jobstep[0]->{starttime};
# If the first task never started, fall back to a 1-second "runtime".
1504 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# (Loop body halving $timediff / counting retries is not visible in this
# excerpt.)
1506 while ($timediff >= 2) {
# Never retry fewer than 3 times.
1510 return ($retries > 3) ? $retries : 3;
1516 # checkout-and-build
1519 use File::Path qw( make_path );
1521 my $destdir = $ENV{"CRUNCH_SRC"};
1522 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1523 my $repo = $ENV{"CRUNCH_SRC_URL"};
1524 my $task_work = $ENV{"TASK_WORK"};
1526 for my $dir ($destdir, $task_work) {
1529 -e $dir or die "Failed to create temporary directory ($dir): $!";
1533 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1535 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1538 die "Cannot exec `@ARGV`: $!";
1544 unlink "$destdir.commit";
1545 open STDOUT, ">", "$destdir.log";
1546 open STDERR, ">&STDOUT";
1549 my @git_archive_data = <DATA>;
1550 if (@git_archive_data) {
1551 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1552 print TARX @git_archive_data;
1554 die "'tar -C $destdir -xf -' exited $?: $!";
1559 chomp ($pwd = `pwd`);
1560 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1563 for my $src_path ("$destdir/arvados/sdk/python") {
1565 shell_or_die ("virtualenv", $install_dir);
1566 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1570 if (-e "$destdir/crunch_scripts/install") {
1571 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1572 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1574 shell_or_die ("./tests/autotests.sh", $install_dir);
1575 } elsif (-e "./install.sh") {
1576 shell_or_die ("./install.sh", $install_dir);
1580 unlink "$destdir.commit.new";
1581 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1582 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1589 die "Cannot exec `@ARGV`: $!";
1596 if ($ENV{"DEBUG"}) {
1597 print STDERR "@_\n";
1600 or die "@_ failed: $! exit 0x".sprintf("%x",$?);