2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or deallocated) and attributes of the Job record that should affect
68 behavior (e.g., stop the job if its state is no longer Running, such as after it is cancelled).
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
# EX_TEMPFAIL (75) is the exit status used when this process declines to
# start a job it cannot claim, so the dispatcher does not mark it failed.
88 use constant EX_TEMPFAIL => 75;
# Scratch-area setup. TMPDIR defaults to /tmp; CRUNCH_TMP defaults to
# $TMPDIR/crunch-job, with a "-<uid>" suffix for any user other than
# "crunch" or root so unprivileged users get their own directory.
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Derived locations, exported via %ENV so shell commands and tasks can
# use them: JOB_WORK (parent of per-task work dirs), CRUNCH_INSTALL
# (install prefix for code), and the deprecated alias CRUNCH_WORK.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
# NOTE(review): mkdir's result is not checked (an already-existing
# directory is the common case).
107 mkdir ($ENV{"JOB_WORK"});
# Parse command-line options (described in the POD at the top of the file).
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
# An explicit --job-api-token overrides any ARVADOS_API_TOKEN already
# present in the environment.
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# SLURM mode requires both SLURM_JOBID and SLURM_NODELIST to be set.
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A --job value made only of lowercase letters, digits and hyphens is an
# Arvados job UUID; anything else is a local job given as JSON.
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
# NOTE(review): the condition selecting between these two CRUNCH_DEBUG
# settings is outside this excerpt.
134 $main::ENV{CRUNCH_DEBUG} = 1;
138 $main::ENV{CRUNCH_DEBUG} = 0;
# Connect to the Arvados API and identify the user running crunch-job.
143 my $arv = Arvados->new('apiVersion' => 'v1');
146 my $User = $arv->{'users'}->{'current'}->execute;
# Job-UUID mode (the branch condition is outside this excerpt): fetch the
# job record and, unless --force-unlock was given, refuse to run it when
# anything suggests it is already claimed or underway.
154 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
155 if (!$force_unlock) {
156 # If some other crunch-job process has grabbed this job (or we see
157 # other evidence that the job is already underway) we exit
158 # EX_TEMPFAIL so crunch-dispatch (our parent process) doesn't
159 # mark the job as failed.
160 if ($Job->{'is_locked_by_uuid'}) {
161 Log(undef, "Job is locked by " . $Job->{'is_locked_by_uuid'});
164 if ($Job->{'state'} ne 'Queued') {
165 Log(undef, "Job state is " . $Job->{'state'} . ", but I can only start queued jobs.");
# Any defined 'success' value (true or false) disqualifies the job. Test
# definedness directly: comparing with `ne undef` stringifies undef to ""
# and emits "uninitialized value" warnings.
168 if (defined $Job->{'success'}) {
169 Log(undef, "Job 'success' flag (" . $Job->{'success'} . ") is not null");
172 if ($Job->{'running'}) {
173 Log(undef, "Job 'running' flag is already set");
176 if ($Job->{'started_at'}) {
177 Log(undef, "Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local mode: parse the JSON job description, require the mandatory
# fields, and create a new job record locked by the current user.
184 $Job = JSON::decode_json($jobspec);
188 map { croak ("No $_ specified") unless $Job->{$_} }
189 qw(script script_version script_parameters);
192 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
193 $Job->{'started_at'} = gmtime;
195 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
# The job UUID names the log file stored in Keep. Log lines are collected
# in a local temp file first and uploaded later.
199 $job_id = $Job->{'uuid'};
201 my $keep_logfile = $job_id . '.log.txt';
202 $local_logfile = File::Temp->new();
# max_tasks_per_node == 0 means "no per-node cap"; the cap is applied
# only when the value is nonzero.
204 $Job->{'runtime_constraints'} ||= {};
205 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
206 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
# Build the list of compute slots: one entry per usable CPU on each node
# this job may run on.
209 Log (undef, "check slurm allocation");
212 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Local execution (presumably when not under SLURM; the guard is outside
# this excerpt): count "processor" entries in /proc/cpuinfo, minimum 1.
216 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
217 push @sinfo, "$localcpus localhost";
# Under SLURM, ask sinfo for "<cpus> <nodelist>" lines covering the
# allocated node list.
219 if (exists $ENV{SLURM_NODELIST})
221 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each line is "<ncpus> <nodelist>"; ncpus is capped at
# max_tasks_per_node when that constraint is nonzero.
225 my ($ncpus, $slurm_nodelist) = split;
226 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Peel entries off the comma-separated node list one at a time. An entry
# may carry a bracketed range suffix (e.g. prefix[1-3,5]), which is
# expanded into individual node names.
229 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
232 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
235 foreach (split (",", $ranges))
248 push @nodelist, map {
250 $n =~ s/\[[-,\d]+\]/$_/;
257 push @nodelist, $nodelist;
# Create one node record per host and one slot per CPU on that host.
260 foreach my $nodename (@nodelist)
262 Log (undef, "node $nodename - $ncpus slots");
263 my $node = { name => $nodename,
267 foreach my $cpu (1..$ncpus)
269 push @slot, { node => $node,
273 push @node, @nodelist;
278 # Ensure that we get one jobstep running on each allocated node before
279 # we start overloading nodes with concurrent steps
281 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
288 # Claim this job, and make sure nobody else does
# UUIDs are strings, so compare them with `eq`. The former `==` compared
# only their numeric prefixes (0 for most UUIDs). The check therefore
# nearly always passed, and losing the lock race went unnoticed.
289 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
290 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
291 Log(undef, "Error while updating / locking job, exiting ".EX_TEMPFAIL);
# Mark the job Running and start its tasks_summary counters.
294 $Job->update_attributes('state' => 'Running',
295 'tasks_summary' => { 'failed' => 0,
# Install signal handlers. INT, QUIT and TSTP set please_freeze; ALRM sets
# please_info; CONT sets please_continue; HUP sets please_refresh. TERM
# calls croak directly. The main loops poll these flags.
302 Log (undef, "start");
303 $SIG{'INT'} = sub { $main::please_freeze = 1; };
304 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
305 $SIG{'TERM'} = \&croak;
306 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
307 $SIG{'ALRM'} = sub { $main::please_info = 1; };
308 $SIG{'CONT'} = sub { $main::please_continue = 1; };
309 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
311 $main::please_freeze = 0;
312 $main::please_info = 0;
313 $main::please_continue = 0;
314 $main::please_refresh = 0;
315 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Export NOCACHE* knobs (one NAME=VALUE per line) into the environment.
# This is done only for its side effect, so use a loop rather than grep
# in void context.
317 for (split ("\n", $$Job{knobs})) { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/; }
318 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
319 $ENV{"JOB_UUID"} = $job_id;
# Task bookkeeping: @jobstep_todo and @jobstep_done hold indexes into
# @jobstep for queued and finished tasks respectively.
323 my @jobstep_todo = ();
324 my @jobstep_done = ();
325 my @jobstep_tomerge = ();
326 my $jobstep_tomerge_level = 0;
328 my $squeue_kill_checked;
329 my $output_in_keep = 0;
330 my $latest_refresh = scalar time;
# Resuming from a frozen state is not supported (thaw() croaks).
334 if (defined $Job->{thawedfromkey})
336 thaw ($Job->{thawedfromkey});
# Seed the job with a single level-0 task recorded in Arvados and queue it.
340 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
341 'job_uuid' => $Job->{'uuid'},
346 push @jobstep, { 'level' => 0,
348 'arvados_task' => $first_task,
350 push @jobstep_todo, 0;
# Take an exclusive, non-blocking lock on CRUNCH_TMP/.lock; must_lock_now
# croaks with this message if another process holds it.
356 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
363 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A local job whose script_version is an absolute path uses that directory
# as its source tree instead of installing a git revision.
365 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
# Unless --no-clear-tmp was given, remove stale work/install/src dirs.
# The command is single-quoted so the shell expands $JOB_WORK and
# $CRUNCH_TMP from the environment.
368 if (!defined $no_clear_tmp) {
369 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
370 system($clear_tmp_cmd) == 0
371 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
# Run from the local tree and build its Python SDK into a virtualenv
# under $CRUNCH_TMP/opt (presumably the $skip_install branch; the guard
# is outside this excerpt).
373 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
374 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
376 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
377 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
378 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
380 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# The node-side install script comes from this file's DATA section.
# NOTE(review): any $/ override for slurping it is outside this excerpt.
388 $build_script = <DATA>;
390 Log (undef, "Install revision ".$Job->{script_version});
391 my $nodelist = join(",", @node);
# Unless --no-clear-tmp: on every node, lazily unmount any *keep FUSE
# mounts under $JOB_WORK and remove the old work/install/src dirs. The
# parent polls every 0.1s so a pending freeze request can interrupt.
393 if (!defined $no_clear_tmp) {
394 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
396 my $cleanpid = fork();
399 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
400 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
405 last if $cleanpid == waitpid (-1, WNOHANG);
406 freeze_if_want_freeze ($cleanpid);
407 select (undef, undef, undef, 0.1);
409 Log (undef, "Clean-work-dir exited $?");
412 # Install requested code version
# srun arguments for running the install on all allocated nodes; srun()
# drops them and runs the command directly when not under SLURM.
415 my @srunargs = ("srun",
416 "--nodelist=$nodelist",
417 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
# Installed code lives in $CRUNCH_TMP/src.
419 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
420 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
424 my $treeish = $Job->{'script_version'};
426 # If we're running under crunch-dispatch, it will have pulled the
427 # appropriate source tree into its own repository, and given us that
428 # repo's path as $git_dir. If we're running a "local" job, and a
429 # script_version was specified, it's up to the user to provide the
430 # full path to a local repository in Job->{repository}.
432 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
433 # git-archive --remote where appropriate.
435 # TODO: Accept a locally-hosted Arvados repository by name or
436 # UUID. Use arvados.v1.repositories.list or .get to figure out the
437 # appropriate fetch-url.
438 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
440 $ENV{"CRUNCH_SRC_URL"} = $repo;
442 if (-d "$repo/.git") {
443 # We were given a working directory, but we are only interested in
445 $repo = "$repo/.git";
448 # If this looks like a subversion r#, look for it in git-svn commit messages
450 if ($treeish =~ m{^\d{1,4}$}) {
451 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
453 Log(undef, "git Subversion search exited $?");
454 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
456 Log(undef, "Using commit $commit for Subversion revision $treeish");
460 # If that didn't work, try asking git to look it up as a tree-ish.
462 if (!defined $commit) {
463 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
465 Log(undef, "git rev-list exited $? with result '$found'");
466 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
468 Log(undef, "Using commit $commit for tree-ish $treeish");
469 if ($commit ne $treeish) {
470 # Make sure we record the real commit id in the database,
471 # frozentokey, logs, etc. -- instead of an abbreviation or a
472 # branch name which can become ambiguous or point to a
473 # different commit in the future.
474 $Job->{'script_version'} = $commit;
476 $Job->update_attributes('script_version' => $commit) or
477 croak("Error while updating job");
# With a resolved commit, each node runs "perl -" in $CRUNCH_TMP and is
# fed the build script followed by the `git archive` output for that
# commit. An unresolvable tree-ish croaks.
482 if (defined $commit) {
483 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
484 @execargs = ("sh", "-c",
485 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
486 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
487 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
490 croak ("could not figure out commit id for $treeish");
493 # Note: this section is almost certainly unnecessary if we're
494 # running tasks in docker containers.
495 my $installpid = fork();
496 if ($installpid == 0)
498 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
503 last if $installpid == waitpid (-1, WNOHANG);
504 freeze_if_want_freeze ($installpid);
505 select (undef, undef, undef, 0.1);
507 Log (undef, "Install exited $?");
512 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
513 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
516 # If this job requires a Docker image, install that.
# Docker client binary used for image checks, loads and task runs.
517 my $docker_bin = "/usr/bin/docker.io";
518 my ($docker_locator, $docker_stream, $docker_hash);
519 if ($docker_locator = $Job->{docker_image_locator}) {
520 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
523 croak("No Docker image hash found from locator $docker_locator");
# Drop the leading "." from the stream name so it can be appended to the
# locator as a path.
525 $docker_stream =~ s/^\.//;
# On each node, load the image with `arv-get ... | docker load` unless an
# image with exactly this hash is already present. A forked child runs
# the script via srun on all nodes while the parent polls every 0.1s,
# honoring freeze requests. A failing exit status croaks.
526 my $docker_install_script = qq{
527 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
528 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
531 my $docker_pid = fork();
532 if ($docker_pid == 0)
534 srun (["srun", "--nodelist=" . join(',', @node)],
535 ["/bin/sh", "-ec", $docker_install_script]);
540 last if $docker_pid == waitpid (-1, WNOHANG);
541 freeze_if_want_freeze ($docker_pid);
542 select (undef, undef, undef, 0.1);
546 croak("Installing Docker image from $docker_locator returned exit code $?");
# Log the job's key attributes (JSON-encoding structured values) and knobs.
550 foreach (qw (script script_version script_parameters runtime_constraints))
554 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
556 foreach (split (/\n/, $Job->{knobs}))
558 Log (undef, "knob " . $_);
# undef means "outcome not yet decided"; levels keep running while it
# stays undef.
563 $main::success = undef;
# Per-round counters used to decide when to give up.
569 my $thisround_succeeded = 0;
570 my $thisround_failed = 0;
571 my $thisround_failed_multiple = 0;
# Run the lowest pending level first, ties broken by jobstep index.
573 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
574 or $a <=> $b } @jobstep_todo;
575 my $level = $jobstep[$jobstep_todo[0]]->{level};
576 Log (undef, "start level $level");
# At the start of a round every slot is free.
581 my @freeslot = (0..$#slot);
584 my $progress_is_dirty = 1;
585 my $progress_stats_updated = 0;
587 update_progress_stats();
# Launch pending tasks at the current level into free slots.
592 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
594 my $id = $jobstep_todo[$todo_ptr];
595 my $Jobstep = $jobstep[$id];
596 if ($Jobstep->{level} != $level)
# The task's stdout/stderr come back through a pipe whose read end the
# parent uses in non-blocking mode.
601 pipe $reader{$id}, "writer" or croak ($!);
602 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
603 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
# Take the first free slot.
605 my $childslot = $freeslot[0];
606 my $childnode = $slot[$childslot]->{node};
607 my $childslotname = join (".",
608 $slot[$childslot]->{node}->{name},
609 $slot[$childslot]->{cpu});
610 my $childpid = fork();
# In the child: restore default signal handling, deal with the other
# tasks' pipe handles, and send stdout/stderr into this task's pipe.
613 $SIG{'INT'} = 'DEFAULT';
614 $SIG{'QUIT'} = 'DEFAULT';
615 $SIG{'TERM'} = 'DEFAULT';
617 foreach (values (%reader))
621 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
622 open(STDOUT,">&writer");
623 open(STDERR,">&writer");
# Task environment: task/job identifiers, script parameters exported as
# JOB_PARAMETER_<NAME> (ASCII-uppercased), a per-task work dir (also used
# as HOME), and the install prefix placed first on PATH.
628 delete $ENV{"GNUPGHOME"};
629 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
630 $ENV{"TASK_QSEQUENCE"} = $id;
631 $ENV{"TASK_SEQUENCE"} = $level;
632 $ENV{"JOB_SCRIPT"} = $Job->{script};
633 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
634 $param =~ tr/a-z/A-Z/;
635 $ENV{"JOB_PARAMETER_$param"} = $value;
637 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
638 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
639 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
640 $ENV{"HOME"} = $ENV{"TASK_WORK"};
641 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
642 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
643 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
644 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
650 "--nodelist=".$childnode->{name},
651 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
652 "--job-name=$job_id.$id.$$",
# Build the node-side shell command. It first resets the task work dir,
# creates the scratch and keep-mount dirs, and cds to CRUNCH_TMP. When
# required (condition outside this excerpt), it also sends the build
# script. Finally it runs the task script under crunchstat, inside Docker
# if an image hash is set.
654 my $build_script_to_send = "";
656 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
657 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
658 ."&& cd $ENV{CRUNCH_TMP} ";
661 $build_script_to_send = $build_script;
# arv-mount exposes Keep at TASK_KEEPMOUNT and execs the rest of the command.
665 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
# Docker mode: the source tree is mounted read-only at /tmp/crunch-src and
# Keep at /keep. ARVADOS_*, JOB_* and TASK_* variables are passed through,
# with TASK_WORK and TASK_KEEPMOUNT rewritten to container paths.
668 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
669 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
670 # Dynamically configure the container to use the host system as its
671 # DNS server. Get the host's global addresses from the ip command,
672 # and turn them into docker --dns options using gawk.
674 q{$(ip -o address show scope global |
675 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
676 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
677 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
678 $command .= "--env=\QHOME=/home/crunch\E ";
679 while (my ($env_key, $env_val) = each %ENV)
681 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
682 if ($env_key eq "TASK_WORK") {
683 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
685 elsif ($env_key eq "TASK_KEEPMOUNT") {
686 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
689 $command .= "--env=\Q$env_key=$env_val\E ";
693 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
694 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
695 $command .= "\Q$docker_hash\E ";
696 $command .= "stdbuf --output=0 --error=0 ";
697 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
# Non-Docker mode: run the script straight from CRUNCH_SRC under crunchstat.
700 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
701 $command .= "stdbuf --output=0 --error=0 ";
702 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
# srun() execs in place; reaching the Log call below means exec failed.
705 my @execargs = ('bash', '-c', $command);
706 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
707 # exec() failed, we assume nothing happened.
708 Log(undef, "srun() failed on build script");
# In the parent. fork() failure handling is on lines outside this excerpt.
712 if (!defined $childpid)
# Track the child by pid. jobstepname equals the --job-name given to
# srun (the child's $$ is $childpid), so squeue output can be matched to it.
719 $proc{$childpid} = { jobstep => $id,
722 jobstepname => "$job_id.$id.$childpid",
# Mark the slot busy; it must not already have a pid.
724 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
725 $slot[$childslot]->{pid} = $childpid;
727 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
728 Log ($id, "child $childpid started on $childslotname");
729 $Jobstep->{starttime} = time;
730 $Jobstep->{node} = $childnode->{name};
731 $Jobstep->{slotindex} = $childslot;
732 delete $Jobstep->{stderr};
733 delete $Jobstep->{finishtime};
# Record the UTC start time on the Arvados task.
735 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
736 $Jobstep->{'arvados_task'}->save;
# The task is no longer pending.
738 splice @jobstep_todo, $todo_ptr, 1;
741 $progress_is_dirty = 1;
745 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
# Stop launching tasks if a freeze was requested.
747 last THISROUND if $main::please_freeze;
# Acknowledge an info request (SIGALRM) and push progress stats.
748 if ($main::please_info)
750 $main::please_info = 0;
754 update_progress_stats();
761 check_refresh_wanted();
763 update_progress_stats();
764 select (undef, undef, undef, 0.1);
# Otherwise push progress stats at least every 30 seconds.
766 elsif (time - $progress_stats_updated >= 30)
768 update_progress_stats();
# Give up on the round when tasks that had already failed before keep
# failing: 8+ such failures with no successes, or 16+ that outnumber
# the successes.
770 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
771 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
773 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
774 .($thisround_failed+$thisround_succeeded)
775 .") -- giving up on this round";
776 Log (undef, $message);
780 # move slots from freeslot to holdslot (or back to freeslot) if necessary
781 for (my $i=$#freeslot; $i>=0; $i--) {
782 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
783 push @holdslot, (splice @freeslot, $i, 1);
786 for (my $i=$#holdslot; $i>=0; $i--) {
787 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
788 push @freeslot, (splice @holdslot, $i, 1);
792 # give up if no nodes are succeeding
793 if (!grep { $_->{node}->{losing_streak} == 0 &&
794 $_->{node}->{hold_count} < 4 } @slot) {
795 my $message = "Every node has failed -- giving up on this round";
796 Log (undef, $message);
# Return held slots to the free list and reset the losing streak of each
# free slot's node.
803 push @freeslot, splice @holdslot;
804 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
# Drain remaining children while still honoring continue, info, refresh
# and freeze requests.
807 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
810 if ($main::please_continue) {
811 $main::please_continue = 0;
814 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
818 check_refresh_wanted();
820 update_progress_stats();
821 select (undef, undef, undef, 0.1);
822 killem (keys %proc) if $main::please_freeze;
826 update_progress_stats();
827 freeze_if_want_freeze();
# If no outcome is decided yet, stop when nothing succeeded this round
# and either nothing failed or more than 4 tasks failed. A further part
# of this condition is outside this excerpt.
830 if (!defined $main::success)
833 $thisround_succeeded == 0 &&
834 ($thisround_failed == 0 || $thisround_failed > 4))
836 my $message = "stop because $thisround_failed tasks failed and none succeeded";
837 Log (undef, $message);
# Keep processing levels until an outcome is decided.
846 goto ONELEVEL if !defined $main::success;
# All levels done: release the SLURM allocation and collate task outputs.
849 release_allocation();
851 my $collated_output = &collate_output();
853 if (!$collated_output) {
854 Log(undef, "output undef");
# Register the collated manifest as a new collection and, in job-UUID
# mode, record its portable data hash as the job output. Errors are
# logged from $@.
858 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
859 or die "failed to get collated manifest: $!";
860 my $orig_manifest_text = '';
861 while (my $manifest_line = <$orig_manifest>) {
862 $orig_manifest_text .= $manifest_line;
864 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
865 'manifest_text' => $orig_manifest_text,
867 Log(undef, "output uuid " . $output->{uuid});
868 Log(undef, "output hash " . $output->{portable_data_hash});
869 $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
872 Log (undef, "Failed to register output manifest: $@");
876 Log (undef, "finish");
# Final state is Complete only with collated output and a successful run.
# Exit status is 0 for Complete and 1 otherwise.
881 if ($collated_output && $main::success) {
882 $final_state = 'Complete';
884 $final_state = 'Failed';
886 $Job->update_attributes('state' => $final_state);
888 exit (($final_state eq 'Complete') ? 0 : 1);
# Push todo/done/running task counts to the job's tasks_summary and log
# them. The push happens only when $progress_is_dirty is set, but the
# "last updated" timestamp is refreshed on every call.
892 sub update_progress_stats
894 $progress_stats_updated = time;
895 return if !$progress_is_dirty;
# "Running" counts slots that are neither free nor held.
896 my ($todo, $done, $running) = (scalar @jobstep_todo,
897 scalar @jobstep_done,
898 scalar @slot - scalar @freeslot - scalar @holdslot);
899 $Job->{'tasks_summary'} ||= {};
900 $Job->{'tasks_summary'}->{'todo'} = $todo;
901 $Job->{'tasks_summary'}->{'done'} = $done;
902 $Job->{'tasks_summary'}->{'running'} = $running;
904 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
906 Log (undef, "status: $done done, $running running, $todo todo");
907 $progress_is_dirty = 0;
# Reap one exited child without blocking and record its task's outcome.
# Returns 0 if no child has exited. (The enclosing sub header is outside
# this excerpt.)
914 my $pid = waitpid (-1, WNOHANG);
915 return 0 if $pid <= 0;
# Identify the finished task's slot and elapsed time.
917 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
919 . $slot[$proc{$pid}->{slot}]->{cpu});
920 my $jobstepid = $proc{$pid}->{jobstep};
921 my $elapsed = time - $proc{$pid}->{time};
922 my $Jobstep = $jobstep[$jobstepid];
# Decode the wait status: exit value, signal and core-dump bit.
924 my $childstatus = $?;
925 my $exitvalue = $childstatus >> 8;
926 my $exitinfo = sprintf("exit %d signal %d%s",
929 ($childstatus & 128 ? ' core dump' : ''));
# The task records its own success flag in Arvados; reload to read it.
930 $Jobstep->{'arvados_task'}->reload;
931 my $task_success = $Jobstep->{'arvados_task'}->{success};
933 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
935 if (!defined $task_success) {
936 # task did not indicate one way or the other --> fail
937 $Jobstep->{'arvados_task'}->{success} = 0;
938 $Jobstep->{'arvados_task'}->save;
# Failure path: a node failure or exit code 111 makes it a temporary
# failure. Failures of tasks that had already failed before are counted
# separately for the give-up thresholds.
945 $temporary_fail ||= $Jobstep->{node_fail};
946 $temporary_fail ||= ($exitvalue == 111);
949 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
951 # Check for signs of a failed or misconfigured node
952 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
953 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
954 # Don't count this against jobstep failure thresholds if this
955 # node is already suspected faulty and srun exited quickly
956 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
958 Log ($jobstepid, "blaming failure on suspect node " .
959 $slot[$proc{$pid}->{slot}]->{node}->{name});
960 $temporary_fail ||= 1;
962 ban_node_by_slot($proc{$pid}->{slot});
965 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
966 ++$Jobstep->{'failures'},
967 $temporary_fail ? 'temporary ' : 'permanent',
# A permanent failure, or the third failure of this task, stops the job.
# Otherwise the task is requeued.
970 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
971 # Give up on this task, and the whole job
973 $main::please_freeze = 1;
975 # Put this task back on the todo queue
976 push @jobstep_todo, $jobstepid;
977 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear the node's losing streak and hold.
981 ++$thisround_succeeded;
982 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
983 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
984 push @jobstep_done, $jobstepid;
985 Log ($jobstepid, "success in $elapsed seconds");
# Record the UTC finish time in Arvados and flush remaining stderr.
987 $Jobstep->{exitcode} = $childstatus;
988 $Jobstep->{finishtime} = time;
989 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
990 $Jobstep->{'arvados_task'}->save;
991 process_stderr ($jobstepid, $task_success);
992 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
# Release the task's pipe and its slot.
994 close $reader{$jobstepid};
995 delete $reader{$jobstepid};
996 delete $slot[$proc{$pid}->{slot}]->{pid};
997 push @freeslot, $proc{$pid}->{slot};
# A successful task may have created subtasks. Page through the
# job_tasks it created (ordered by qsequence) and queue each one as a new
# jobstep at its 'sequence' level.
1000 if ($task_success) {
1002 my $newtask_list = [];
1003 my $newtask_results;
1005 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
1007 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
1009 'order' => 'qsequence',
1010 'offset' => scalar(@$newtask_list),
1012 push(@$newtask_list, @{$newtask_results->{items}});
1013 } while (@{$newtask_results->{items}});
1014 foreach my $arvados_task (@$newtask_list) {
1016 'level' => $arvados_task->{'sequence'},
1018 'arvados_task' => $arvados_task
1020 push @jobstep, $jobstep;
1021 push @jobstep_todo, $#jobstep;
1025 $progress_is_dirty = 1;
# When the file named by CRUNCH_REFRESH_TRIGGER has been modified since
# the last refresh, re-read the job record (job-UUID mode) and copy its
# cancellation attributes into $Job. If the job's state is then no
# longer "Running", log why and request a freeze (stop).
1029 sub check_refresh_wanted
1031 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
1032 if (@stat && $stat[9] > $latest_refresh) {
1033 $latest_refresh = scalar time;
1034 if ($job_has_uuid) {
1035 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1036 for my $attr ('cancelled_at',
1037 'cancelled_by_user_uuid',
1038 'cancelled_by_client_uuid',
1040 $Job->{$attr} = $Job2->{$attr};
1042 if ($Job->{'state'} ne "Running") {
1043 if ($Job->{'state'} eq "Cancelled") {
1044 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1046 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
1049 $main::please_freeze = 1;
# SLURM liveness check (sub header outside this excerpt). Kill children
# whose kill deadline has passed. Then, at most once a minute, give a 30s
# kill deadline to children older than 60s that squeue no longer lists.
1057 # return if the kill list was checked <4 seconds ago
1058 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1062 $squeue_kill_checked = time;
1064 # use killem() on procs whose killtime is reached
1067 if (exists $proc{$_}->{killtime}
1068 && $proc{$_}->{killtime} <= time)
1074 # return if the squeue was checked <60 seconds ago
1075 if (defined $squeue_checked && $squeue_checked > time - 60)
1079 $squeue_checked = time;
1083 # here is an opportunity to check for mysterious problems with local procs
1087 # get a list of steps still running
# "&& echo ok" appends a sentinel line so a failed squeue can be detected.
1088 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1090 if ($squeue[-1] ne "ok")
1096 # which of my jobsteps are running, according to squeue?
1100 if (/^(\d+)\.(\d+) (\S+)/)
1102 if ($1 eq $ENV{SLURM_JOBID})
1109 # which of my active child procs (>60s old) were not mentioned by squeue?
1110 foreach (keys %proc)
1112 if ($proc{$_}->{time} < time - 60
1113 && !exists $ok{$proc{$_}->{jobstepname}}
1114 && !exists $proc{$_}->{killtime})
1116 # kill this proc if it hasn't exited in 30 seconds
1117 $proc{$_}->{killtime} = time + 30;
# Cancel this job's SLURM allocation with scancel. NOTE(review): the guard
# deciding when this runs is outside this excerpt, and the command is
# built as a shell string.
1123 sub release_allocation
1127 Log (undef, "release job allocation");
1128 system "scancel $ENV{SLURM_JOBID}";
# Drain every task pipe without blocking. Output is appended to the task's
# stderr buffer (and echoed when CRUNCH_DEBUG is set), and complete lines
# are logged. Past 16 KiB, the oldest 8 KiB of the buffer is dropped.
1136 foreach my $job (keys %reader)
1139 while (0 < sysread ($reader{$job}, $buf, 8192))
1141 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1142 $jobstep[$job]->{stderr} .= $buf;
1143 preprocess_stderr ($job);
1144 if (length ($jobstep[$job]->{stderr}) > 16384)
1146 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Log and remove each complete line in a task's stderr buffer. An
# expired or unconfirmed SLURM allocation requests a freeze. A node
# failure or step-creation error flags the task as a node failure and
# bans its node.
1155 sub preprocess_stderr
1159 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1161 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1162 Log ($job, "stderr $line");
1163 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1165 $main::please_freeze = 1;
1167 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1168 $jobstep[$job]->{node_fail} = 1;
1169 ban_node_by_slot($jobstep[$job]->{slotindex});
# Flush a finished task's stderr. Complete lines go through
# preprocess_stderr; any remainder is logged line by line. (Sub header
# outside this excerpt.)
1178 my $task_success = shift;
1179 preprocess_stderr ($job);
1182 Log ($job, "stderr $_");
1183 } split ("\n", $jobstep[$job]->{stderr});
# Fetch a Keep block with `arv-get <hash>` and return its contents, or
# undef if nothing was read. Dies on read errors. (Sub header outside
# this excerpt.)
1189 my ($keep, $child_out, $output_block);
1191 my $cmd = "arv-get \Q$hash\E";
1192 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
# Read in 1 MiB chunks until EOF.
1196 my $bytes = sysread($keep, $buf, 1024 * 1024);
1197 if (!defined $bytes) {
1198 die "reading from arv-get: $!";
1199 } elsif ($bytes == 0) {
1200 # sysread returns 0 at the end of the pipe.
1203 # some bytes were read into buf.
1204 $output_block .= $buf;
1208 return $output_block;
# Concatenate the outputs of successful tasks into `arv-put --raw` and
# read back the resulting locator, waiting up to 120s. Literal manifest
# text is sent as-is. A bare Keep locator becomes the job output
# directly in a single-task job; otherwise it is fetched with
# fetch_block. (Sub header outside this excerpt.)
1213 Log (undef, "collate");
1215 my ($child_out, $child_in);
1216 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1217 '--retries', put_retry_count());
1221 next if (!exists $_->{'arvados_task'}->{'output'} ||
1222 !$_->{'arvados_task'}->{'success'});
1223 my $output = $_->{'arvados_task'}->{output};
1224 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1226 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1227 print $child_in $output;
1229 elsif (@jobstep == 1)
1231 $joboutput = $output;
1234 elsif (defined (my $outblock = fetch_block ($output)))
1236 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1237 print $child_in $outblock;
1241 Log (undef, "XXX fetch_block($output) failed XXX");
1247 if (!defined $joboutput) {
1248 my $s = IO::Select->new($child_out);
1249 if ($s->can_read(120)) {
1250 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1253 Log (undef, "timed out reading from 'arv-put'");
# Escalating kill for each pid (sub header outside this excerpt). Send
# SIGINT first. Escalate to SIGTERM once SIGINT was sent more than 4s
# ago, then to SIGKILL once SIGTERM was sent more than 4s ago. Each new
# signal is sent twice, 0.1s apart, and its send time is recorded.
1266 my $sig = 2; # SIGINT first
1267 if (exists $proc{$_}->{"sent_$sig"} &&
1268 time - $proc{$_}->{"sent_$sig"} > 4)
1270 $sig = 15; # SIGTERM if SIGINT doesn't work
1272 if (exists $proc{$_}->{"sent_$sig"} &&
1273 time - $proc{$_}->{"sent_$sig"} > 4)
1275 $sig = 9; # SIGKILL if SIGTERM doesn't work
1277 if (!exists $proc{$_}->{"sent_$sig"})
1279 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1281 select (undef, undef, undef, 0.1);
1284 kill $sig, $_; # srun wants two SIGINT to really interrupt
1286 $proc{$_}->{"sent_$sig"} = time;
1287 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# Set the select() bit for this filehandle's descriptor. (Enclosing sub
# and loop are outside this excerpt.)
1297 vec($bits,fileno($_),1) = 1;
# Log($jobstep_id, $message): format "<job_id> <pid> <jobstep_id>
# <message>" and write it to STDERR and to the local log file. Multi-line
# messages are split and logged line by line. Characters outside printable
# ASCII are escaped as \ooo octal. A UTC timestamp is prepended in the
# log file, and on STDERR only when it is a terminal.
1303 sub Log # ($jobstep_id, $logmessage)
1305 if ($_[1] =~ /\n/) {
1306 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered.
1311 my $fh = select STDERR; $|=1; select $fh;
1312 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1313 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1316 if ($local_logfile || -t STDERR) {
1317 my @gmtime = gmtime;
1318 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1319 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1321 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1323 if ($local_logfile) {
1324 print $local_logfile $datetime . " " . $message;
# Local croak (sub header outside this excerpt). Log the message with the
# caller's file and line. If tasks remain queued, freeze and collate
# output. Save the log if a local log file is open.
1331 my ($package, $file, $line) = caller;
1332 my $message = "@_ at $file line $line\n";
1333 Log (undef, $message);
1334 freeze() if @jobstep_todo;
1335 collate_output() if @jobstep_todo;
1337 save_meta() if $local_logfile;
# Final job-record update, done in job-UUID mode only. A cancelled job
# gets finished_at set; otherwise (presumably the else branch) the state
# becomes Failed.
1344 return if !$job_has_uuid;
1345 if ($Job->{'state'} eq 'Cancelled') {
1346 $Job->update_attributes('finished_at' => scalar gmtime);
1348 $Job->update_attributes('state' => 'Failed');
# Upload the local log file to Keep with arv-put, named <job_uuid>.log.txt,
# and record the returned locator as the job's log. Checkpoint calls (true
# argument) do nothing. Dies if arv-put fails.
1355 my $justcheckpoint = shift; # false if this will be the last meta saved
1356 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1358 $local_logfile->flush;
1359 my $retry_count = put_retry_count();
1360 my $cmd = "arv-put --portable-data-hash --retries $retry_count " .
1361 "--filename ''\Q$keep_logfile\E " . quotemeta($local_logfile->filename);
1362 my $loglocator = `$cmd`;
1363 die "system $cmd failed: $?" if $?;
1366 $local_logfile = undef; # the temp file is automatically deleted
1367 Log (undef, "log manifest is $loglocator");
1368 $Job->{'log'} = $loglocator;
1369 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze was requested: release the allocation, register the given
# pids in %proc, kill every tracked child, and reap whatever has exited.
# (The rest of the sub is outside this excerpt.)
1373 sub freeze_if_want_freeze
1375 if ($main::please_freeze)
1377 release_allocation();
1380 # kill some srun procs before freeze+stop
1381 map { $proc{$_} = {} } @_;
1384 killem (keys %proc);
1385 select (undef, undef, undef, 0.1);
1387 while (($died = waitpid (-1, WNOHANG)) > 0)
1389 delete $proc{$died};
sub freeze {
  # Checkpointing a running job is unsupported.  Just note that in the job
  # log and return nothing.
  my $notice = "Freeze not implemented";
  Log(undef, $notice);
  return;
}
sub thaw {
  # Restoring a frozen job is unsupported; abort through croak().
  croak("Thaw not implemented");
}
sub freezeunquote
{
  # Undo backslash escaping in a string.  The sequence backslash-n becomes
  # a newline; a backslash before any other character (except a newline)
  # yields that character unchanged.
  my ($s) = @_;
  my %special = ("n" => "\n");
  $s =~ s{\\(.)}{exists $special{$1} ? $special{$1} : $1}ge;
  return $s;
}
sub srun
{
  # Run a command, prefixed with the srun arguments when SLURM is in use.
  #
  #   $srunargs - arrayref of srun arguments, used only when $have_slurm
  #   $execargs - arrayref holding the command to run
  #   $opts     - optional hashref; with {fork => 1} the command runs via
  #               system() and its wait status is returned
  #   $stdin    - optional string; when defined, STDIN is replaced by a pipe
  #               fed by a forked child that writes this string
  #
  # Without {fork => 1}, the current process is replaced via exec.  In that
  # case this sub only comes back by dying when exec fails.
  my ($srunargs, $execargs, $opts, $stdin) = @_;
  $opts ||= {};
  my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;

  if ($ENV{CRUNCH_DEBUG}) {
    # Quote arguments that contain spaces so the echoed command is readable.
    my @shown = map { / / ? "'$_'" : $_ } @$args;
    print STDERR (join (" ", @shown), "\n");
  }

  if (defined $stdin) {
    my $child = open STDIN, "-|";
    defined $child or die "no fork: $!";
    unless ($child) {
      # Child: our STDOUT feeds the parent's new STDIN.  Write and exit.
      print $stdin or die $!;
      close STDOUT or die $!;
      exit 0;
    }
  }

  if ($opts->{fork}) {
    return system (@$args);
  }

  exec @$args;
  # Reached only if exec failed.  The environment size is reported,
  # presumably because an oversized environment can make exec fail.
  warn "ENV size is ".length(join(" ",%ENV));
  die "exec failed: $!: @$args";
}
sub ban_node_by_slot {
  # Put the node behind slot $slotid on hold so that no new jobsteps start
  # there for the next 60 seconds.  Also count how many times this node
  # has been held.
  my ($slotid) = @_;
  my $hold_seconds = 60;
  $slot[$slotid]{node}{hold_until} = time() + $hold_seconds;
  $slot[$slotid]{node}{hold_count} += 1;
  Log (undef, "backing off node " . $slot[$slotid]{node}{name} . " for 60 seconds");
}
sub must_lock_now
{
  # Take an exclusive, non-blocking lock on $lockfile, or croak with
  # $error_message if another process already holds it.  The global
  # bareword handle L is deliberate: it stays open, and so keeps the lock,
  # after this sub returns.
  my ($lockfile, $error_message) = @_;
  open(L, ">", $lockfile) or croak("$lockfile: $!");
  flock(L, LOCK_EX | LOCK_NB)
      or croak("Can't lock $lockfile: $error_message\n");
  return;
}
sub find_docker_image {
  # Given a Keep locator, check to see if it contains a Docker image.
  # If so, return its stream name and Docker hash.
  # If not, return undef for both values.
  #
  # The collection counts as a Docker image when it holds exactly one file,
  # named "<64 hex digits>.tar".  The returned stream name is that of the
  # stream containing the file.
  my $locator = shift;
  my ($streamname, $filename);
  if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
    foreach my $line (split(/\n/, $image->{manifest_text})) {
      my @tokens = split(/\s+/, $line);
      next if (!@tokens);
      my $line_stream = shift(@tokens);
      foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
        if (defined($filename)) {
          return (undef, undef);  # More than one file in the Collection.
        }
        $filename = (split(/:/, $filedata, 3))[2];
        # Record the stream that actually holds the file.  Assigning on every
        # manifest line (as before) returned whichever stream came last.
        $streamname = $line_stream;
      }
    }
  }
  if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
    return ($streamname, $1);
  }
  return (undef, undef);
}
sub put_retry_count {
  # Pick an arv-put --retries value that grows with the time elapsed since
  # the first jobstep started (or since 1 second ago if it has not started).
  # The value is the number of times the elapsed seconds can be halved
  # while still at least 2, with a floor of 3.
  # $stoptime defaults to the current time.
  my $stoptime = shift || time;
  my $starttime = $jobstep[0]->{starttime};
  my $elapsed = 1;
  $elapsed = $stoptime - $starttime if defined($starttime);

  my $retries = 0;
  for (my $span = $elapsed; $span >= 2; $span /= 2) {
    $retries += 1;
  }
  return $retries < 3 ? 3 : $retries;
}
# checkout-and-build
#
# Install a job's source tree into $CRUNCH_SRC.  Any data after this
# script's own __DATA__ marker is unpacked there with tar.  The tree's
# install step is then run, the installed commit is recorded as the
# "$CRUNCH_SRC.commit" symlink, and finally @ARGV is exec'd (if given).
# If the recorded commit already matches $CRUNCH_SRC_COMMIT, installation
# is skipped.

use Fcntl ':flock';
use File::Path qw( make_path );

my $destdir = $ENV{"CRUNCH_SRC"};
my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
my $repo = $ENV{"CRUNCH_SRC_URL"};
my $task_work = $ENV{"TASK_WORK"};

for my $dir ($destdir, $task_work) {
  if ($dir) {
    make_path $dir;
    -e $dir or die "Failed to create temporary directory ($dir): $!";
  }
}

# Serialize concurrent installs into the same $destdir.  Without a checked
# flock, two tasks could unpack and install over each other.
open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
flock L, LOCK_EX or die "$destdir.lock: $!";
if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
  if (@ARGV) {
    exec(@ARGV);
    die "Cannot exec `@ARGV`: $!";
  } else {
    exit 0;
  }
}

unlink "$destdir.commit";
# Capture all install output in "$destdir.log".  If either redirection
# failed, output would silently go to the wrong place, so die instead.
open STDOUT, ">", "$destdir.log" or die "$destdir.log: $!";
open STDERR, ">&STDOUT" or die "Can't redirect STDERR to $destdir.log: $!";

mkdir $destdir;
my @git_archive_data = <DATA>;
if (@git_archive_data) {
  open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"
      or die "'tar -C $destdir -xf -' could not be started: $!";
  print TARX @git_archive_data;
  if(!close(TARX)) {
    die "'tar -C $destdir -xf -' exited $?: $!";
  }
}

my $pwd;
chomp ($pwd = `pwd`);
my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
mkdir $install_dir;

for my $src_path ("$destdir/arvados/sdk/python") {
  if (-d $src_path) {
    shell_or_die ("virtualenv", $install_dir);
    shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
  }
}

if (-e "$destdir/crunch_scripts/install") {
    shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
} elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
    # Old version
    shell_or_die ("./tests/autotests.sh", $install_dir);
} elsif (-e "./install.sh") {
    shell_or_die ("./install.sh", $install_dir);
}

if ($commit) {
    # Replace the commit marker atomically via a temporary symlink + rename.
    unlink "$destdir.commit.new";
    symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
    rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
}

close L;

if (@ARGV) {
  exec(@ARGV);
  die "Cannot exec `@ARGV`: $!";
} else {
  exit 0;
}
sub shell_or_die
{
  # Run a command with system() and die, reporting $! and the hex wait
  # status, unless it exits with status 0.  When $ENV{DEBUG} is set, the
  # command is echoed to STDERR first.
  print STDERR "@_\n" if $ENV{"DEBUG"};
  my $status = system (@_);
  die "@_ failed: $! exit 0x".sprintf("%x",$?) unless $status == 0;
}