2 # -*- mode: perl; perl-indent-level: 2; -*-
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
13 crunch-job --job x-y-z
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
18 crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
26 If the job is already locked, steal the lock and run it anyway.
30 Path to .git directory where the specified commit is found.
34 Arvados API authorization token to use during the course of the job.
38 =head1 RUNNING JOBS LOCALLY
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
44 If the job succeeds, the job's output locator is printed on stdout.
46 While the job is running, the following signals are accepted:
50 =item control-C, SIGINT, SIGQUIT
52 Save a checkpoint, terminate any job tasks that are running, and stop.
56 Save a checkpoint and continue.
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or deallocated). Currently this is a no-op.
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
# --- Environment, options, and API client setup ---
# Scratch directories; CRUNCH_TMP/CRUNCH_WORK are exported to job tasks and
# referenced by the clean-up srun command later in this file.
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
# Command-line options (documented in the POD at the top of this file).
87 GetOptions('force-unlock' => \$force_unlock,
88 'git-dir=s' => \$git_dir,
90 'job-api-token=s' => \$job_api_token,
91 'resume-stash=s' => \$resume_stash,
# An explicitly supplied token is handed to the Arvados client via %ENV.
94 if (defined $job_api_token) {
95 $ENV{ARVADOS_API_TOKEN} = $job_api_token;
# We are inside a SLURM allocation iff the scheduler set both variables.
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
# A bare uuid means "fetch the job record from the API server"; anything
# else is treated as inline JSON describing a local job.
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100 my $local_job = !$job_has_uuid;
109 $main::ENV{CRUNCH_DEBUG} = 1;
113 $main::ENV{CRUNCH_DEBUG} = 0;
# API client, plus the Warehouse stream that accumulates this job's log
# (written to Keep by save_meta()).
118 my $arv = Arvados->new;
119 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
121 $metastream->write_start('log.txt');
# Identify the current API user; the uuid is used below to lock the job.
123 my $User = $arv->{'users'}->{'current'}->execute;
131 $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
# Refuse to (re)run a job that is already locked, decided, running, or
# started, unless --force-unlock was given.
132 if (!$force_unlock) {
133 if ($Job->{'is_locked_by'}) {
134 croak("Job is locked: " . $Job->{'is_locked_by'});
# BUGFIX: was "$Job->{'success'} ne undef", which string-compares against
# the empty string (and warns under "use warnings"); the croak message
# shows the intent is a null test, so use defined().
136 if (defined $Job->{'success'}) {
137 croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
139 if ($Job->{'running'}) {
140 croak("Job 'running' flag is already set");
142 if ($Job->{'started_at'}) {
143 croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
# Local job: the spec on the command line is itself a JSON job record.
149 $Job = JSON::decode_json($jobspec);
# Required fields; croak (defined later in this file) on any omission.
# NOTE(review): map in void context for side effects — a for loop would be
# more idiomatic, but behavior is identical.
153 map { croak ("No $_ specified") unless $Job->{$_} }
154 qw(script script_version script_parameters);
157 $Job->{'is_locked_by'} = $User->{'uuid'};
158 $Job->{'started_at'} = gmtime;
# Register the locally specified job so it gets a uuid and an API record.
160 $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164 $job_id = $Job->{'uuid'};
# Normalize resource limits; max_tasks_per_node == 0 means "no cap".
168 $Job->{'resource_limits'} ||= {};
169 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
170 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
# --- Discover allocated nodes and build the @node / @slot tables ---
173 Log (undef, "check slurm allocation");
176 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
# Fallback for non-SLURM runs: count local CPUs, at least 1.
180 my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
181 push @sinfo, "$localcpus localhost";
183 if (exists $ENV{SLURM_NODELIST})
# One "<ncpus> <nodelist>" line per distinct cpu count in the allocation.
185 push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
189 my ($ncpus, $slurm_nodelist) = split;
# Honor the job's per-node task cap, if any.
190 $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
# Expand SLURM's compressed node-list syntax, e.g. "compute[1-3,5]".
193 while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
196 if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
199 foreach (split (",", $ranges))
212 push @nodelist, map {
214 $n =~ s/\[[-,\d]+\]/$_/;
221 push @nodelist, $nodelist;
# One slot per (node, cpu) pair.
224 foreach my $nodename (@nodelist)
226 Log (undef, "node $nodename - $ncpus slots");
227 my $node = { name => $nodename,
231 foreach my $cpu (1..$ncpus)
233 push @slot, { node => $node,
237 push @node, @nodelist;
242 # Ensure that we get one jobstep running on each allocated node before
243 # we start overloading nodes with concurrent steps
245 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
252 # Claim this job, and make sure nobody else does
254 $Job->{'is_locked_by'} = $User->{'uuid'};
255 $Job->{'started_at'} = gmtime;
256 $Job->{'running'} = 1;
257 $Job->{'success'} = undef;
258 $Job->{'tasks_summary'} = { 'failed' => 0,
# BUGFIX: uuids are strings; "==" compared them numerically (both sides
# evaluate to 0 with a warning), so the lock-ownership check always passed
# and a lost race to lock the job went undetected.  Use string "eq".
263 unless ($Job->save() && $Job->{'is_locked_by'} eq $User->{'uuid'}) {
264 croak("Error while updating / locking job");
# --- Signal handlers, task environment, and jobstep bookkeeping ---
270 Log (undef, "start");
# INT/QUIT/TSTP request a checkpoint+stop; ALRM requests a status dump;
# CONT resumes (see POD "RUNNING JOBS LOCALLY").
271 $SIG{'INT'} = sub { $main::please_freeze = 1; };
272 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
273 $SIG{'TERM'} = \&croak;
274 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
275 $SIG{'ALRM'} = sub { $main::please_info = 1; };
276 $SIG{'CONT'} = sub { $main::please_continue = 1; };
277 $main::please_freeze = 0;
278 $main::please_info = 0;
279 $main::please_continue = 0;
280 my $jobsteps_must_output_keys = 0; # becomes 1 when any task outputs a key
# Copy NOCACHE* knobs from the job record into the environment.
282 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
283 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
284 $ENV{"JOB_UUID"} = $job_id;
# Per-round scheduling state: indexes into @jobstep.
288 my @jobstep_todo = ();
289 my @jobstep_done = ();
290 my @jobstep_tomerge = ();
291 my $jobstep_tomerge_level = 0;
293 my $squeue_kill_checked;
294 my $output_in_keep = 0;
# Resume from a frozen checkpoint if the job record points at one;
# otherwise create the level-0 seed task.
298 if (defined $Job->{thawedfromkey})
300 thaw ($Job->{thawedfromkey});
304 my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
305 'job_uuid' => $Job->{'uuid'},
310 push @jobstep, { 'level' => 0,
312 'arvados_task' => $first_task,
314 push @jobstep_todo, 0;
# --- Script source selection and work-directory cleanup ---
321 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
# A local job whose script_version is an absolute path runs in place; no
# git checkout/install pass is needed.
323 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
326 $ENV{"CRUNCH_SRC"} = $Job->{script_version};
# The checkout-and-build helper lives after __DATA__ at the end of this file.
332 $build_script = <DATA>;
334 Log (undef, "Install revision ".$Job->{script_version});
335 my $nodelist = join(",", @node);
337 # Clean out crunch_tmp/work and crunch_tmp/opt
# Run the cleanup on every allocated node via srun, in a child process,
# and poll for its exit while honoring freeze requests.
339 my $cleanpid = fork();
342 srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
343 ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
348 last if $cleanpid == waitpid (-1, WNOHANG);
349 freeze_if_want_freeze ($cleanpid);
350 select (undef, undef, undef, 0.1);
352 Log (undef, "Clean-work-dir exited $?");
354 # Install requested code version
# Resolve $Job->{script_version} (commit hash, branch, tag, or svn rev
# number) to a full commit id, then ship a git archive of it to every node
# and run the build script there.
357 my @srunargs = ("srun",
358 "--nodelist=$nodelist",
359 "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
361 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362 $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
363 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
367 my $treeish = $Job->{'script_version'};
368 my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
369 # Todo: let script_version specify repository instead of expecting
370 # parent process to figure it out.
371 $ENV{"CRUNCH_SRC_URL"} = $repo;
373 # Create/update our clone of the remote git repo
375 if (!-d $ENV{"CRUNCH_SRC"}) {
376 system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
377 or croak ("git clone $repo failed: exit ".($?>>8));
# NOTE(review): these shell one-liners interpolate $ENV{CRUNCH_SRC}/$repo
# into a shell command; both come from trusted operator config here, but
# list-form system() would be safer if that ever changes.
378 system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
380 `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
382 # If this looks like a subversion r#, look for it in git-svn commit messages
384 if ($treeish =~ m{^\d{1,4}$}) {
385 my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
387 if ($gitlog =~ /^[a-f0-9]{40}$/) {
389 Log (undef, "Using commit $commit for script_version $treeish");
393 # If that didn't work, try asking git to look it up as a tree-ish.
395 if (!defined $commit) {
397 my $cooked_treeish = $treeish;
398 if ($treeish !~ m{^[0-9a-f]{5,}$}) {
399 # Looks like a git branch name -- make sure git knows it's
400 # relative to the remote repo
401 $cooked_treeish = "origin/$treeish";
404 my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
406 if ($found =~ /^[0-9a-f]{40}$/s) {
408 if ($commit ne $treeish) {
409 # Make sure we record the real commit id in the database,
410 # frozentokey, logs, etc. -- instead of an abbreviation or a
411 # branch name which can become ambiguous or point to a
412 # different commit in the future.
413 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
414 Log (undef, "Using commit $commit for tree-ish $treeish");
415 if ($commit ne $treeish) {
416 $Job->{'script_version'} = $commit;
417 !$job_has_uuid or $Job->save() or croak("Error while updating job");
# With a resolved commit, build the remote command: unpack the archive and
# feed the __DATA__ build script to perl on each node.
423 if (defined $commit) {
424 $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
425 @execargs = ("sh", "-c",
426 "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
427 $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
430 croak ("could not figure out commit id for $treeish");
# Run the install in a child, polling for exit as with the cleanup above.
433 my $installpid = fork();
434 if ($installpid == 0)
436 srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
441 last if $installpid == waitpid (-1, WNOHANG);
442 freeze_if_want_freeze ($installpid);
443 select (undef, undef, undef, 0.1);
445 Log (undef, "Install exited $?");
# Log the job's defining parameters and knobs for the record.
450 foreach (qw (script script_version script_parameters resource_limits))
454 (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
456 foreach (split (/\n/, $Job->{knobs}))
458 Log (undef, "knob " . $_);
# --- One scheduling round: pick the lowest level and spawn its tasks ---
469 my $thisround_succeeded = 0;
470 my $thisround_failed = 0;
471 my $thisround_failed_multiple = 0;
# Run lower levels first; ties broken by jobstep index.
473 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
474 or $a <=> $b } @jobstep_todo;
475 my $level = $jobstep[$jobstep_todo[0]]->{level};
476 Log (undef, "start level $level");
481 my @freeslot = (0..$#slot);
484 my $progress_is_dirty = 1;
485 my $progress_stats_updated = 0;
487 update_progress_stats();
# Spawn loop: one child (srun) per runnable jobstep at this level.
492 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
494 $main::please_continue = 0;
496 my $id = $jobstep_todo[$todo_ptr];
497 my $Jobstep = $jobstep[$id];
# Steps belonging to a later level wait for the next round.
498 if ($Jobstep->{level} != $level)
# Hard retry cap per jobstep.
502 if ($Jobstep->{attempts} > 9)
504 Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
# Non-blocking pipe: the child's stdout/stderr are read by readfrompipes().
509 pipe $reader{$id}, "writer" or croak ($!);
510 my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
511 fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
513 my $childslot = $freeslot[0];
514 my $childnode = $slot[$childslot]->{node};
515 my $childslotname = join (".",
516 $slot[$childslot]->{node}->{name},
517 $slot[$childslot]->{cpu});
518 my $childpid = fork();
# --- Child: reset signals, redirect output into the pipe, build the task
# environment, and exec the script via srun. ---
521 $SIG{'INT'} = 'DEFAULT';
522 $SIG{'QUIT'} = 'DEFAULT';
523 $SIG{'TERM'} = 'DEFAULT';
525 foreach (values (%reader))
529 fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
530 open(STDOUT,">&writer");
531 open(STDERR,">&writer");
536 delete $ENV{"GNUPGHOME"};
537 $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
538 $ENV{"TASK_QSEQUENCE"} = $id;
539 $ENV{"TASK_SEQUENCE"} = $level;
540 $ENV{"JOB_SCRIPT"} = $Job->{script};
# Script parameters are exported as JOB_PARAMETER_<NAME>.
541 while (my ($param, $value) = each %{$Job->{script_parameters}}) {
542 $param =~ tr/a-z/A-Z/;
543 $ENV{"JOB_PARAMETER_$param"} = $value;
545 $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
546 $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
547 $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
548 $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
554 "--nodelist=".$childnode->{name},
555 qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556 "--job-name=$job_id.$id.$$",
558 my @execargs = qw(sh);
559 my $build_script_to_send = "";
561 "mkdir -p $ENV{CRUNCH_WORK} $ENV{CRUNCH_TMP} "
562 ."&& cd $ENV{CRUNCH_TMP} ";
565 $build_script_to_send = $build_script;
569 $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
571 "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
572 my @execargs = ('bash', '-c', $command);
573 srun (\@srunargs, \@execargs, undef, $build_script_to_send);
# --- Parent: record the child in %proc and the slot table. ---
577 if (!defined $childpid)
584 $proc{$childpid} = { jobstep => $id,
587 jobstepname => "$job_id.$id.$childpid",
589 croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
590 $slot[$childslot]->{pid} = $childpid;
592 Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
593 Log ($id, "child $childpid started on $childslotname");
594 $Jobstep->{attempts} ++;
595 $Jobstep->{starttime} = time;
596 $Jobstep->{node} = $childnode->{name};
597 $Jobstep->{slotindex} = $childslot;
598 delete $Jobstep->{stderr};
599 delete $Jobstep->{finishtime};
601 splice @jobstep_todo, $todo_ptr, 1;
604 $progress_is_dirty = 1;
# --- Wait for children, apply failure heuristics, then finish the job ---
608 (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
610 last THISROUND if $main::please_freeze;
# SIGALRM requested a status dump.
611 if ($main::please_info)
613 $main::please_info = 0;
617 update_progress_stats();
625 update_progress_stats();
626 select (undef, undef, undef, 0.1);
628 elsif (time - $progress_stats_updated >= 30)
630 update_progress_stats();
# Abort the round when retries dominate: >=8 re-failures with zero
# successes, or >=16 re-failures outnumbering successes.
632 if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
633 ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
635 my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
636 .($thisround_failed+$thisround_succeeded)
637 .") -- giving up on this round";
638 Log (undef, $message);
642 # move slots from freeslot to holdslot (or back to freeslot) if necessary
643 for (my $i=$#freeslot; $i>=0; $i--) {
644 if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
645 push @holdslot, (splice @freeslot, $i, 1);
648 for (my $i=$#holdslot; $i>=0; $i--) {
649 if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
650 push @freeslot, (splice @holdslot, $i, 1);
654 # give up if no nodes are succeeding
655 if (!grep { $_->{node}->{losing_streak} == 0 &&
656 $_->{node}->{hold_count} < 4 } @slot) {
657 my $message = "Every node has failed -- giving up on this round";
658 Log (undef, $message);
# End of round: release held slots and clear per-node losing streaks.
665 push @freeslot, splice @holdslot;
666 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
669 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
672 goto THISROUND if $main::please_continue;
673 $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
678 update_progress_stats();
679 select (undef, undef, undef, 0.1);
680 killem (keys %proc) if $main::please_freeze;
684 update_progress_stats();
685 freeze_if_want_freeze();
# Decide whether to stop or run another level.
688 if (!defined $success)
691 $thisround_succeeded == 0 &&
692 ($thisround_failed == 0 || $thisround_failed > 4))
694 my $message = "stop because $thisround_failed tasks failed and none succeeded";
695 Log (undef, $message);
704 goto ONELEVEL if !defined $success;
707 release_allocation();
# Collate task outputs into the job output and register the manifest.
709 $Job->{'output'} = &collate_output();
710 $Job->{'success'} = $Job->{'output'} && $success;
711 $Job->save if $job_has_uuid;
713 if ($Job->{'output'})
716 my $manifest_text = capturex("whget", $Job->{'output'});
717 $arv->{'collections'}->{'create'}->execute('collection' => {
718 'uuid' => $Job->{'output'},
719 'manifest_text' => $manifest_text,
723 Log (undef, "Failed to register output manifest: $@");
727 Log (undef, "finish");
# Push todo/done/running counts into the job record (if it has a uuid) and
# log a one-line status.  No-op unless something changed since last call.
734 sub update_progress_stats
736 $progress_stats_updated = time;
737 return if !$progress_is_dirty;
# running = total slots minus free minus held.
738 my ($todo, $done, $running) = (scalar @jobstep_todo,
739 scalar @jobstep_done,
740 scalar @slot - scalar @freeslot - scalar @holdslot);
741 $Job->{'tasks_summary'} ||= {};
742 $Job->{'tasks_summary'}->{'todo'} = $todo;
743 $Job->{'tasks_summary'}->{'done'} = $done;
744 $Job->{'tasks_summary'}->{'running'} = $running;
745 $Job->save if $job_has_uuid;
746 Log (undef, "status: $done done, $running running, $todo todo");
747 $progress_is_dirty = 0;
# NOTE(review): this is the body of the child-reaping routine (the `sub`
# line is above this excerpt).  Reaps one exited child, records task
# success/failure, applies node-health heuristics, and queues new tasks
# the finished task created.
754 my $pid = waitpid (-1, WNOHANG);
755 return 0 if $pid <= 0;
757 my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
759 . $slot[$proc{$pid}->{slot}]->{cpu});
760 my $jobstepid = $proc{$pid}->{jobstep};
761 my $elapsed = time - $proc{$pid}->{time};
762 my $Jobstep = $jobstep[$jobstepid];
765 my $exitinfo = "exit $exitcode";
# The task's own success flag (set via the API) is authoritative.
766 $Jobstep->{'arvados_task'}->reload;
767 my $success = $Jobstep->{'arvados_task'}->{success};
769 Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
771 if (!defined $success) {
772 # task did not indicate one way or the other --> fail
773 $Jobstep->{'arvados_task'}->{success} = 0;
774 $Jobstep->{'arvados_task'}->save;
# --- Failure path ---
780 my $no_incr_attempts;
781 $no_incr_attempts = 1 if $Jobstep->{node_fail};
784 ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
786 # Check for signs of a failed or misconfigured node
787 if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
788 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
789 # Don't count this against jobstep failure thresholds if this
790 # node is already suspected faulty and srun exited quickly
791 if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
793 $Jobstep->{attempts} > 1) {
794 Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
795 $no_incr_attempts = 1;
796 --$Jobstep->{attempts};
798 ban_node_by_slot($proc{$pid}->{slot});
# Requeue the failed step for retry.
801 push @jobstep_todo, $jobstepid;
802 Log ($jobstepid, "failure in $elapsed seconds");
804 --$Jobstep->{attempts} if $no_incr_attempts;
805 $Job->{'tasks_summary'}->{'failed'}++;
# --- Success path ---
809 ++$thisround_succeeded;
810 $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
811 $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
812 push @jobstep_done, $jobstepid;
813 Log ($jobstepid, "success in $elapsed seconds");
815 $Jobstep->{exitcode} = $exitcode;
816 $Jobstep->{finishtime} = time;
# Drain remaining stderr, then free the pipe and slot.
817 process_stderr ($jobstepid, $success);
818 Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
820 close $reader{$jobstepid};
821 delete $reader{$jobstepid};
822 delete $slot[$proc{$pid}->{slot}]->{pid};
823 push @freeslot, $proc{$pid}->{slot};
# Tasks created by the finished task become new jobsteps.
827 my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
829 'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
831 'order' => 'qsequence'
833 foreach my $arvados_task (@{$newtask_list->{'items'}}) {
835 'level' => $arvados_task->{'sequence'},
837 'arvados_task' => $arvados_task
839 push @jobstep, $jobstep;
840 push @jobstep_todo, $#jobstep;
843 $progress_is_dirty = 1;
# Cross-check our child procs against SLURM's squeue: schedule a kill for
# any child whose srun step has vanished from the queue.  Rate-limited
# (kill list every 4s, squeue every 60s).
850 # return if the kill list was checked <4 seconds ago
851 if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
855 $squeue_kill_checked = time;
857 # use killem() on procs whose killtime is reached
860 if (exists $proc{$_}->{killtime}
861 && $proc{$_}->{killtime} <= time)
867 # return if the squeue was checked <60 seconds ago
868 if (defined $squeue_checked && $squeue_checked > time - 60)
872 $squeue_checked = time;
876 # here is an opportunity to check for mysterious problems with local procs
880 # get a list of steps still running
# Trailing "ok" sentinel distinguishes an empty queue from a failed squeue.
881 my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
883 if ($squeue[-1] ne "ok")
889 # which of my jobsteps are running, according to squeue?
893 if (/^(\d+)\.(\d+) (\S+)/)
895 if ($1 eq $ENV{SLURM_JOBID})
902 # which of my active child procs (>60s old) were not mentioned by squeue?
905 if ($proc{$_}->{time} < time - 60
906 && !exists $ok{$proc{$_}->{jobstepname}}
907 && !exists $proc{$_}->{killtime})
909 # kill this proc if it hasn't exited in 30 seconds
910 $proc{$_}->{killtime} = time + 30;
# Release the SLURM allocation (if any) by cancelling our own job id.
916 sub release_allocation
920 Log (undef, "release job allocation");
921 system "scancel $ENV{SLURM_JOBID}";
# Drain the non-blocking stderr pipes of all running children into each
# jobstep's stderr buffer, logging complete lines as they arrive and
# capping the buffer at 16 KiB by discarding the oldest 8 KiB.
929 foreach my $job (keys %reader)
932 while (0 < sysread ($reader{$job}, $buf, 8192))
934 print STDERR $buf if $ENV{CRUNCH_DEBUG};
935 $jobstep[$job]->{stderr} .= $buf;
936 preprocess_stderr ($job);
937 if (length ($jobstep[$job]->{stderr}) > 16384)
939 substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
# Consume complete lines from a jobstep's stderr buffer, log each one, and
# react to known srun error messages: an expired/unconfirmed allocation
# triggers a freeze; a node failure marks the step and bans the node.
948 sub preprocess_stderr
952 while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
954 substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
955 Log ($job, "stderr $line");
956 if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
958 $main::please_freeze = 1;
960 elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
961 $jobstep[$job]->{node_fail} = 1;
962 ban_node_by_slot($jobstep[$job]->{slotindex});
# Final flush after a child exits: log whatever stderr remains buffered.
972 preprocess_stderr ($job);
975 Log ($job, "stderr $_");
976 } split ("\n", $jobstep[$job]->{stderr});
# Combine the outputs of all successful tasks into a single Warehouse
# stream and return its locator (the job output).  A single-task job's
# output locator is passed through unchanged.
982 my $whc = Warehouse->new;
983 Log (undef, "collate");
984 $whc->write_start (1);
# Skip tasks that produced no output or did not succeed cleanly.
988 next if (!exists $_->{'arvados_task'}->{output} ||
989 !$_->{'arvados_task'}->{'success'} ||
990 $_->{'exitcode'} != 0);
991 my $output = $_->{'arvados_task'}->{output};
# Output that is not a bare block locator is inline manifest text.
992 if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
994 $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
995 $whc->write_data ($output);
997 elsif (@jobstep == 1)
999 $joboutput = $output;
# Otherwise fetch the referenced block and append its contents.
1002 elsif (defined (my $outblock = $whc->fetch_block ($output)))
1004 $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1005 $whc->write_data ($outblock);
# Record fetch failures in-band so the problem is visible in the output.
1009 my $errstr = $whc->errstr;
1010 $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1014 $joboutput = $whc->write_finish if !defined $joboutput;
1017 Log (undef, "output $joboutput");
1018 $Job->{'output'} = $joboutput;
1019 $Job->save if $job_has_uuid;
1023 Log (undef, "output undef");
# Signal-escalation for the given pids: SIGINT first, then SIGTERM, then
# SIGKILL, escalating only after a previous signal went unanswered for 4s.
1033 my $sig = 2; # SIGINT first
1034 if (exists $proc{$_}->{"sent_$sig"} &&
1035 time - $proc{$_}->{"sent_$sig"} > 4)
1037 $sig = 15; # SIGTERM if SIGINT doesn't work
1039 if (exists $proc{$_}->{"sent_$sig"} &&
1040 time - $proc{$_}->{"sent_$sig"} > 4)
1042 $sig = 9; # SIGKILL if SIGTERM doesn't work
1044 if (!exists $proc{$_}->{"sent_$sig"})
1046 Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1048 select (undef, undef, undef, 0.1);
1051 kill $sig, $_; # srun wants two SIGINT to really interrupt
1053 $proc{$_}->{"sent_$sig"} = time;
1054 $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
# (Below: fhbits helper — build a select() bit vector from filehandles.)
1064 vec($bits,fileno($_),1) = 1;
# Central logger: prefix each message with job id and pid, escape
# non-printable bytes, echo to stderr (timestamped when on a tty), and
# append to the Keep-bound $metastream if it is still open.
1070 sub Log # ($jobstep_id, $logmessage)
# Multi-line messages are logged one line at a time.
1072 if ($_[1] =~ /\n/) {
1073 for my $line (split (/\n/, $_[1])) {
# Unbuffer STDERR without clobbering the currently selected handle.
1078 my $fh = select STDERR; $|=1; select $fh;
1079 my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
# Escape everything outside printable ASCII as \NNN octal.
1080 $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1083 if ($metastream || -t STDERR) {
1084 my @gmtime = gmtime;
1085 $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1086 $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1088 print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# $metastream is undef'd by save_meta() at shutdown; stop writing then.
1090 return if !$metastream;
1091 $metastream->write_data ($datetime . " " . $message);
# Fatal-error path: log the message with caller location, checkpoint any
# unfinished work, save the log, then mark the job failed in the API.
1097 my ($package, $file, $line) = caller;
1098 my $message = "@_ at $file line $line\n";
1099 Log (undef, $message);
# Only checkpoint/collate if there was still work outstanding.
1100 freeze() if @jobstep_todo;
1101 collate_output() if @jobstep_todo;
1103 save_meta() if $metastream;
# (Below: cleanup — mark the API job record as finished-and-failed.)
1110 return if !$job_has_uuid;
1111 $Job->reload if $job_has_uuid;
1112 $Job->{'running'} = 0;
1113 $Job->{'success'} = 0;
1114 $Job->{'finished_at'} = gmtime;
1115 $Job->save if $job_has_uuid;
# Save the accumulated log stream to Keep and record its locator on the
# job.  A checkpoint save works on a copy so logging can continue; a final
# save closes out $metastream.
1121 my $justcheckpoint = shift; # false if this will be the last meta saved
1122 my $m = $metastream;
1123 $m = $m->copy if $justcheckpoint;
1125 my $loglocator = $m->as_key;
1126 undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1127 Log (undef, "meta key is $loglocator");
1128 $Job->{'log'} = $loglocator;
1129 $Job->save if $job_has_uuid;
# If a freeze was requested (SIGINT/SIGQUIT/TSTP), release the node
# allocation, kill the given srun children, reap them, then checkpoint.
1133 sub freeze_if_want_freeze
1135 if ($main::please_freeze)
1137 release_allocation();
1140 # kill some srun procs before freeze+stop
# Register the caller-supplied pids in %proc so killem() covers them.
1141 map { $proc{$_} = {} } @_;
1144 killem (keys %proc);
1145 select (undef, undef, undef, 0.1);
1147 while (($died = waitpid (-1, WNOHANG)) > 0)
1149 delete $proc{$died};
# --- Checkpoint (freeze) and restore (thaw) ---
# Freezing is currently a stub; thaw rebuilds job and jobstep state from a
# frozen Warehouse stream, re-queueing steps that had not exited 0.
1164 Log (undef, "Freeze not implemented");
# NOTE(review): thaw croaks before doing any work — the restore code below
# it is currently unreachable; confirm whether this is intentional.
1171 croak ("Thaw not implemented");
1175 Log (undef, "thaw from $key");
1180 @jobstep_tomerge = ();
1181 $jobstep_tomerge_level = 0;
1184 my $stream = new Warehouse::Stream ( whc => $whc,
1185 hash => [split (",", $key)] );
# Frozen records are blank-line-separated "k=v" paragraphs.
1187 while (my $dataref = $stream->read_until (undef, "\n\n"))
1189 if ($$dataref =~ /^job /)
1191 foreach (split ("\n", $$dataref))
1193 my ($k, $v) = split ("=", $_, 2);
1194 $frozenjob->{$k} = freezeunquote ($v);
1199 if ($$dataref =~ /^merge (\d+) (.*)/)
1201 $jobstep_tomerge_level = $1;
1203 = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
# Otherwise the paragraph describes one jobstep.
1208 foreach (split ("\n", $$dataref))
1210 my ($k, $v) = split ("=", $_, 2);
1211 $Jobstep->{$k} = freezeunquote ($v) if $k;
1213 $Jobstep->{attempts} = 0;
1214 push @jobstep, $Jobstep;
# Steps that exited 0 are done; everything else is re-queued.
1216 if ($Jobstep->{exitcode} eq "0")
1218 push @jobstep_done, $#jobstep;
1222 push @jobstep_todo, $#jobstep;
1226 foreach (qw (script script_version script_parameters))
1228 $Job->{$_} = $frozenjob->{$_};
1230 $Job->save if $job_has_uuid;
# (freezeunquote helper: undo \n / \-escaping applied when freezing.)
1246 $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
# Run a command, wrapping it in srun when running under SLURM; optional
# $stdin content is fed to the child via a piped fork.
1253 my $srunargs = shift;
1254 my $execargs = shift;
1255 my $opts = shift || {};
# Without SLURM, skip the srun wrapper and run the command directly.
1257 my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1258 print STDERR (join (" ",
1259 map { / / ? "'$_'" : $_ }
1262 if $ENV{CRUNCH_DEBUG};
# Feed $stdin to the command by forking a writer attached to our STDIN.
1264 if (defined $stdin) {
1265 my $child = open STDIN, "-|";
1266 defined $child or die "no fork: $!";
1268 print $stdin or die $!;
1269 close STDOUT or die $!;
# fork mode returns the exit status; otherwise exec replaces this process.
1274 return system (@$args) if $opts->{fork};
1277 warn "ENV size is ".length(join(" ",%ENV));
1278 die "exec failed: $!: @$args";
# Mark a node as unusable for new jobsteps for the next 60 seconds.
1282 sub ban_node_by_slot {
1283 # Don't start any new jobsteps on this node for 60 seconds
1285 $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1286 $slot[$slotid]->{node}->{hold_count}++;
1287 Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1293 # checkout-and-build
# This script is shipped (with a git archive appended) to each node via
# srun and run there by "perl -": it unpacks the source into CRUNCH_SRC,
# runs install.sh/autotests.sh, and records the installed commit.
1297 my $destdir = $ENV{"CRUNCH_SRC"};
1298 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1299 my $repo = $ENV{"CRUNCH_SRC_URL"};
# Per-destination lock file guards concurrent installs on the same node.
1301 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
# The $destdir.commit symlink names the commit already installed; if it
# matches, the work below is skipped.
1303 if (readlink ("$destdir.commit") eq $commit) {
1307 unlink "$destdir.commit";
1308 open STDOUT, ">", "$destdir.log";
1309 open STDERR, ">&STDOUT";
# Unpack the git archive that was appended to this script on stdin.
1312 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1315 die "'tar -C $destdir -xf -' exited $?: $!";
1319 chomp ($pwd = `pwd`);
1320 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1322 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1324 shell_or_die ("./tests/autotests.sh", $install_dir);
1325 } elsif (-e "./install.sh") {
1326 shell_or_die ("./install.sh", $install_dir);
# Atomically record the installed commit via symlink + rename.
1330 unlink "$destdir.commit.new";
1331 symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1332 rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
# (shell_or_die helper: run a command, die with the exit status on failure.)
1341 if ($ENV{"DEBUG"}) {
1342 print STDERR "@_\n";
1345 or die "@_ failed: $! exit 0x".sprintf("%x",$?);