2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# --- Imports and temporary-directory setup ---
# NOTE(review): this listing elides some lines (closing braces etc.); code
# below is reproduced verbatim from the visible lines only.
76 use POSIX ':sys_wait_h';
77 use POSIX qw(strftime);
78 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
80 use Digest::MD5 qw(md5_hex);
86 use File::Path qw( make_path );
# sysexits.h-style "temporary failure" exit code: tells the caller this job
# may succeed if retried.
88 use constant EX_TEMPFAIL => 75;
90 $ENV{"TMPDIR"} ||= "/tmp";
91 unless (defined $ENV{"CRUNCH_TMP"}) {
92 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
# Not running as the "crunch" user and not root: suffix the tmp dir with our
# real uid ($<) so concurrent users don't collide.
93 if ($ENV{"USER"} ne "crunch" && $< != 0) {
94 # use a tmp dir unique for my uid
95 $ENV{"CRUNCH_TMP"} .= "-$<";
99 # Create the tmp directory if it does not exist
100 if ( ! -d $ENV{"CRUNCH_TMP"} ) {
101 make_path $ENV{"CRUNCH_TMP"} or die "Failed to create temporary working directory: " . $ENV{"CRUNCH_TMP"};
# Directories exported to job tasks via the environment. CRUNCH_WORK is a
# deprecated alias for JOB_WORK kept for older scripts.
104 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
105 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
106 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
107 mkdir ($ENV{"JOB_WORK"});
115 GetOptions('force-unlock' => \$force_unlock,
116 'git-dir=s' => \$git_dir,
117 'job=s' => \$jobspec,
118 'job-api-token=s' => \$job_api_token,
119 'no-clear-tmp' => \$no_clear_tmp,
120 'resume-stash=s' => \$resume_stash,
123 if (defined $job_api_token) {
124 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
127 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
128 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
129 my $local_job = !$job_has_uuid;
134 $main::ENV{CRUNCH_DEBUG} = 1;
138 $main::ENV{CRUNCH_DEBUG} = 0;
143 my $arv = Arvados->new('apiVersion' => 'v1');
145 my $User = $arv->{'users'}->{'current'}->execute;
153 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
154 if (!$force_unlock) {
155 # Claim this job, and make sure nobody else does
157 # lock() sets is_locked_by_uuid and changes state to Running.
158 $arv->{'jobs'}->{'lock'}->execute('uuid' => $Job->{'uuid'})
161 Log(undef, "Error while locking job, exiting ".EX_TEMPFAIL);
168 $Job = JSON::decode_json($jobspec);
172 map { croak ("No $_ specified") unless $Job->{$_} }
173 qw(script script_version script_parameters);
176 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
177 $Job->{'started_at'} = gmtime;
179 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
183 $job_id = $Job->{'uuid'};
185 my $keep_logfile = $job_id . '.log.txt';
186 log_writer_start($keep_logfile);
188 $Job->{'runtime_constraints'} ||= {};
189 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
190 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
193 Log (undef, "check slurm allocation");
196 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
200 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
201 push @sinfo, "$localcpus localhost";
203 if (exists $ENV{SLURM_NODELIST})
205 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
209 my ($ncpus, $slurm_nodelist) = split;
210 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
213 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
216 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
219 foreach (split (",", $ranges))
232 push @nodelist, map {
234 $n =~ s/\[[-,\d]+\]/$_/;
241 push @nodelist, $nodelist;
244 foreach my $nodename (@nodelist)
246 Log (undef, "node $nodename - $ncpus slots");
247 my $node = { name => $nodename,
251 foreach my $cpu (1..$ncpus)
253 push @slot, { node => $node,
257 push @node, @nodelist;
262 # Ensure that we get one jobstep running on each allocated node before
263 # we start overloading nodes with concurrent steps
265 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
268 $Job->update_attributes(
269 'tasks_summary' => { 'failed' => 0,
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
306 if (defined $Job->{thawedfromkey})
308 thaw ($Job->{thawedfromkey});
312 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313 'job_uuid' => $Job->{'uuid'},
318 push @jobstep, { 'level' => 0,
320 'arvados_task' => $first_task,
322 push @jobstep_todo, 0;
328 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
337 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
340 if (!defined $no_clear_tmp) {
341 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
342 system($clear_tmp_cmd) == 0
343 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
345 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
346 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
348 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
349 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
350 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
352 or croak ("setup.py in $src_path failed: exit ".($?>>8));
360 $build_script = <DATA>;
362 Log (undef, "Install revision ".$Job->{script_version});
363 my $nodelist = join(",", @node);
365 if (!defined $no_clear_tmp) {
366 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
368 my $cleanpid = fork();
371 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
372 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then for i in $JOB_WORK/*keep; do /bin/fusermount -z -u $i; done; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
377 last if $cleanpid == waitpid (-1, WNOHANG);
378 freeze_if_want_freeze ($cleanpid);
379 select (undef, undef, undef, 0.1);
381 Log (undef, "Clean-work-dir exited $?");
384 # Install requested code version
387 my @srunargs = ("srun",
388 "--nodelist=$nodelist",
389 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
391 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
392 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
396 my $treeish = $Job->{'script_version'};
398 # If we're running under crunch-dispatch, it will have pulled the
399 # appropriate source tree into its own repository, and given us that
400 # repo's path as $git_dir. If we're running a "local" job, and a
401 # script_version was specified, it's up to the user to provide the
402 # full path to a local repository in Job->{repository}.
404 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
405 # git-archive --remote where appropriate.
407 # TODO: Accept a locally-hosted Arvados repository by name or
408 # UUID. Use arvados.v1.repositories.list or .get to figure out the
409 # appropriate fetch-url.
410 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
412 $ENV{"CRUNCH_SRC_URL"} = $repo;
414 if (-d "$repo/.git") {
415 # We were given a working directory, but we are only interested in
417 $repo = "$repo/.git";
420 # If this looks like a subversion r#, look for it in git-svn commit messages
422 if ($treeish =~ m{^\d{1,4}$}) {
423 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
425 Log(undef, "git Subversion search exited $?");
426 if (($? == 0) && ($gitlog =~ /^[a-f0-9]{40}$/)) {
428 Log(undef, "Using commit $commit for Subversion revision $treeish");
432 # If that didn't work, try asking git to look it up as a tree-ish.
434 if (!defined $commit) {
435 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
437 Log(undef, "git rev-list exited $? with result '$found'");
438 if (($? == 0) && ($found =~ /^[0-9a-f]{40}$/s)) {
440 Log(undef, "Using commit $commit for tree-ish $treeish");
441 if ($commit ne $treeish) {
442 # Make sure we record the real commit id in the database,
443 # frozentokey, logs, etc. -- instead of an abbreviation or a
444 # branch name which can become ambiguous or point to a
445 # different commit in the future.
446 $Job->{'script_version'} = $commit;
448 $Job->update_attributes('script_version' => $commit) or
449 croak("Error while updating job");
454 if (defined $commit) {
455 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
456 @execargs = ("sh", "-c",
457 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
458 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
459 croak("git archive failed: exit " . ($? >> 8)) if ($? != 0);
462 croak ("could not figure out commit id for $treeish");
465 # Note: this section is almost certainly unnecessary if we're
466 # running tasks in docker containers.
467 my $installpid = fork();
468 if ($installpid == 0)
470 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
475 last if $installpid == waitpid (-1, WNOHANG);
476 freeze_if_want_freeze ($installpid);
477 select (undef, undef, undef, 0.1);
479 Log (undef, "Install exited $?");
484 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
485 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
488 # If this job requires a Docker image, install that.
489 my $docker_bin = "/usr/bin/docker.io";
490 my ($docker_locator, $docker_stream, $docker_hash);
491 if ($docker_locator = $Job->{docker_image_locator}) {
492 ($docker_stream, $docker_hash) = find_docker_image($docker_locator);
495 croak("No Docker image hash found from locator $docker_locator");
497 $docker_stream =~ s/^\.//;
498 my $docker_install_script = qq{
499 if ! $docker_bin images -q --no-trunc | grep -qxF \Q$docker_hash\E; then
500 arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
503 my $docker_pid = fork();
504 if ($docker_pid == 0)
506 srun (["srun", "--nodelist=" . join(',', @node)],
507 ["/bin/sh", "-ec", $docker_install_script]);
512 last if $docker_pid == waitpid (-1, WNOHANG);
513 freeze_if_want_freeze ($docker_pid);
514 select (undef, undef, undef, 0.1);
518 croak("Installing Docker image from $docker_locator returned exit code $?");
522 foreach (qw (script script_version script_parameters runtime_constraints))
526 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
528 foreach (split (/\n/, $Job->{knobs}))
530 Log (undef, "knob " . $_);
535 $main::success = undef;
541 my $thisround_succeeded = 0;
542 my $thisround_failed = 0;
543 my $thisround_failed_multiple = 0;
545 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
546 or $a <=> $b } @jobstep_todo;
547 my $level = $jobstep[$jobstep_todo[0]]->{level};
548 Log (undef, "start level $level");
553 my @freeslot = (0..$#slot);
556 my $progress_is_dirty = 1;
557 my $progress_stats_updated = 0;
559 update_progress_stats();
564 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
566 my $id = $jobstep_todo[$todo_ptr];
567 my $Jobstep = $jobstep[$id];
568 if ($Jobstep->{level} != $level)
573 pipe $reader{$id}, "writer" or croak ($!);
574 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
575 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
577 my $childslot = $freeslot[0];
578 my $childnode = $slot[$childslot]->{node};
579 my $childslotname = join (".",
580 $slot[$childslot]->{node}->{name},
581 $slot[$childslot]->{cpu});
582 my $childpid = fork();
585 $SIG{'INT'} = 'DEFAULT';
586 $SIG{'QUIT'} = 'DEFAULT';
587 $SIG{'TERM'} = 'DEFAULT';
589 foreach (values (%reader))
593 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
594 open(STDOUT,">&writer");
595 open(STDERR,">&writer");
600 delete $ENV{"GNUPGHOME"};
601 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
602 $ENV{"TASK_QSEQUENCE"} = $id;
603 $ENV{"TASK_SEQUENCE"} = $level;
604 $ENV{"JOB_SCRIPT"} = $Job->{script};
605 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
606 $param =~ tr/a-z/A-Z/;
607 $ENV{"JOB_PARAMETER_$param"} = $value;
609 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
610 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
611 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
612 $ENV{"HOME"} = $ENV{"TASK_WORK"};
613 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
614 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
615 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
616 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
622 "--nodelist=".$childnode->{name},
623 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
624 "--job-name=$job_id.$id.$$",
626 my $build_script_to_send = "";
628 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
629 ."mkdir -p $ENV{CRUNCH_TMP} $ENV{JOB_WORK} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
630 ."&& cd $ENV{CRUNCH_TMP} ";
633 $build_script_to_send = $build_script;
637 $command .= "&& exec arv-mount --by-id --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
640 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -cgroup-parent=docker -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=10000 ";
641 $command .= "$docker_bin run --rm=true --attach=stdout --attach=stderr --user=crunch --cidfile=$ENV{TASK_WORK}/docker.cid ";
642 # Dynamically configure the container to use the host system as its
643 # DNS server. Get the host's global addresses from the ip command,
644 # and turn them into docker --dns options using gawk.
646 q{$(ip -o address show scope global |
647 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
648 $command .= "--volume=\Q$ENV{CRUNCH_SRC}:/tmp/crunch-src:ro\E ";
649 $command .= "--volume=\Q$ENV{TASK_KEEPMOUNT}:/keep:ro\E ";
650 $command .= "--env=\QHOME=/home/crunch\E ";
651 while (my ($env_key, $env_val) = each %ENV)
653 if ($env_key =~ /^(ARVADOS|JOB|TASK)_/) {
654 if ($env_key eq "TASK_WORK") {
655 $command .= "--env=\QTASK_WORK=/tmp/crunch-job\E ";
657 elsif ($env_key eq "TASK_KEEPMOUNT") {
658 $command .= "--env=\QTASK_KEEPMOUNT=/keep\E ";
661 $command .= "--env=\Q$env_key=$env_val\E ";
665 $command .= "--env=\QCRUNCH_NODE_SLOTS=$ENV{CRUNCH_NODE_SLOTS}\E ";
666 $command .= "--env=\QCRUNCH_SRC=/tmp/crunch-src\E ";
667 $command .= "\Q$docker_hash\E ";
668 $command .= "stdbuf --output=0 --error=0 ";
669 $command .= "/tmp/crunch-src/crunch_scripts/" . $Job->{"script"};
672 $command .= "crunchstat -cgroup-root=/sys/fs/cgroup -poll=10000 ";
673 $command .= "stdbuf --output=0 --error=0 ";
674 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
677 my @execargs = ('bash', '-c', $command);
678 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
679 # exec() failed, we assume nothing happened.
680 Log(undef, "srun() failed on build script");
684 if (!defined $childpid)
691 $proc{$childpid} = { jobstep => $id,
694 jobstepname => "$job_id.$id.$childpid",
696 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
697 $slot[$childslot]->{pid} = $childpid;
699 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
700 Log ($id, "child $childpid started on $childslotname");
701 $Jobstep->{starttime} = time;
702 $Jobstep->{node} = $childnode->{name};
703 $Jobstep->{slotindex} = $childslot;
704 delete $Jobstep->{stderr};
705 delete $Jobstep->{finishtime};
707 $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
708 $Jobstep->{'arvados_task'}->save;
710 splice @jobstep_todo, $todo_ptr, 1;
713 $progress_is_dirty = 1;
717 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
719 last THISROUND if $main::please_freeze;
720 if ($main::please_info)
722 $main::please_info = 0;
726 update_progress_stats();
733 check_refresh_wanted();
735 update_progress_stats();
736 select (undef, undef, undef, 0.1);
738 elsif (time - $progress_stats_updated >= 30)
740 update_progress_stats();
742 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
743 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
745 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
746 .($thisround_failed+$thisround_succeeded)
747 .") -- giving up on this round";
748 Log (undef, $message);
752 # move slots from freeslot to holdslot (or back to freeslot) if necessary
753 for (my $i=$#freeslot; $i>=0; $i--) {
754 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
755 push @holdslot, (splice @freeslot, $i, 1);
758 for (my $i=$#holdslot; $i>=0; $i--) {
759 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
760 push @freeslot, (splice @holdslot, $i, 1);
764 # give up if no nodes are succeeding
765 if (!grep { $_->{node}->{losing_streak} == 0 &&
766 $_->{node}->{hold_count} < 4 } @slot) {
767 my $message = "Every node has failed -- giving up on this round";
768 Log (undef, $message);
775 push @freeslot, splice @holdslot;
776 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
779 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
782 if ($main::please_continue) {
783 $main::please_continue = 0;
786 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
790 check_refresh_wanted();
792 update_progress_stats();
793 select (undef, undef, undef, 0.1);
794 killem (keys %proc) if $main::please_freeze;
798 update_progress_stats();
799 freeze_if_want_freeze();
802 if (!defined $main::success)
805 $thisround_succeeded == 0 &&
806 ($thisround_failed == 0 || $thisround_failed > 4))
808 my $message = "stop because $thisround_failed tasks failed and none succeeded";
809 Log (undef, $message);
818 goto ONELEVEL if !defined $main::success;
821 release_allocation();
823 my $collated_output = &collate_output();
825 if (!$collated_output) {
826 Log(undef, "output undef");
830 open(my $orig_manifest, '-|', 'arv-get', $collated_output)
831 or die "failed to get collated manifest: $!";
832 my $orig_manifest_text = '';
833 while (my $manifest_line = <$orig_manifest>) {
834 $orig_manifest_text .= $manifest_line;
836 my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
837 'manifest_text' => $orig_manifest_text,
839 Log(undef, "output uuid " . $output->{uuid});
840 Log(undef, "output hash " . $output->{portable_data_hash});
841 $Job->update_attributes('output' => $output->{portable_data_hash}) if $job_has_uuid;
844 Log (undef, "Failed to register output manifest: $@");
848 Log (undef, "finish");
853 if ($collated_output && $main::success) {
854 $Job->update_attributes('state' => 'Complete')
856 $Job->update_attributes('state' => 'Failed')
860 exit ($Job->{'state'} != 'Complete' ? 1 : 0);
# Push the current todo/done/running task counts into the Job record's
# tasks_summary and log a one-line status.  No-op unless something changed
# since the last update ($progress_is_dirty).  Always refreshes the
# $progress_stats_updated timestamp so callers can throttle on it.
864 sub update_progress_stats
866 $progress_stats_updated = time;
867 return if !$progress_is_dirty;
# "running" is inferred from slot accounting: total slots minus free minus held.
868 my ($todo, $done, $running) = (scalar @jobstep_todo,
869 scalar @jobstep_done,
870 scalar @slot - scalar @freeslot - scalar @holdslot);
871 $Job->{'tasks_summary'} ||= {};
872 $Job->{'tasks_summary'}->{'todo'} = $todo;
873 $Job->{'tasks_summary'}->{'done'} = $done;
874 $Job->{'tasks_summary'}->{'running'} = $running;
# Persist to the API server (presumably skipped for local jobs by an elided
# guard -- TODO confirm against the full source).
876 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
878 Log (undef, "status: $done done, $running running, $todo todo");
879 $progress_is_dirty = 0;
886 my $pid = waitpid (-1, WNOHANG);
887 return 0 if $pid <= 0;
889 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
891 . $slot[$proc{$pid}->{slot}]->{cpu});
892 my $jobstepid = $proc{$pid}->{jobstep};
893 my $elapsed = time - $proc{$pid}->{time};
894 my $Jobstep = $jobstep[$jobstepid];
896 my $childstatus = $?;
897 my $exitvalue = $childstatus >> 8;
898 my $exitinfo = sprintf("exit %d signal %d%s",
901 ($childstatus & 128 ? ' core dump' : ''));
902 $Jobstep->{'arvados_task'}->reload;
903 my $task_success = $Jobstep->{'arvados_task'}->{success};
905 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
907 if (!defined $task_success) {
908 # task did not indicate one way or the other --> fail
909 $Jobstep->{'arvados_task'}->{success} = 0;
910 $Jobstep->{'arvados_task'}->save;
917 $temporary_fail ||= $Jobstep->{node_fail};
918 $temporary_fail ||= ($exitvalue == 111);
921 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
923 # Check for signs of a failed or misconfigured node
924 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
925 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
926 # Don't count this against jobstep failure thresholds if this
927 # node is already suspected faulty and srun exited quickly
928 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
930 Log ($jobstepid, "blaming failure on suspect node " .
931 $slot[$proc{$pid}->{slot}]->{node}->{name});
932 $temporary_fail ||= 1;
934 ban_node_by_slot($proc{$pid}->{slot});
937 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
938 ++$Jobstep->{'failures'},
939 $temporary_fail ? 'temporary ' : 'permanent',
942 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
943 # Give up on this task, and the whole job
945 $main::please_freeze = 1;
948 # Put this task back on the todo queue
949 push @jobstep_todo, $jobstepid;
951 $Job->{'tasks_summary'}->{'failed'}++;
955 ++$thisround_succeeded;
956 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
957 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
958 push @jobstep_done, $jobstepid;
959 Log ($jobstepid, "success in $elapsed seconds");
961 $Jobstep->{exitcode} = $childstatus;
962 $Jobstep->{finishtime} = time;
963 $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
964 $Jobstep->{'arvados_task'}->save;
965 process_stderr ($jobstepid, $task_success);
966 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
968 close $reader{$jobstepid};
969 delete $reader{$jobstepid};
970 delete $slot[$proc{$pid}->{slot}]->{pid};
971 push @freeslot, $proc{$pid}->{slot};
976 my $newtask_list = [];
979 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
981 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
983 'order' => 'qsequence',
984 'offset' => scalar(@$newtask_list),
986 push(@$newtask_list, @{$newtask_results->{items}});
987 } while (@{$newtask_results->{items}});
988 foreach my $arvados_task (@$newtask_list) {
990 'level' => $arvados_task->{'sequence'},
992 'arvados_task' => $arvados_task
994 push @jobstep, $jobstep;
995 push @jobstep_todo, $#jobstep;
999 $progress_is_dirty = 1;
# If the refresh-trigger file ($CRUNCH_REFRESH_TRIGGER) has been touched since
# our last refresh, re-fetch the Job record from the API server, copy over
# cancellation-related attributes, and request a freeze if the job is no
# longer in Running state (e.g. it was cancelled out from under us).
1003 sub check_refresh_wanted
1005 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is the trigger file's mtime.
1006 if (@stat && $stat[9] > $latest_refresh) {
1007 $latest_refresh = scalar time;
# Only jobs backed by a real API record can be refreshed/cancelled remotely.
1008 if ($job_has_uuid) {
1009 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
1010 for my $attr ('cancelled_at',
1011 'cancelled_by_user_uuid',
1012 'cancelled_by_client_uuid',
1014 $Job->{$attr} = $Job2->{$attr};
1016 if ($Job->{'state'} ne "Running") {
1017 if ($Job->{'state'} eq "Cancelled") {
1018 Log (undef, "Job cancelled at " . $Job->{'cancelled_at'} . " by user " . $Job->{'cancelled_by_user_uuid'});
1020 Log (undef, "Job state unexpectedly changed to " . $Job->{'state'});
# Either way, stop dispatching and checkpoint.
1023 $main::please_freeze = 1;
# --- squeue reconciliation (sub header elided from this listing) ---
# Periodically compares our child-process table (%proc) against what slurm's
# squeue reports, and schedules kills for procs whose jobsteps slurm no
# longer knows about.  Two separate rate limits: kill-list every 4s, squeue
# itself every 60s.
1031 # return if the kill list was checked <4 seconds ago
1032 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1036 $squeue_kill_checked = time;
1038 # use killem() on procs whose killtime is reached
1041 if (exists $proc{$_}->{killtime}
1042 && $proc{$_}->{killtime} <= time)
1048 # return if the squeue was checked <60 seconds ago
1049 if (defined $squeue_checked && $squeue_checked > time - 60)
1053 $squeue_checked = time;
1057 # here is an opportunity to check for mysterious problems with local procs
1061 # get a list of steps still running
# "&& echo ok" sentinel lets us distinguish an empty squeue from a failed one.
1062 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1064 if ($squeue[-1] ne "ok")
1070 # which of my jobsteps are running, according to squeue?
1074 if (/^(\d+)\.(\d+) (\S+)/)
1076 if ($1 eq $ENV{SLURM_JOBID})
1083 # which of my active child procs (>60s old) were not mentioned by squeue?
1084 foreach (keys %proc)
1086 if ($proc{$_}->{time} < time - 60
1087 && !exists $ok{$proc{$_}->{jobstepname}}
1088 && !exists $proc{$_}->{killtime})
1090 # kill this proc if it hasn't exited in 30 seconds
1091 $proc{$_}->{killtime} = time + 30;
# Give the slurm node allocation back by cancelling our own slurm job.
# (An elided guard presumably limits this to $have_slurm -- TODO confirm.)
1097 sub release_allocation
1101 Log (undef, "release job allocation");
1102 system "scancel $ENV{SLURM_JOBID}";
# --- stderr pipe drain (sub header elided from this listing) ---
# Non-blocking read from each jobstep's stderr pipe: append to the jobstep's
# stderr buffer, run line-oriented processing, and trim the buffer so it
# never grows past 16KB (oldest 8KB dropped).
1110 foreach my $job (keys %reader)
1113 while (0 < sysread ($reader{$job}, $buf, 8192))
1115 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1116 $jobstep[$job]->{stderr} .= $buf;
# Consume complete lines (logging + srun-error detection) as they arrive.
1117 preprocess_stderr ($job);
1118 if (length ($jobstep[$job]->{stderr}) > 16384)
1120 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Pop complete lines off a jobstep's stderr buffer, log each one, and react
# to known srun error messages: an expired/unconfirmed allocation triggers a
# freeze; a node failure marks the jobstep as node_fail and bans the node's
# slot for a while.
1129 sub preprocess_stderr
1133 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
# Remove the line (plus its newline) from the front of the buffer.
1135 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1136 Log ($job, "stderr $line");
1137 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# Our slurm allocation is gone -- checkpoint and stop.
1139 $main::please_freeze = 1;
1141 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
# Blame the node, not the task: mark for retry elsewhere.
1142 $jobstep[$job]->{node_fail} = 1;
1143 ban_node_by_slot($jobstep[$job]->{slotindex});
1152 my $task_success = shift;
1153 preprocess_stderr ($job);
1156 Log ($job, "stderr $_");
1157 } split ("\n", $jobstep[$job]->{stderr});
# --- fetch_block body (sub header elided from this listing) ---
# Reads an entire Keep object via `arv-get <hash>` on a pipe and returns its
# contents as a single string; dies on read error.
1163 my ($keep, $child_out, $output_block);
# $hash is interpolated with \Q...\E so shell metacharacters are neutralized.
1165 my $cmd = "arv-get \Q$hash\E";
1166 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1170 my $bytes = sysread($keep, $buf, 1024 * 1024);
1171 if (!defined $bytes) {
1172 die "reading from arv-get: $!";
1173 } elsif ($bytes == 0) {
1174 # sysread returns 0 at the end of the pipe.
1177 # some bytes were read into buf.
1178 $output_block .= $buf;
1182 return $output_block;
# --- collate_output body (sub header elided from this listing) ---
# Combine every successful task's output into one collection by streaming
# manifests/blocks into an `arv-put --raw` child, then read back the
# resulting locator (with a 120s timeout).  Single-task jobs short-circuit
# and reuse the task's output locator directly.
1187 Log (undef, "collate");
1189 my ($child_out, $child_in);
1190 my $pid = open2($child_out, $child_in, 'arv-put', '--raw',
1191 '--retries', put_retry_count());
# Skip tasks that produced no output or did not succeed.
1195 next if (!exists $_->{'arvados_task'}->{'output'} ||
1196 !$_->{'arvados_task'}->{'success'});
1197 my $output = $_->{'arvados_task'}->{output};
# Output that is NOT a bare block locator (md5+hints) is treated as manifest
# text and piped through as-is.
1198 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1200 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1201 print $child_in $output;
1203 elsif (@jobstep == 1)
1205 $joboutput = $output;
# Otherwise: a block locator from a multi-task job -- fetch the block so its
# manifest text can be concatenated with the others.
1208 elsif (defined (my $outblock = fetch_block ($output)))
1210 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1211 print $child_in $outblock;
1215 Log (undef, "XXX fetch_block($output) failed XXX");
1221 if (!defined $joboutput) {
1222 my $s = IO::Select->new($child_out);
1223 if ($s->can_read(120)) {
1224 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1227 Log (undef, "timed out reading from 'arv-put'");
# --- killem body (sub header elided from this listing) ---
# Escalating kill for the given pids: SIGINT first; if a signal was already
# sent >4s ago, escalate to SIGTERM, then SIGKILL.  Each signal is sent
# twice (srun needs two SIGINTs) and the send time is recorded so the next
# pass knows when to escalate.
1240 my $sig = 2; # SIGINT first
1241 if (exists $proc{$_}->{"sent_$sig"} &&
1242 time - $proc{$_}->{"sent_$sig"} > 4)
1244 $sig = 15; # SIGTERM if SIGINT doesn't work
1246 if (exists $proc{$_}->{"sent_$sig"} &&
1247 time - $proc{$_}->{"sent_$sig"} > 4)
1249 $sig = 9; # SIGKILL if SIGTERM doesn't work
1251 if (!exists $proc{$_}->{"sent_$sig"})
1253 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1255 select (undef, undef, undef, 0.1);
1258 kill $sig, $_; # srun wants two SIGINT to really interrupt
1260 $proc{$_}->{"sent_$sig"} = time;
1261 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1271 vec($bits,fileno($_),1) = 1;
1277 # Send log output to Keep via arv-put.
1279 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
1280 # $log_pipe_pid is the pid of the arv-put subprocess.
1282 # The only functions that should access these variables directly are:
1284 # log_writer_start($logfilename)
1285 # Starts an arv-put pipe, reading data on stdin and writing it to
1286 # a $logfilename file in an output collection.
1288 # log_writer_send($txt)
1289 # Writes $txt to the output log collection.
1291 # log_writer_finish()
1292 # Closes the arv-put pipe and returns the output that it produces.
1294 # log_writer_is_active()
1295 # Returns a true value if there is currently a live arv-put
1296 # process, false otherwise.
# Shared state for the arv-put log pipe; see the API comment block above.
1298 my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
# Start the arv-put child that receives log text on stdin and stores it as
# $logfilename in a collection.
1300 sub log_writer_start($)
1302 my $logfilename = shift;
1303 $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
1304 'arv-put', '--portable-data-hash',
1306 '--filename', $logfilename,
# Append one chunk of log text to the collection via the pipe.
1310 sub log_writer_send($)
1313 print $log_pipe_in $txt;
# Close the pipe, wait for arv-put, and return what it printed (expected to
# be the log collection's locator).  Returns early if no writer is active.
1316 sub log_writer_finish()
1318 return unless $log_pipe_pid;
1320 close($log_pipe_in);
# Read arv-put's output with a 120s timeout so a hung child can't block us
# forever.
1323 my $s = IO::Select->new($log_pipe_out);
1324 if ($s->can_read(120)) {
1325 sysread($log_pipe_out, $arv_put_output, 1024);
1326 chomp($arv_put_output);
1328 Log (undef, "timed out reading from 'arv-put'");
1331 waitpid($log_pipe_pid, 0);
1332 $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
1334 Log("log_writer_finish: arv-put returned error $?")
1337 return $arv_put_output;
# True while an arv-put log child is alive (its pid doubles as the flag).
1340 sub log_writer_is_active() {
1341 return $log_pipe_pid;
# Emit one log line, tagged with job id and pid, to STDERR and (when a log
# writer is active) to the Keep log collection.  Multi-line messages are
# split and logged one line at a time.
1344 sub Log # ($jobstep_id, $logmessage)
1346 if ($_[1] =~ /\n/) {
1347 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR so log output interleaves correctly with child output.
1352 my $fh = select STDERR; $|=1; select $fh;
1353 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal so the log stays one line.
1354 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
# Only prepend a timestamp when writing to Keep or an interactive terminal.
1357 if (log_writer_is_active() || -t STDERR) {
1358 my @gmtime = gmtime;
1359 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1360 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1362 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1364 if (log_writer_is_active()) {
1365 log_writer_send($datetime . " " . $message);
1372 my ($package, $file, $line) = caller;
1373 my $message = "@_ at $file line $line\n";
1374 Log (undef, $message);
1375 freeze() if @jobstep_todo;
1376 collate_output() if @jobstep_todo;
1378 save_meta() if log_writer_is_active();
1385 return if !$job_has_uuid;
1386 if ($Job->{'state'} eq 'Cancelled') {
1387 $Job->update_attributes('finished_at' => scalar gmtime);
1389 $Job->update_attributes('state' => 'Failed');
# save_meta body (sub header elided in this excerpt): finalize the
# background log upload and record the resulting manifest locator in
# the Job's 'log' attribute (only persisted when the job has a real
# Arvados record).
1396 my $justcheckpoint = shift; # false if this will be the last meta saved
1397 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1399 my $loglocator = log_writer_finish();
1400 Log (undef, "log manifest is $loglocator");
1401 $Job->{'log'} = $loglocator;
1402 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze has been requested (a signal handler sets
# $main::please_freeze), release the node allocation, kill the child
# processes whose PIDs were passed in, and reap them before freezing.
# NOTE(review): surrounding lines (loop/branch closers, the freeze and
# exit calls) are elided in this excerpt.
1406 sub freeze_if_want_freeze
1408 if ($main::please_freeze)
1410 release_allocation();
1413 # kill some srun procs before freeze+stop
# Register the caller-supplied PIDs in %proc so killem() sees them.
1414 map { $proc{$_} = {} } @_;
1417 killem (keys %proc);
# Brief pause (100 ms) to give children a chance to exit before reaping.
1418 select (undef, undef, undef, 0.1);
# Reap everything that has exited without blocking.
1420 while (($died = waitpid (-1, WNOHANG)) > 0)
1422 delete $proc{$died};
# freeze() body (sub header elided in this excerpt): checkpointing is
# a deliberate no-op — it only logs that it is not implemented.
1437 Log (undef, "Freeze not implemented");
# thaw() body (sub header elided in this excerpt): resuming from a
# checkpoint is unsupported and aborts via croak.
1444 croak ("Thaw not implemented");
# Unquoting-helper fragment (enclosing sub elided): reverse backslash
# escaping — "\n" becomes a real newline, any other "\X" becomes X.
1460 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# srun body (sub header elided in this excerpt): run a command,
# prepending the srun arguments only when SLURM is available. Supports
# feeding a $stdin string to the child and an optional {fork} mode that
# returns the exit status instead of replacing this process.
# NOTE(review): several lines are elided, including (presumably) the
# actual exec call before the final die — confirm in the full source.
1467 my $srunargs = shift;
1468 my $execargs = shift;
1469 my $opts = shift || {};
# Without SLURM, skip the srun wrapper and run the command directly.
1471 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
# Echo the command (quoting args containing spaces) when debugging.
1472 print STDERR (join (" ",
1473 map { / / ? "'$_'" : $_ }
1476 if $ENV{CRUNCH_DEBUG};
1478 if (defined $stdin) {
# Fork a child whose stdout becomes our STDIN; the child prints the
# supplied $stdin content and exits.
1479 my $child = open STDIN, "-|";
1480 defined $child or die "no fork: $!";
1482 print $stdin or die $!;
1483 close STDOUT or die $!;
# In fork mode, run via system() and return the status to the caller.
1488 return system (@$args) if $opts->{fork};
# Reaching these lines means the exec path failed.
1491 warn "ENV size is ".length(join(" ",%ENV));
1492 die "exec failed: $!: @$args";
# Temporarily take a node out of scheduling rotation: set a 60-second
# hold on the node owning the given slot and count the back-off.
# NOTE(review): the $slotid argument-unpacking line is elided in this
# excerpt.
1496 sub ban_node_by_slot {
1497 # Don't start any new jobsteps on this node for 60 seconds
1499 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1500 $slot[$slotid]->{node}->{hold_count}++;
1501 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
# Lock-file helper body (sub header elided in this excerpt): open the
# lock file and take an exclusive, non-blocking flock, croaking with
# the caller's error message if another process already holds it.
1506 my ($lockfile, $error_message) = @_;
1507 open L, ">", $lockfile or croak("$lockfile: $!");
# LOCK_EX|LOCK_NB: fail immediately rather than wait for the lock.
1508 if (!flock L, LOCK_EX|LOCK_NB) {
1509 croak("Can't lock $lockfile: $error_message\n");
1513 sub find_docker_image {
1514 # Given a Keep locator, check to see if it contains a Docker image.
1515 # If so, return its stream name and Docker hash.
1516 # If not, return undef for both values.
1517 my $locator = shift;
1518 my ($streamname, $filename);
# Fetch the collection record and walk its manifest text line by line.
1519 if (my $image = $arv->{collections}->{get}->execute(uuid => $locator)) {
1520 foreach my $line (split(/\n/, $image->{manifest_text})) {
1521 my @tokens = split(/\s+/, $line);
# First token of each manifest line is the stream name; file entries
# look like "pos:size:name".
1523 $streamname = shift(@tokens);
1524 foreach my $filedata (grep(/^\d+:\d+:/, @tokens)) {
# A valid image collection must contain exactly one file.
1525 if (defined($filename)) {
1526 return (undef, undef); # More than one file in the Collection.
1528 $filename = (split(/:/, $filedata, 3))[2];
# The single file must be named <64-hex-digits>.tar; the hex string is
# the Docker image hash returned to the caller.
1533 if (defined($filename) and ($filename =~ /^([0-9A-Fa-f]{64})\.tar$/)) {
1534 return ($streamname, $1);
1536 return (undef, undef);
1540 sub put_retry_count {
1541 # Calculate a --retries argument for arv-put that will have it try
1542 # approximately as long as this Job has been running.
1543 my $stoptime = shift || time;
1544 my $starttime = $jobstep[0]->{starttime};
# Fall back to a 1-second elapsed time if the first jobstep never
# recorded a start time.
1545 my $timediff = defined($starttime) ? ($stoptime - $starttime) : 1;
# NOTE(review): the loop body (which presumably halves $timediff while
# counting $retries) is elided in this excerpt — confirm in full source.
1547 while ($timediff >= 2) {
# Always allow at least 3 retries.
1551 return ($retries > 3) ? $retries : 3;
# Installer script section: unpack the git archive appended after
# __DATA__ into $CRUNCH_SRC, build/install the Arvados Python SDK and
# any crunch_scripts install hooks, then record the installed commit so
# subsequent runs can skip the rebuild.
# NOTE(review): many lines (loop/branch closers, the lock/flock call,
# the exec of @ARGV) are elided in this excerpt.
1557 # checkout-and-build
1560 use File::Path qw( make_path );
# All configuration arrives via environment variables set by the parent.
1562 my $destdir = $ENV{"CRUNCH_SRC"};
1563 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1564 my $repo = $ENV{"CRUNCH_SRC_URL"};
1565 my $task_work = $ENV{"TASK_WORK"};
1567 for my $dir ($destdir, $task_work) {
1570 -e $dir or die "Failed to create temporary directory ($dir): $!";
# Serialize concurrent installs into the same destination directory.
1574 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# Fast path: if "$destdir.commit" already points at this commit, the
# source tree is current and the build can be skipped.
1576 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1579 die "Cannot exec `@ARGV`: $!";
# Invalidate the commit marker before (re)building.
1585 unlink "$destdir.commit";
# Redirect build output to a per-destination log file.
1586 open STDOUT, ">", "$destdir.log";
1587 open STDERR, ">&STDOUT";
# The source tree ships as a tar archive after __DATA__ in this script.
1590 my @git_archive_data = <DATA>;
1591 if (@git_archive_data) {
1592 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1593 print TARX @git_archive_data;
1595 die "'tar -C $destdir -xf -' exited $?: $!";
1600 chomp ($pwd = `pwd`);
1601 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
# Install the bundled Arvados Python SDK into a virtualenv.
1604 for my $src_path ("$destdir/arvados/sdk/python") {
1606 shell_or_die ("virtualenv", $install_dir);
1607 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Run whichever install hook the checked-out tree provides, in order of
# preference: crunch_scripts/install, tests/autotests.sh, install.sh.
1611 if (-e "$destdir/crunch_scripts/install") {
1612 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1613 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1615 shell_or_die ("./tests/autotests.sh", $install_dir);
1616 } elsif (-e "./install.sh") {
1617 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit: write the symlink under a
# temporary name, then rename over "$destdir.commit".
1621 unlink "$destdir.commit.new";
1622 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1623 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1630 die "Cannot exec `@ARGV`: $!";
# shell_or_die body (sub header elided in this excerpt): echo the
# command to STDERR when DEBUG is set, run it, and die with the exit
# status (in hex) on failure. NOTE(review): the actual system/command
# invocation line is elided — confirm in the full source.
1637 if ($ENV{"DEBUG"}) {
1638 print STDERR "@_\n";
1641 or die "@_ failed: $! exit 0x".sprintf("%x",$?);