2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
44 =head1 RUNNING JOBS LOCALLY
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
50 If the job succeeds, the job's output locator is printed on stdout.
52 While the job is running, the following signals are accepted:
56 =item control-C, SIGINT, SIGQUIT
58 Save a checkpoint, terminate any job tasks that are running, and stop.
62 Save a checkpoint and continue.
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
# NOTE(review): this is a numbered listing with gaps; intervening original
# lines are elided throughout.  Environment setup, option parsing, and API
# client / job-record acquisition.
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
# Derive the per-job scratch tree from TMPDIR unless the caller chose one.
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88 if ($ENV{"USER"} ne "crunch" && $< != 0) {
89 # use a tmp dir unique for my uid
90 $ENV{"CRUNCH_TMP"} .= "-$<";
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
# Allow overriding the `arv` CLI binary used for keep get/put shell-outs.
100 if (defined $ENV{"ARV_CLI"}) {
101 $arv_cli = $ENV{"ARV_CLI"};
113 GetOptions('force-unlock' => \$force_unlock,
114 'git-dir=s' => \$git_dir,
115 'job=s' => \$jobspec,
116 'job-api-token=s' => \$job_api_token,
117 'no-clear-tmp' => \$no_clear_tmp,
118 'resume-stash=s' => \$resume_stash,
# A token given on the command line takes effect for all API calls below.
121 if (defined $job_api_token) {
122 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
125 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A bare UUID-ish argument means "fetch the job record"; anything else
# (e.g. a JSON blob) is treated as a local job definition.
126 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
127 my $local_job = !$job_has_uuid;
132 $main::ENV{CRUNCH_DEBUG} = 1;
136 $main::ENV{CRUNCH_DEBUG} = 0;
141 my $arv = Arvados->new('apiVersion' => 'v1');
144 my $User = $arv->{'users'}->{'current'}->execute;
152 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
153 if (!$force_unlock) {
# Refuse to (re)run a job that is locked, already decided, already
# running, or already started -- unless --force-unlock was given.
154 if ($Job->{'is_locked_by_uuid'}) {
155 croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
# NOTE(review): the original tested `$Job->{'success'} ne undef`, but `ne`
# is a *string* comparison -- it warns and effectively compares against "",
# so a success value of "" or 0... behaves surprisingly.  The real question
# is "has the flag been set at all?", which is defined().
157 if (defined $Job->{'success'}) {
158 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
160 if ($Job->{'running'}) {
161 croak("Job 'running' flag is already set");
163 if ($Job->{'started_at'}) {
164 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local-job path: the job spec is a JSON document on the command line.
170 $Job = JSON::decode_json($jobspec);
174 map { croak ("No $_ specified") unless $Job->{$_} }
175 qw(script script_version script_parameters);
178 $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
179 $Job->{'started_at'} = gmtime;
181 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
185 $job_id = $Job->{'uuid'};
187 my $keep_logfile = $job_id . '.log.txt';
188 $local_logfile = File::Temp->new();
# max_tasks_per_node == 0 means "no limit" in the checks below.
190 $Job->{'runtime_constraints'} ||= {};
191 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
192 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
195 Log (undef, "check slurm allocation");
198 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fallback when not under SLURM: count local CPUs (at least 1).
202 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
203 push @sinfo, "$localcpus localhost";
205 if (exists $ENV{SLURM_NODELIST})
207 push @sinfo, `sinfo -h --format='%c %N' --nodes=\Q$ENV{SLURM_NODELIST}\E`;
# Each sinfo line is "<cpus> <nodelist>"; expand compressed node ranges
# like "compute[1-3,5]" into individual hostnames.
211 my ($ncpus, $slurm_nodelist) = split;
212 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
215 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
218 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
221 foreach (split (",", $ranges))
234 push @nodelist, map {
236 $n =~ s/\[[-,\d]+\]/$_/;
243 push @nodelist, $nodelist;
# One slot per CPU per node; @slot is the scheduling unit below.
246 foreach my $nodename (@nodelist)
248 Log (undef, "node $nodename - $ncpus slots");
249 my $node = { name => $nodename,
253 foreach my $cpu (1..$ncpus)
255 push @slot, { node => $node,
259 push @node, @nodelist;
264 # Ensure that we get one jobstep running on each allocated node before
265 # we start overloading nodes with concurrent steps
267 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
274 # Claim this job, and make sure nobody else does
# NOTE(review): UUIDs are strings; the original used the numeric operator
# `==`, which evaluates both sides as 0 and therefore always matched,
# silently defeating the lock verification.  String equality (eq) performs
# the intended check.
275 unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
276 $Job->{'is_locked_by_uuid'} eq $User->{'uuid'}) {
277 croak("Error while updating / locking job");
279 $Job->update_attributes('started_at' => scalar gmtime,
282 'tasks_summary' => { 'failed' => 0,
289 Log (undef, "start");
# Signal policy: INT/QUIT/TSTP request a checkpoint+stop, ALRM requests a
# status dump, CONT resumes, HUP re-reads node allocation / job record.
# TERM aborts via croak (which saves logs -- see the croak sub below).
290 $SIG{'INT'} = sub { $main::please_freeze = 1; };
291 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
292 $SIG{'TERM'} = \&croak;
293 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
294 $SIG{'ALRM'} = sub { $main::please_info = 1; };
295 $SIG{'CONT'} = sub { $main::please_continue = 1; };
296 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
298 $main::please_freeze = 0;
299 $main::please_info = 0;
300 $main::please_continue = 0;
301 $main::please_refresh = 0;
302 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Propagate NOCACHE* knobs from the job record into the environment.
304 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
305 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
306 $ENV{"JOB_UUID"} = $job_id;
# Jobstep bookkeeping: parallel arrays of pending/finished step indices.
310 my @jobstep_todo = ();
311 my @jobstep_done = ();
312 my @jobstep_tomerge = ();
313 my $jobstep_tomerge_level = 0;
315 my $squeue_kill_checked;
316 my $output_in_keep = 0;
317 my $latest_refresh = scalar time;
321 if (defined $Job->{thawedfromkey})
323 thaw ($Job->{thawedfromkey});
# Fresh job: seed the queue with a single level-0 task.
327 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
328 'job_uuid' => $Job->{'uuid'},
333 push @jobstep, { 'level' => 0,
335 'arvados_task' => $first_task,
337 push @jobstep_todo, 0;
343 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
# Install phase: clean scratch dirs, resolve script_version to a commit,
# ship the source tree + build script to each node, pull the Docker image.
350 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A script_version that is an absolute path on a local job means "run the
# tree in place" -- skip the git checkout/install machinery.
352 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
355 if (!defined $no_clear_tmp) {
356 my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
357 system($clear_tmp_cmd) == 0
358 or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
360 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
361 for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
363 system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
364 or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
365 system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
367 or croak ("setup.py in $src_path failed: exit ".($?>>8));
# Non-local path: the build script is embedded after __DATA__ below.
375 $build_script = <DATA>;
377 Log (undef, "Install revision ".$Job->{script_version});
378 my $nodelist = join(",", @node);
380 if (!defined $no_clear_tmp) {
381 # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
383 my $cleanpid = fork();
386 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
387 ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
# Parent: poll for the cleaner to exit, honoring freeze requests.
392 last if $cleanpid == waitpid (-1, WNOHANG);
393 freeze_if_want_freeze ($cleanpid);
394 select (undef, undef, undef, 0.1);
396 Log (undef, "Clean-work-dir exited $?");
399 # Install requested code version
402 my @srunargs = ("srun",
403 "--nodelist=$nodelist",
404 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
406 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
407 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
411 my $treeish = $Job->{'script_version'};
413 # If we're running under crunch-dispatch, it will have pulled the
414 # appropriate source tree into its own repository, and given us that
415 # repo's path as $git_dir. If we're running a "local" job, and a
416 # script_version was specified, it's up to the user to provide the
417 # full path to a local repository in Job->{repository}.
419 # TODO: Accept URLs too, not just local paths. Use git-ls-remote and
420 # git-archive --remote where appropriate.
422 # TODO: Accept a locally-hosted Arvados repository by name or
423 # UUID. Use arvados.v1.repositories.list or .get to figure out the
424 # appropriate fetch-url.
425 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
427 $ENV{"CRUNCH_SRC_URL"} = $repo;
429 if (-d "$repo/.git") {
430 # We were given a working directory, but we are only interested in
432 $repo = "$repo/.git";
435 # If this looks like a subversion r#, look for it in git-svn commit messages
437 if ($treeish =~ m{^\d{1,4}$}) {
438 my $gitlog = `git --git-dir=\Q$repo\E log --pretty="format:%H" --grep="git-svn-id:.*\@"\Q$treeish\E" " master`;
440 if ($gitlog =~ /^[a-f0-9]{40}$/) {
442 Log (undef, "Using commit $commit for script_version $treeish");
446 # If that didn't work, try asking git to look it up as a tree-ish.
448 if (!defined $commit) {
449 my $found = `git --git-dir=\Q$repo\E rev-list -1 ''\Q$treeish\E`;
451 if ($found =~ /^[0-9a-f]{40}$/s) {
453 if ($commit ne $treeish) {
454 # Make sure we record the real commit id in the database,
455 # frozentokey, logs, etc. -- instead of an abbreviation or a
456 # branch name which can become ambiguous or point to a
457 # different commit in the future.
458 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
459 Log (undef, "Using commit $commit for tree-ish $treeish");
460 if ($commit ne $treeish) {
461 $Job->{'script_version'} = $commit;
463 $Job->update_attributes('script_version' => $commit) or
464 croak("Error while updating job");
470 if (defined $commit) {
471 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
# The build script (read from __DATA__ above) is piped into perl on each
# node, followed by a git archive of the resolved commit.
472 @execargs = ("sh", "-c",
473 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
474 $git_archive = `git --git-dir=\Q$repo\E archive ''\Q$commit\E`;
477 croak ("could not figure out commit id for $treeish");
480 my $installpid = fork();
481 if ($installpid == 0)
483 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
488 last if $installpid == waitpid (-1, WNOHANG);
489 freeze_if_want_freeze ($installpid);
490 select (undef, undef, undef, 0.1);
492 Log (undef, "Install exited $?");
497 # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
498 must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
501 # If this job requires a Docker image, install that.
502 my $docker_bin = "/usr/bin/docker.io";
503 my $docker_image = $Job->{runtime_constraints}->{docker_image} || "";
505 my $docker_pid = fork();
506 if ($docker_pid == 0)
# NOTE(review): srun --nodelist is joined with spaces here but with commas
# elsewhere in this file -- verify which form this srun version accepts.
508 srun (["srun", "--nodelist=" . join(' ', @node)],
509 [$docker_bin, 'pull', $docker_image]);
514 last if $docker_pid == waitpid (-1, WNOHANG);
515 freeze_if_want_freeze ($docker_pid);
516 select (undef, undef, undef, 0.1);
518 # If the Docker image was specified as a hash, pull will fail.
519 # Ignore that error. We'll see what happens when we try to run later.
520 if (($? != 0) && ($docker_image !~ /^[0-9a-fA-F]{5,64}$/))
522 croak("Installing Docker image $docker_image returned exit code $?");
# Log the job's defining attributes and knobs for the permanent record.
526 foreach (qw (script script_version script_parameters runtime_constraints))
530 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
532 foreach (split (/\n/, $Job->{knobs}))
534 Log (undef, "knob " . $_);
# Main scheduling loop: run all tasks at the current level (ONELEVEL /
# THISROUND), one jobstep per free slot, forking an srun per task.
539 $main::success = undef;
545 my $thisround_succeeded = 0;
546 my $thisround_failed = 0;
547 my $thisround_failed_multiple = 0;
# Process lowest level first; within a level, lowest index first.
549 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
550 or $a <=> $b } @jobstep_todo;
551 my $level = $jobstep[$jobstep_todo[0]]->{level};
552 Log (undef, "start level $level");
557 my @freeslot = (0..$#slot);
560 my $progress_is_dirty = 1;
561 my $progress_stats_updated = 0;
563 update_progress_stats();
568 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
570 my $id = $jobstep_todo[$todo_ptr];
571 my $Jobstep = $jobstep[$id];
572 if ($Jobstep->{level} != $level)
# Non-blocking pipe from the child's stdout/stderr back to us.
577 pipe $reader{$id}, "writer" or croak ($!);
578 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
579 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
581 my $childslot = $freeslot[0];
582 my $childnode = $slot[$childslot]->{node};
583 my $childslotname = join (".",
584 $slot[$childslot]->{node}->{name},
585 $slot[$childslot]->{cpu});
586 my $childpid = fork();
# --- child process: reset signals, redirect output into the pipe, set up
# the per-task environment, then exec the task via srun. ---
589 $SIG{'INT'} = 'DEFAULT';
590 $SIG{'QUIT'} = 'DEFAULT';
591 $SIG{'TERM'} = 'DEFAULT';
593 foreach (values (%reader))
597 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
598 open(STDOUT,">&writer");
599 open(STDERR,">&writer");
604 delete $ENV{"GNUPGHOME"};
605 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
606 $ENV{"TASK_QSEQUENCE"} = $id;
607 $ENV{"TASK_SEQUENCE"} = $level;
608 $ENV{"JOB_SCRIPT"} = $Job->{script};
609 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
610 $param =~ tr/a-z/A-Z/;
611 $ENV{"JOB_PARAMETER_$param"} = $value;
613 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
614 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
615 $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
616 $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
617 $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
618 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
619 $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
625 "--nodelist=".$childnode->{name},
626 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
627 "--job-name=$job_id.$id.$$",
629 my $build_script_to_send = "";
# The remote command: recreate the task work dirs, then (optionally) mount
# Keep, then run the script under crunchstat, inside Docker if requested.
631 "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
632 ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
633 ."&& cd $ENV{CRUNCH_TMP} ";
636 $build_script_to_send = $build_script;
640 $command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
643 $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
644 $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
645 # Dynamically configure the container to use the host system as its
646 # DNS server. Get the host's global addresses from the ip command,
647 # and turn them into docker --dns options using gawk.
649 q{$(ip -o address show scope global |
650 gawk 'match($4, /^([0-9\.:]+)\//, x){print "--dns", x[1]}') };
651 foreach my $env_key (qw(CRUNCH_SRC CRUNCH_TMP TASK_KEEPMOUNT))
653 $command .= "-v \Q$ENV{$env_key}:$ENV{$env_key}:rw\E ";
# Only JOB_*/TASK_* variables are forwarded into the container.
655 while (my ($env_key, $env_val) = each %ENV)
657 if ($env_key =~ /^(JOB|TASK)_/) {
658 $command .= "-e \Q$env_key=$env_val\E ";
661 $command .= "\Q$docker_image\E ";
663 $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
665 $command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
666 my @execargs = ('bash', '-c', $command);
667 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# --- parent process: record the child in %proc and the slot table. ---
671 if (!defined $childpid)
678 $proc{$childpid} = { jobstep => $id,
681 jobstepname => "$job_id.$id.$childpid",
683 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
684 $slot[$childslot]->{pid} = $childpid;
686 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
687 Log ($id, "child $childpid started on $childslotname");
688 $Jobstep->{starttime} = time;
689 $Jobstep->{node} = $childnode->{name};
690 $Jobstep->{slotindex} = $childslot;
691 delete $Jobstep->{stderr};
692 delete $Jobstep->{finishtime};
694 splice @jobstep_todo, $todo_ptr, 1;
697 $progress_is_dirty = 1;
# Between dispatches: service signals, reap children, apply node-health
# heuristics, and after the round, collate output and finalize the job.
701 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
703 last THISROUND if $main::please_freeze;
704 if ($main::please_info)
706 $main::please_info = 0;
710 update_progress_stats();
717 check_refresh_wanted();
719 update_progress_stats();
720 select (undef, undef, undef, 0.1);
722 elsif (time - $progress_stats_updated >= 30)
724 update_progress_stats();
# Heuristic bail-out: too many repeated failures relative to successes.
726 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
727 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
729 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
730 .($thisround_failed+$thisround_succeeded)
731 .") -- giving up on this round";
732 Log (undef, $message);
736 # move slots from freeslot to holdslot (or back to freeslot) if necessary
737 for (my $i=$#freeslot; $i>=0; $i--) {
738 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
739 push @holdslot, (splice @freeslot, $i, 1);
742 for (my $i=$#holdslot; $i>=0; $i--) {
743 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
744 push @freeslot, (splice @holdslot, $i, 1);
748 # give up if no nodes are succeeding
749 if (!grep { $_->{node}->{losing_streak} == 0 &&
750 $_->{node}->{hold_count} < 4 } @slot) {
751 my $message = "Every node has failed -- giving up on this round";
752 Log (undef, $message);
# Round over: release holds and reset per-node failure streaks.
759 push @freeslot, splice @holdslot;
760 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
763 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
766 if ($main::please_continue) {
767 $main::please_continue = 0;
770 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
774 check_refresh_wanted();
776 update_progress_stats();
777 select (undef, undef, undef, 0.1);
778 killem (keys %proc) if $main::please_freeze;
782 update_progress_stats();
783 freeze_if_want_freeze();
786 if (!defined $main::success)
789 $thisround_succeeded == 0 &&
790 ($thisround_failed == 0 || $thisround_failed > 4))
792 my $message = "stop because $thisround_failed tasks failed and none succeeded";
793 Log (undef, $message);
# Undecided yet -> loop back for the next level of tasks.
802 goto ONELEVEL if !defined $main::success;
805 release_allocation();
808 $Job->update_attributes('output' => &collate_output(),
810 'success' => $Job->{'output'} && $main::success,
811 'finished_at' => scalar gmtime)
# Register the output manifest as a collection (and optionally persist it
# with a link owned by the current user).
814 if ($Job->{'output'})
817 my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
818 $arv->{'collections'}->{'create'}->execute('collection' => {
819 'uuid' => $Job->{'output'},
820 'manifest_text' => $manifest_text,
822 if ($Job->{'output_is_persistent'}) {
823 $arv->{'links'}->{'create'}->execute('link' => {
824 'tail_kind' => 'arvados#user',
825 'tail_uuid' => $User->{'uuid'},
826 'head_kind' => 'arvados#collection',
827 'head_uuid' => $Job->{'output'},
828 'link_class' => 'resources',
834 Log (undef, "Failed to register output manifest: $@");
838 Log (undef, "finish");
# Push todo/done/running counts into the job record (throttled by the
# $progress_is_dirty flag) and log a one-line status summary.
845 sub update_progress_stats
847 $progress_stats_updated = time;
848 return if !$progress_is_dirty;
# running = total slots minus free minus held.
849 my ($todo, $done, $running) = (scalar @jobstep_todo,
850 scalar @jobstep_done,
851 scalar @slot - scalar @freeslot - scalar @holdslot);
852 $Job->{'tasks_summary'} ||= {};
853 $Job->{'tasks_summary'}->{'todo'} = $todo;
854 $Job->{'tasks_summary'}->{'done'} = $done;
855 $Job->{'tasks_summary'}->{'running'} = $running;
857 $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
859 Log (undef, "status: $done done, $running running, $todo todo");
860 $progress_is_dirty = 0;
# (body of the child-reaping sub; the sub header is elided from this
# listing)  Reap one exited task, classify the failure as temporary or
# permanent, requeue or give up, then queue any tasks the child created.
867 my $pid = waitpid (-1, WNOHANG);
868 return 0 if $pid <= 0;
870 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
872 . $slot[$proc{$pid}->{slot}]->{cpu});
873 my $jobstepid = $proc{$pid}->{jobstep};
874 my $elapsed = time - $proc{$pid}->{time};
875 my $Jobstep = $jobstep[$jobstepid];
877 my $childstatus = $?;
878 my $exitvalue = $childstatus >> 8;
879 my $exitinfo = sprintf("exit %d signal %d%s",
882 ($childstatus & 128 ? ' core dump' : ''));
# Re-read the task record: the task itself reports success/failure there.
883 $Jobstep->{'arvados_task'}->reload;
884 my $task_success = $Jobstep->{'arvados_task'}->{success};
886 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
888 if (!defined $task_success) {
889 # task did not indicate one way or the other --> fail
890 $Jobstep->{'arvados_task'}->{success} = 0;
891 $Jobstep->{'arvados_task'}->save;
# Exit code 111 and node failures are retryable ("temporary") failures.
898 $temporary_fail ||= $Jobstep->{node_fail};
899 $temporary_fail ||= ($exitvalue == 111);
902 ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
904 # Check for signs of a failed or misconfigured node
905 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
906 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
907 # Don't count this against jobstep failure thresholds if this
908 # node is already suspected faulty and srun exited quickly
909 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
911 Log ($jobstepid, "blaming failure on suspect node " .
912 $slot[$proc{$pid}->{slot}]->{node}->{name});
913 $temporary_fail ||= 1;
915 ban_node_by_slot($proc{$pid}->{slot});
918 Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
919 ++$Jobstep->{'failures'},
920 $temporary_fail ? 'temporary ' : 'permanent',
# Permanent failure, or 3 strikes -> abort the whole job.
923 if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
924 # Give up on this task, and the whole job
926 $main::please_freeze = 1;
929 # Put this task back on the todo queue
930 push @jobstep_todo, $jobstepid;
932 $Job->{'tasks_summary'}->{'failed'}++;
# Success path: clear node suspicion, record completion.
936 ++$thisround_succeeded;
937 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
938 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
939 push @jobstep_done, $jobstepid;
940 Log ($jobstepid, "success in $elapsed seconds");
942 $Jobstep->{exitcode} = $childstatus;
943 $Jobstep->{finishtime} = time;
944 process_stderr ($jobstepid, $task_success);
945 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
947 close $reader{$jobstepid};
948 delete $reader{$jobstepid};
949 delete $slot[$proc{$pid}->{slot}]->{pid};
950 push @freeslot, $proc{$pid}->{slot};
# Page through tasks created by this one and append them to the queue.
954 my $newtask_list = [];
957 $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
959 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
961 'order' => 'qsequence',
962 'offset' => scalar(@$newtask_list),
964 push(@$newtask_list, @{$newtask_results->{items}});
965 } while (@{$newtask_results->{items}});
966 foreach my $arvados_task (@$newtask_list) {
968 'level' => $arvados_task->{'sequence'},
970 'arvados_task' => $arvados_task
972 push @jobstep, $jobstep;
973 push @jobstep_todo, $#jobstep;
976 $progress_is_dirty = 1;
# If the refresh-trigger file has been touched since our last check,
# re-fetch the job record and honor any cancellation request.
980 sub check_refresh_wanted
982 my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
# $stat[9] is mtime; compare against the last refresh we performed.
983 if (@stat && $stat[9] > $latest_refresh) {
984 $latest_refresh = scalar time;
986 my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
987 for my $attr ('cancelled_at',
988 'cancelled_by_user_uuid',
989 'cancelled_by_client_uuid') {
990 $Job->{$attr} = $Job2->{$attr};
992 if ($Job->{'cancelled_at'}) {
993 Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
994 " by user " . $Job->{cancelled_by_user_uuid});
996 $main::please_freeze = 1;
# (body of the squeue-checking sub; header elided)  Kill overdue procs,
# then cross-check our children against squeue to catch orphaned steps.
1004 # return if the kill list was checked <4 seconds ago
1005 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
1009 $squeue_kill_checked = time;
1011 # use killem() on procs whose killtime is reached
1014 if (exists $proc{$_}->{killtime}
1015 && $proc{$_}->{killtime} <= time)
1021 # return if the squeue was checked <60 seconds ago
1022 if (defined $squeue_checked && $squeue_checked > time - 60)
1026 $squeue_checked = time;
1030 # here is an opportunity to check for mysterious problems with local procs
1034 # get a list of steps still running
# "echo ok" serves as a sentinel so a failed squeue is distinguishable
# from an empty one.  NOTE(review): assumes the last element is chomped
# (or compared including newline) by elided code -- confirm upstream.
1035 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
1037 if ($squeue[-1] ne "ok")
1043 # which of my jobsteps are running, according to squeue?
1047 if (/^(\d+)\.(\d+) (\S+)/)
1049 if ($1 eq $ENV{SLURM_JOBID})
1056 # which of my active child procs (>60s old) were not mentioned by squeue?
1057 foreach (keys %proc)
1059 if ($proc{$_}->{time} < time - 60
1060 && !exists $ok{$proc{$_}->{jobstepname}}
1061 && !exists $proc{$_}->{killtime})
1063 # kill this proc if it hasn't exited in 30 seconds
1064 $proc{$_}->{killtime} = time + 30;
# Cancel our SLURM allocation (no-op outside SLURM; guard is elided).
1070 sub release_allocation
1074 Log (undef, "release job allocation");
1075 system "scancel $ENV{SLURM_JOBID}";
# (body of the pipe-draining sub; header elided)  Non-blocking read of each
# child's stderr pipe into its jobstep buffer, capped at ~16KB.
1083 foreach my $job (keys %reader)
1086 while (0 < sysread ($reader{$job}, $buf, 8192))
1088 print STDERR $buf if $ENV{CRUNCH_DEBUG};
1089 $jobstep[$job]->{stderr} .= $buf;
1090 preprocess_stderr ($job);
# Drop the oldest 8KB when the buffer grows past 16KB.
1091 if (length ($jobstep[$job]->{stderr}) > 16384)
1093 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log them, and
# watch for srun/SLURM error patterns that signal node or allocation loss.
1102 sub preprocess_stderr
1106 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1108 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1109 Log ($job, "stderr $line");
# Expired/unconfirmable allocation -> checkpoint and stop the whole job.
1110 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1112 $main::please_freeze = 1;
# Node failure -> mark this step retryable and ban the node for a while.
1114 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1115 $jobstep[$job]->{node_fail} = 1;
1116 ban_node_by_slot($jobstep[$job]->{slotindex});
# (process_stderr; header elided)  Flush any remaining buffered stderr.
1125 my $task_success = shift;
1126 preprocess_stderr ($job);
1129 Log ($job, "stderr $_");
1130 } split ("\n", $jobstep[$job]->{stderr});
# (fetch_block; header elided)  Read one Keep block (<=64MB) via arv CLI.
1136 my ($keep, $child_out, $output_block);
1138 my $cmd = "$arv_cli keep get \Q$hash\E";
1139 open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1140 sysread($keep, $output_block, 64 * 1024 * 1024);
1142 return $output_block;
# (collate_output; header elided)  Merge each successful task's output
# manifest into one collection via `arv keep put --raw`, reading back the
# resulting locator.
1147 Log (undef, "collate");
1149 my ($child_out, $child_in);
1150 my $pid = open2($child_out, $child_in, $arv_cli, 'keep', 'put', '--raw');
# Skip tasks with no output, task-reported failure, or nonzero exit.
1154 next if (!exists $_->{'arvados_task'}->{output} ||
1155 !$_->{'arvados_task'}->{'success'} ||
1156 $_->{'exitcode'} != 0);
1157 my $output = $_->{'arvados_task'}->{output};
# Output may be a full manifest (feed to keep put), a single-task job's
# locator (use directly), or a block locator to fetch and merge.
1158 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1160 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1161 print $child_in $output;
1163 elsif (@jobstep == 1)
1165 $joboutput = $output;
1168 elsif (defined (my $outblock = fetch_block ($output)))
1170 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1171 print $child_in $outblock;
1175 Log (undef, "XXX fetch_block($output) failed XXX");
# Read the locator back from keep put, with a 120s timeout.
1181 if (!defined $joboutput) {
1182 my $s = IO::Select->new($child_out);
1183 if ($s->can_read(120)) {
1184 sysread($child_out, $joboutput, 64 * 1024 * 1024);
1187 Log (undef, "timed out reading from 'arv keep put'");
1194 Log (undef, "output $joboutput");
1195 $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1199 Log (undef, "output undef");
# (killem; header elided)  Escalating kill: SIGINT, then SIGTERM, then
# SIGKILL, waiting >4s between escalations per pid.
1209 my $sig = 2; # SIGINT first
1210 if (exists $proc{$_}->{"sent_$sig"} &&
1211 time - $proc{$_}->{"sent_$sig"} > 4)
1213 $sig = 15; # SIGTERM if SIGINT doesn't work
1215 if (exists $proc{$_}->{"sent_$sig"} &&
1216 time - $proc{$_}->{"sent_$sig"} > 4)
1218 $sig = 9; # SIGKILL if SIGTERM doesn't work
1220 if (!exists $proc{$_}->{"sent_$sig"})
1222 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1224 select (undef, undef, undef, 0.1);
1227 kill $sig, $_; # srun wants two SIGINT to really interrupt
1229 $proc{$_}->{"sent_$sig"} = time;
1230 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (fhbits; header elided)  Build a select() bit vector from filehandles.
1240 vec($bits,fileno($_),1) = 1;
1246 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are split and logged one line at a time (recursion
# into the per-line case is elided from this listing).
1248 if ($_[1] =~ /\n/) {
1249 for my $line (split (/\n/, $_[1])) {
# Make STDERR unbuffered for the duration of this write.
1254 my $fh = select STDERR; $|=1; select $fh;
1255 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape non-printable characters as \NNN octal so logs stay one-per-line.
1256 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1259 if ($local_logfile || -t STDERR) {
1260 my @gmtime = gmtime;
1261 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1262 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
# Timestamp only on a tty (interactive) or when also writing the logfile.
1264 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1266 if ($local_logfile) {
1267 print $local_logfile $datetime . " " . $message;
# (croak; header elided)  Log the fatal message with caller context, then
# checkpoint, collate, and save the log before dying.
1274 my ($package, $file, $line) = caller;
1275 my $message = "@_ at $file line $line\n";
1276 Log (undef, $message);
1277 freeze() if @jobstep_todo;
1278 collate_output() if @jobstep_todo;
1280 save_meta() if $local_logfile;
# (cleanup; header elided)  Mark the job not-running in the API.
1287 return if !$job_has_uuid;
1288 $Job->update_attributes('running' => 0,
1290 'finished_at' => scalar gmtime);
# (save_meta)  Upload the local log file to Keep and record its locator.
1296 my $justcheckpoint = shift; # false if this will be the last meta saved
1297 return if $justcheckpoint; # checkpointing is not relevant post-Warehouse.pm
1299 $local_logfile->flush;
1300 my $cmd = "$arv_cli keep put --filename ''\Q$keep_logfile\E "
1301 . quotemeta($local_logfile->filename);
1302 my $loglocator = `$cmd`;
1303 die "system $cmd failed: $?" if $?;
1306 $local_logfile = undef; # the temp file is automatically deleted
1307 Log (undef, "log manifest is $loglocator");
1308 $Job->{'log'} = $loglocator;
1309 $Job->update_attributes('log', $loglocator) if $job_has_uuid;
# If a freeze was requested, release the allocation, kill the given srun
# procs, reap everything, then checkpoint and stop.
1313 sub freeze_if_want_freeze
1315 if ($main::please_freeze)
1317 release_allocation();
1320 # kill some srun procs before freeze+stop
1321 map { $proc{$_} = {} } @_;
1324 killem (keys %proc);
1325 select (undef, undef, undef, 0.1);
1327 while (($died = waitpid (-1, WNOHANG)) > 0)
1329 delete $proc{$died};
# freeze/thaw are stubs post-Warehouse.pm: freeze only logs, thaw croaks.
1344 Log (undef, "Freeze not implemented");
1351 croak ("Thaw not implemented");
1374 my $srunargs = shift;
1375 my $execargs = shift;
1376 my $opts = shift || {};
1378 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1379 print STDERR (join (" ",
1380 map { / / ? "'$_'" : $_ }
1383 if $ENV{CRUNCH_DEBUG};
1385 if (defined $stdin) {
1386 my $child = open STDIN, "-|";
1387 defined $child or die "no fork: $!";
1389 print $stdin or die $!;
1390 close STDOUT or die $!;
1395 return system (@$args) if $opts->{fork};
1398 warn "ENV size is ".length(join(" ",%ENV));
1399 die "exec failed: $!: @$args";
1403 sub ban_node_by_slot {
1404 # Don't start any new jobsteps on this node for 60 seconds
1406 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1407 $slot[$slotid]->{node}->{hold_count}++;
1408 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1413 my ($lockfile, $error_message) = @_;
1414 open L, ">", $lockfile or croak("$lockfile: $!");
1415 if (!flock L, LOCK_EX|LOCK_NB) {
1416 croak("Can't lock $lockfile: $error_message\n");
# Build script shipped to each node via __DATA__: unpack the git archive
# piped in after this script, build the Python SDK, run the project's own
# install hook, and record the installed commit in a symlink.
1423 # checkout-and-build
1427 my $destdir = $ENV{"CRUNCH_SRC"};
1428 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1429 my $repo = $ENV{"CRUNCH_SRC_URL"};
1431 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# $destdir.commit is a symlink to the installed commit; if it already
# matches, the install is skipped (skip logic elided from this listing).
1433 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1437 unlink "$destdir.commit";
1438 open STDOUT, ">", "$destdir.log";
1439 open STDERR, ">&STDOUT";
# Everything after our own __DATA__ marker is the tar stream (git archive).
1442 my @git_archive_data = <DATA>;
1443 if (@git_archive_data) {
1444 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1445 print TARX @git_archive_data;
1447 die "'tar -C $destdir -xf -' exited $?: $!";
1452 chomp ($pwd = `pwd`);
1453 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1456 for my $src_path ("$destdir/arvados/sdk/python") {
1458 shell_or_die ("virtualenv", $install_dir);
1459 shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
# Project install hook, in order of preference.
1463 if (-e "$destdir/crunch_scripts/install") {
1464 shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1465 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1467 shell_or_die ("./tests/autotests.sh", $install_dir);
1468 } elsif (-e "./install.sh") {
1469 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit (write-new + rename).
1473 unlink "$destdir.commit.new";
1474 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1475 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# shell_or_die: run via system(), echoing in DEBUG mode, die on failure.
1484 if ($ENV{"DEBUG"}) {
1485 print STDERR "@_\n";
1488 or die "@_ failed: $! exit 0x".sprintf("%x",$?);