tweak node failure detection
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
59
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
63 mkdir ($ENV{"CRUNCH_TMP"});
64
65 my $force_unlock;
66 my $jobspec;
67 my $resume_stash;
68 GetOptions('force-unlock' => \$force_unlock,
69            'job=s' => \$jobspec,
70            'resume-stash=s' => \$resume_stash,
71     );
72
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
75
76
77 $SIG{'HUP'} = sub
78 {
79   1;
80 };
81 $SIG{'USR1'} = sub
82 {
83   $main::ENV{CRUNCH_DEBUG} = 1;
84 };
85 $SIG{'USR2'} = sub
86 {
87   $main::ENV{CRUNCH_DEBUG} = 0;
88 };
89
90
91
92 my $arv = Arvados->new;
93 my $metastream = Warehouse::Stream->new;
94 $metastream->clear;
95 $metastream->write_start('log.txt');
96
97 my $User = {};
98 my $Job = {};
99 my $job_id;
100 my $dbh;
101 my $sth;
102 if ($job_has_uuid)
103 {
104   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105   $User = $arv->{'users'}->{'current'}->execute;
106   if (!$force_unlock) {
107     if ($Job->{'is_locked_by'}) {
108       croak("Job is locked: " . $Job->{'is_locked_by'});
109     }
110     if ($Job->{'success'} ne undef) {
111       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112     }
113     if ($Job->{'running'}) {
114       croak("Job 'running' flag is already set");
115     }
116     if ($Job->{'started_at'}) {
117       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
118     }
119   }
120 }
121 else
122 {
123   $Job = JSON::decode_json($jobspec);
124
125   if (!$resume_stash)
126   {
127     map { croak ("No $_ specified") unless $Job->{$_} }
128     qw(script script_version script_parameters);
129   }
130
131   if (!defined $Job->{'uuid'}) {
132     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
133   }
134 }
135 $job_id = $Job->{'uuid'};
136
137
138
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
142
143
144 Log (undef, "check slurm allocation");
145 my @slot;
146 my @node;
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
148 my @sinfo;
149 if (!$have_slurm)
150 {
151   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152   push @sinfo, "$localcpus localhost";
153 }
154 if (exists $ENV{SLURM_NODELIST})
155 {
156   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
157 }
158 foreach (@sinfo)
159 {
160   my ($ncpus, $slurm_nodelist) = split;
161   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
162
163   my @nodelist;
164   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
165   {
166     my $nodelist = $1;
167     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
168     {
169       my $ranges = $1;
170       foreach (split (",", $ranges))
171       {
172         my ($a, $b);
173         if (/(\d+)-(\d+)/)
174         {
175           $a = $1;
176           $b = $2;
177         }
178         else
179         {
180           $a = $_;
181           $b = $_;
182         }
183         push @nodelist, map {
184           my $n = $nodelist;
185           $n =~ s/\[[-,\d]+\]/$_/;
186           $n;
187         } ($a..$b);
188       }
189     }
190     else
191     {
192       push @nodelist, $nodelist;
193     }
194   }
195   foreach my $nodename (@nodelist)
196   {
197     Log (undef, "node $nodename - $ncpus slots");
198     my $node = { name => $nodename,
199                  ncpus => $ncpus,
200                  losing_streak => 0,
201                  hold_until => 0 };
202     foreach my $cpu (1..$ncpus)
203     {
204       push @slot, { node => $node,
205                     cpu => $cpu };
206     }
207   }
208   push @node, @nodelist;
209 }
210
211
212
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
215
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
217
218
219
220 my $jobmanager_id;
221 if ($job_has_uuid)
222 {
223   # Claim this job, and make sure nobody else does
224
225   $Job->{'is_locked_by'} = $User->{'uuid'};
226   $Job->{'started_at'} = time;
227   $Job->{'running'} = 1;
228   $Job->{'success'} = undef;
229   $Job->{'tasks_summary'} = { 'failed' => 0,
230                               'todo' => 1,
231                               'running' => 0,
232                               'done' => 0 };
233   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
234     croak("Error while updating / locking job");
235   }
236 }
237
238
239 Log (undef, "start");
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
250
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
254
255
256 my @jobstep;
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
261 my $squeue_checked;
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
264
265
266
267 if (defined $Job->{thawedfromkey})
268 {
269   thaw ($Job->{thawedfromkey});
270 }
271 else
272 {
273   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274     'job_uuid' => $Job->{'uuid'},
275     'sequence' => 0,
276     'qsequence' => 0,
277     'parameters' => {},
278                                                           });
279   push @jobstep, { 'level' => 0,
280                    'attempts' => 0,
281                    'arvados_task' => $first_task,
282                  };
283   push @jobstep_todo, 0;
284 }
285
286
287 my $build_script;
288 do {
289   local $/ = undef;
290   $build_script = <DATA>;
291 };
292
293
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
297 if ($skip_install)
298 {
299   $ENV{"CRUNCH_SRC"} = $Job->{revision};
300 }
301 else
302 {
303   Log (undef, "Install revision ".$Job->{revision});
304   my $nodelist = join(",", @node);
305
306   # Clean out crunch_tmp/work and crunch_tmp/opt
307
308   my $cleanpid = fork();
309   if ($cleanpid == 0)
310   {
311     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
313     exit (1);
314   }
315   while (1)
316   {
317     last if $cleanpid == waitpid (-1, WNOHANG);
318     freeze_if_want_freeze ($cleanpid);
319     select (undef, undef, undef, 0.1);
320   }
321   Log (undef, "Clean-work-dir exited $?");
322
323   # Install requested code version
324
325   my @execargs;
326   my @srunargs = ("srun",
327                   "--nodelist=$nodelist",
328                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329
330   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
333
334   my $commit;
335   my $treeish = $Job->{'script_version'};
336   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337   # Todo: let script_version specify alternate repo
338   $ENV{"CRUNCH_SRC_URL"} = $repo;
339
340   # Create/update our clone of the remote git repo
341
342   if (!-d $ENV{"CRUNCH_SRC"}) {
343     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344         or croak ("git clone $repo failed: exit ".($?>>8));
345     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346   }
347   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348
349   # If this looks like a subversion r#, look for it in git-svn commit messages
350
351   if ($treeish =~ m{^\d{1,4}$}) {
352     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353     chomp $gitlog;
354     if ($gitlog =~ /^[a-f0-9]{40}$/) {
355       $commit = $gitlog;
356       Log (undef, "Using commit $commit for revision $treeish");
357     }
358   }
359
360   # If that didn't work, try asking git to look it up as a tree-ish.
361
362   if (!defined $commit) {
363
364     my $cooked_treeish = $treeish;
365     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366       # Looks like a git branch name -- make sure git knows it's
367       # relative to the remote repo
368       $cooked_treeish = "origin/$treeish";
369     }
370
371     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372     chomp $found;
373     if ($found =~ /^[0-9a-f]{40}$/s) {
374       $commit = $found;
375       if ($commit ne $treeish) {
376         # Make sure we record the real commit id in the database,
377         # frozentokey, logs, etc. -- instead of an abbreviation or a
378         # branch name which can become ambiguous or point to a
379         # different commit in the future.
380         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381         Log (undef, "Using commit $commit for tree-ish $treeish");
382         if ($commit ne $treeish) {
383           $Job->{'script_version'} = $commit;
384           $Job->save() or croak("Error while updating job");
385         }
386       }
387     }
388   }
389
390   if (defined $commit) {
391     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
392     @execargs = ("sh", "-c",
393                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
394   }
395   else {
396     croak ("could not figure out commit id for $treeish");
397   }
398
399   my $installpid = fork();
400   if ($installpid == 0)
401   {
402     srun (\@srunargs, \@execargs, {}, $build_script);
403     exit (1);
404   }
405   while (1)
406   {
407     last if $installpid == waitpid (-1, WNOHANG);
408     freeze_if_want_freeze ($installpid);
409     select (undef, undef, undef, 0.1);
410   }
411   Log (undef, "Install exited $?");
412 }
413
414
415
416 foreach (qw (script script_version script_parameters resource_limits))
417 {
418   Log (undef, $_ . " " . $Job->{$_});
419 }
420 foreach (split (/\n/, $Job->{knobs}))
421 {
422   Log (undef, "knob " . $_);
423 }
424
425
426
427 my $success;
428
429
430
431 ONELEVEL:
432
433 my $thisround_succeeded = 0;
434 my $thisround_failed = 0;
435 my $thisround_failed_multiple = 0;
436
437 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
438                        or $a <=> $b } @jobstep_todo;
439 my $level = $jobstep[$jobstep_todo[0]]->{level};
440 Log (undef, "start level $level");
441
442
443
444 my %proc;
445 my @freeslot = (0..$#slot);
446 my @holdslot;
447 my %reader;
448 my $progress_is_dirty = 1;
449 my $progress_stats_updated = 0;
450
451 update_progress_stats();
452
453
454
455 THISROUND:
456 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 {
458   $main::please_continue = 0;
459
460   my $id = $jobstep_todo[$todo_ptr];
461   my $Jobstep = $jobstep[$id];
462   if ($Jobstep->{level} != $level)
463   {
464     next;
465   }
466   if ($Jobstep->{attempts} > 9)
467   {
468     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
469     $success = 0;
470     last THISROUND;
471   }
472
473   pipe $reader{$id}, "writer" or croak ($!);
474   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
475   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
476
477   my $childslot = $freeslot[0];
478   my $childnode = $slot[$childslot]->{node};
479   my $childslotname = join (".",
480                             $slot[$childslot]->{node}->{name},
481                             $slot[$childslot]->{cpu});
482   my $childpid = fork();
483   if ($childpid == 0)
484   {
485     $SIG{'INT'} = 'DEFAULT';
486     $SIG{'QUIT'} = 'DEFAULT';
487     $SIG{'TERM'} = 'DEFAULT';
488
489     foreach (values (%reader))
490     {
491       close($_);
492     }
493     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
494     open(STDOUT,">&writer");
495     open(STDERR,">&writer");
496
497     undef $dbh;
498     undef $sth;
499
500     delete $ENV{"GNUPGHOME"};
501     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
502     $ENV{"TASK_QSEQUENCE"} = $id;
503     $ENV{"TASK_SEQUENCE"} = $level;
504     $ENV{"JOB_SCRIPT"} = $Job->{script};
505     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
506       $param =~ tr/a-z/A-Z/;
507       $ENV{"JOB_PARAMETER_$param"} = $value;
508     }
509     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
510     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
511     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
512     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
513
514     $ENV{"GZIP"} = "-n";
515
516     my @srunargs = (
517       "srun",
518       "--nodelist=".$childnode->{name},
519       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
520       "--job-name=$job_id.$id.$$",
521         );
522     my @execargs = qw(sh);
523     my $build_script_to_send = "";
524     my $command =
525         "mkdir -p $ENV{CRUNCH_TMP}/revision "
526         ."&& cd $ENV{CRUNCH_TMP} ";
527     if ($build_script)
528     {
529       $build_script_to_send = $build_script;
530       $command .=
531           "&& perl -";
532     }
533     elsif (!$skip_install)
534     {
535       $command .=
536           "&& "
537           ."( "
538           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539           ."|| "
540           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
541           ."    && ./installrevision "
542           ."  ) "
543           .") ";
544     }
545     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
546     $command .=
547         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
548     my @execargs = ('bash', '-c', $command);
549     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
550     exit (1);
551   }
552   close("writer");
553   if (!defined $childpid)
554   {
555     close $reader{$id};
556     delete $reader{$id};
557     next;
558   }
559   shift @freeslot;
560   $proc{$childpid} = { jobstep => $id,
561                        time => time,
562                        slot => $childslot,
563                        jobstepname => "$job_id.$id.$childpid",
564                      };
565   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
566   $slot[$childslot]->{pid} = $childpid;
567
568   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
569   Log ($id, "child $childpid started on $childslotname");
570   $Jobstep->{attempts} ++;
571   $Jobstep->{starttime} = time;
572   $Jobstep->{node} = $childnode->{name};
573   $Jobstep->{slotindex} = $childslot;
574   delete $Jobstep->{stderr};
575   delete $Jobstep->{finishtime};
576
577   splice @jobstep_todo, $todo_ptr, 1;
578   --$todo_ptr;
579
580   $progress_is_dirty = 1;
581
582   while (!@freeslot
583          ||
584          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
585   {
586     last THISROUND if $main::please_freeze;
587     if ($main::please_info)
588     {
589       $main::please_info = 0;
590       freeze();
591       collate_output();
592       save_meta(1);
593       update_progress_stats();
594     }
595     my $gotsome
596         = readfrompipes ()
597         + reapchildren ();
598     if (!$gotsome)
599     {
600       check_squeue();
601       update_progress_stats();
602       select (undef, undef, undef, 0.1);
603     }
604     elsif (time - $progress_stats_updated >= 30)
605     {
606       update_progress_stats();
607     }
608     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
609         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
610     {
611       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
612           .($thisround_failed+$thisround_succeeded)
613           .") -- giving up on this round";
614       Log (undef, $message);
615       last THISROUND;
616     }
617
618     # move slots from freeslot to holdslot (or back to freeslot) if necessary
619     for (my $i=$#freeslot; $i>=0; $i--) {
620       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
621         push @holdslot, (splice @freeslot, $i, 1);
622       }
623     }
624     for (my $i=$#holdslot; $i>=0; $i--) {
625       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
626         push @freeslot, (splice @holdslot, $i, 1);
627       }
628     }
629
630     # give up if no nodes are succeeding
631     if (!grep { $_->{node}->{losing_streak} == 0 &&
632                     $_->{node}->{hold_count} < 4 } @slot) {
633       my $message = "Every node has failed -- giving up on this round";
634       Log (undef, $message);
635       last THISROUND;
636     }
637   }
638 }
639
640
641 push @freeslot, splice @holdslot;
642 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
643
644
645 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
646 while (%proc)
647 {
648   goto THISROUND if $main::please_continue;
649   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
650   readfrompipes ();
651   if (!reapchildren())
652   {
653     check_squeue();
654     update_progress_stats();
655     select (undef, undef, undef, 0.1);
656     killem (keys %proc) if $main::please_freeze;
657   }
658 }
659
660 update_progress_stats();
661 freeze_if_want_freeze();
662
663
664 if (!defined $success)
665 {
666   if (@jobstep_todo &&
667       $thisround_succeeded == 0 &&
668       ($thisround_failed == 0 || $thisround_failed > 4))
669   {
670     my $message = "stop because $thisround_failed tasks failed and none succeeded";
671     Log (undef, $message);
672     $success = 0;
673   }
674   if (!@jobstep_todo)
675   {
676     $success = 1;
677   }
678 }
679
680 goto ONELEVEL if !defined $success;
681
682
683 release_allocation();
684 freeze();
685 $Job->{'output'} = &collate_output();
686 $Job->{'success'} = $Job->{'output'} && $success;
687 $Job->save;
688
689 if ($Job->{'output'})
690 {
691   eval {
692     my $manifest_text = capturex("whget", $Job->{'output'});
693     $arv->{'collections'}->{'create'}->execute('collection' => {
694       'uuid' => $Job->{'output'},
695       'manifest_text' => $manifest_text,
696     });
697   };
698   if ($@) {
699     Log (undef, "Failed to register output manifest: $@");
700   }
701 }
702
703 Log (undef, "finish");
704
705 save_meta();
706 exit 0;
707
708
709
710 sub update_progress_stats
711 {
712   $progress_stats_updated = time;
713   return if !$progress_is_dirty;
714   my ($todo, $done, $running) = (scalar @jobstep_todo,
715                                  scalar @jobstep_done,
716                                  scalar @slot - scalar @freeslot - scalar @holdslot);
717   $Job->{'tasks_summary'} ||= {};
718   $Job->{'tasks_summary'}->{'todo'} = $todo;
719   $Job->{'tasks_summary'}->{'done'} = $done;
720   $Job->{'tasks_summary'}->{'running'} = $running;
721   $Job->save;
722   Log (undef, "status: $done done, $running running, $todo todo");
723   $progress_is_dirty = 0;
724 }
725
726
727
728 sub reapchildren
729 {
730   my $pid = waitpid (-1, WNOHANG);
731   return 0 if $pid <= 0;
732
733   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
734                   . "."
735                   . $slot[$proc{$pid}->{slot}]->{cpu});
736   my $jobstepid = $proc{$pid}->{jobstep};
737   my $elapsed = time - $proc{$pid}->{time};
738   my $Jobstep = $jobstep[$jobstepid];
739
740   my $exitcode = $?;
741   my $exitinfo = "exit $exitcode";
742   $Jobstep->{'arvados_task'}->reload;
743   my $success = $Jobstep->{'arvados_task'}->{success};
744
745   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
746
747   if (!defined $success) {
748     # task did not indicate one way or the other --> fail
749     $Jobstep->{'arvados_task'}->{success} = 0;
750     $Jobstep->{'arvados_task'}->save;
751     $success = 0;
752   }
753
754   if (!$success)
755   {
756     my $no_incr_attempts;
757     $no_incr_attempts = 1 if $Jobstep->{node_fail};
758
759     ++$thisround_failed;
760     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
761
762     # Check for signs of a failed or misconfigured node
763     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
764         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
765       # Don't count this against jobstep failure thresholds if this
766       # node is already suspected faulty and srun exited quickly
767       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
768           $elapsed < 5 &&
769           $Jobstep->{attempts} > 1) {
770         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
771         $no_incr_attempts = 1;
772         --$Jobstep->{attempts};
773       }
774       ban_node_by_slot($proc{$pid}->{slot});
775     }
776
777     push @jobstep_todo, $jobstepid;
778     Log ($jobstepid, "failure in $elapsed seconds");
779
780     --$Jobstep->{attempts} if $no_incr_attempts;
781     $Job->{'tasks_summary'}->{'failed'}++;
782   }
783   else
784   {
785     ++$thisround_succeeded;
786     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
787     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
788     push @jobstep_done, $jobstepid;
789     Log ($jobstepid, "success in $elapsed seconds");
790   }
791   $Jobstep->{exitcode} = $exitcode;
792   $Jobstep->{finishtime} = time;
793   process_stderr ($jobstepid, $success);
794   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
795
796   close $reader{$jobstepid};
797   delete $reader{$jobstepid};
798   delete $slot[$proc{$pid}->{slot}]->{pid};
799   push @freeslot, $proc{$pid}->{slot};
800   delete $proc{$pid};
801
802   # Load new tasks
803   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
804     'where' => {
805       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
806     },
807     'order' => 'qsequence'
808   );
809   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
810     my $jobstep = {
811       'level' => $arvados_task->{'sequence'},
812       'attempts' => 0,
813       'arvados_task' => $arvados_task
814     };
815     push @jobstep, $jobstep;
816     push @jobstep_todo, $#jobstep;
817   }
818
819   $progress_is_dirty = 1;
820   1;
821 }
822
823
824 sub check_squeue
825 {
826   # return if the kill list was checked <4 seconds ago
827   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
828   {
829     return;
830   }
831   $squeue_kill_checked = time;
832
833   # use killem() on procs whose killtime is reached
834   for (keys %proc)
835   {
836     if (exists $proc{$_}->{killtime}
837         && $proc{$_}->{killtime} <= time)
838     {
839       killem ($_);
840     }
841   }
842
843   # return if the squeue was checked <60 seconds ago
844   if (defined $squeue_checked && $squeue_checked > time - 60)
845   {
846     return;
847   }
848   $squeue_checked = time;
849
850   if (!$have_slurm)
851   {
852     # here is an opportunity to check for mysterious problems with local procs
853     return;
854   }
855
856   # get a list of steps still running
857   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
858   chop @squeue;
859   if ($squeue[-1] ne "ok")
860   {
861     return;
862   }
863   pop @squeue;
864
865   # which of my jobsteps are running, according to squeue?
866   my %ok;
867   foreach (@squeue)
868   {
869     if (/^(\d+)\.(\d+) (\S+)/)
870     {
871       if ($1 eq $ENV{SLURM_JOBID})
872       {
873         $ok{$3} = 1;
874       }
875     }
876   }
877
878   # which of my active child procs (>60s old) were not mentioned by squeue?
879   foreach (keys %proc)
880   {
881     if ($proc{$_}->{time} < time - 60
882         && !exists $ok{$proc{$_}->{jobstepname}}
883         && !exists $proc{$_}->{killtime})
884     {
885       # kill this proc if it hasn't exited in 30 seconds
886       $proc{$_}->{killtime} = time + 30;
887     }
888   }
889 }
890
891
892 sub release_allocation
893 {
894   if ($have_slurm)
895   {
896     Log (undef, "release job allocation");
897     system "scancel $ENV{SLURM_JOBID}";
898   }
899 }
900
901
902 sub readfrompipes
903 {
904   my $gotsome = 0;
905   foreach my $job (keys %reader)
906   {
907     my $buf;
908     while (0 < sysread ($reader{$job}, $buf, 8192))
909     {
910       print STDERR $buf if $ENV{CRUNCH_DEBUG};
911       $jobstep[$job]->{stderr} .= $buf;
912       preprocess_stderr ($job);
913       if (length ($jobstep[$job]->{stderr}) > 16384)
914       {
915         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
916       }
917       $gotsome = 1;
918     }
919   }
920   return $gotsome;
921 }
922
923
924 sub preprocess_stderr
925 {
926   my $job = shift;
927
928   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
929     my $line = $1;
930     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
931     Log ($job, "stderr $line");
932     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
933       # whoa.
934       $main::please_freeze = 1;
935     }
936     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
937       $jobstep[$job]->{node_fail} = 1;
938       ban_node_by_slot($jobstep[$job]->{slotindex});
939     }
940   }
941 }
942
943
944 sub process_stderr
945 {
946   my $job = shift;
947   my $success = shift;
948   preprocess_stderr ($job);
949
950   map {
951     Log ($job, "stderr $_");
952   } split ("\n", $jobstep[$job]->{stderr});
953 }
954
955
956 sub collate_output
957 {
958   my $whc = Warehouse->new;
959   Log (undef, "collate");
960   $whc->write_start (1);
961   my $joboutput;
962   for (@jobstep)
963   {
964     next if (!exists $_->{'arvados_task'}->{output} ||
965              !$_->{'arvados_task'}->{'success'} ||
966              $_->{'exitcode'} != 0);
967     my $output = $_->{'arvados_task'}->{output};
968     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
969     {
970       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
971       $whc->write_data ($output);
972     }
973     elsif (@jobstep == 1)
974     {
975       $joboutput = $output;
976       $whc->write_finish;
977     }
978     elsif (defined (my $outblock = $whc->fetch_block ($output)))
979     {
980       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
981       $whc->write_data ($outblock);
982     }
983     else
984     {
985       my $errstr = $whc->errstr;
986       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
987       $success = 0;
988     }
989   }
990   $joboutput = $whc->write_finish if !defined $joboutput;
991   if ($joboutput)
992   {
993     Log (undef, "output $joboutput");
994     $Job->{'output'} = $joboutput;
995     $Job->save;
996   }
997   else
998   {
999     Log (undef, "output undef");
1000   }
1001   return $joboutput;
1002 }
1003
1004
1005 sub killem
1006 {
1007   foreach (@_)
1008   {
1009     my $sig = 2;                # SIGINT first
1010     if (exists $proc{$_}->{"sent_$sig"} &&
1011         time - $proc{$_}->{"sent_$sig"} > 4)
1012     {
1013       $sig = 15;                # SIGTERM if SIGINT doesn't work
1014     }
1015     if (exists $proc{$_}->{"sent_$sig"} &&
1016         time - $proc{$_}->{"sent_$sig"} > 4)
1017     {
1018       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1019     }
1020     if (!exists $proc{$_}->{"sent_$sig"})
1021     {
1022       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1023       kill $sig, $_;
1024       select (undef, undef, undef, 0.1);
1025       if ($sig == 2)
1026       {
1027         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1028       }
1029       $proc{$_}->{"sent_$sig"} = time;
1030       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1031     }
1032   }
1033 }
1034
1035
1036 sub fhbits
1037 {
1038   my($bits);
1039   for (@_) {
1040     vec($bits,fileno($_),1) = 1;
1041   }
1042   $bits;
1043 }
1044
1045
1046 sub Log                         # ($jobstep_id, $logmessage)
1047 {
1048   if ($_[1] =~ /\n/) {
1049     for my $line (split (/\n/, $_[1])) {
1050       Log ($_[0], $line);
1051     }
1052     return;
1053   }
1054   my $fh = select STDERR; $|=1; select $fh;
1055   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1056   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1057   $message .= "\n";
1058   my $datetime;
1059   if ($metastream || -t STDERR) {
1060     my @gmtime = gmtime;
1061     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1062                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1063   }
1064   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1065
1066   return if !$metastream;
1067   $metastream->write_data ($datetime . " " . $message);
1068 }
1069
1070
1071 sub reconnect_database
1072 {
1073   return if !$job_has_uuid;
1074   return if ($dbh && $dbh->do ("select now()"));
1075   for (1..16)
1076   {
1077     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1078     if ($dbh) {
1079       $dbh->{InactiveDestroy} = 1;
1080       return;
1081     }
1082     warn ($DBI::errstr);
1083     sleep $_;
1084   }
1085   croak ($DBI::errstr) if !$dbh;
1086 }
1087
1088
1089 sub dbh_do
1090 {
1091   return 1 if !$job_has_uuid;
1092   my $ret = $dbh->do (@_);
1093   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1094   reconnect_database();
1095   return $dbh->do (@_);
1096 }
1097
1098
1099 sub croak
1100 {
1101   my ($package, $file, $line) = caller;
1102   my $message = "@_ at $file line $line\n";
1103   Log (undef, $message);
1104   freeze() if @jobstep_todo;
1105   collate_output() if @jobstep_todo;
1106   cleanup();
1107   save_meta() if $metastream;
1108   die;
1109 }
1110
1111
1112 sub cleanup
1113 {
1114   return if !$job_has_uuid;
1115   $Job->reload;
1116   $Job->{'running'} = 0;
1117   $Job->{'success'} = 0;
1118   $Job->{'finished_at'} = time;
1119   $Job->save;
1120 }
1121
1122
1123 sub save_meta
1124 {
1125   my $justcheckpoint = shift; # false if this will be the last meta saved
1126   my $m = $metastream;
1127   $m = $m->copy if $justcheckpoint;
1128   $m->write_finish;
1129   my $loglocator = $m->as_key;
1130   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1131   Log (undef, "meta key is $loglocator");
1132   $Job->{'log'} = $loglocator;
1133   $Job->save;
1134 }
1135
1136
1137 sub freeze_if_want_freeze
1138 {
1139   if ($main::please_freeze)
1140   {
1141     release_allocation();
1142     if (@_)
1143     {
1144       # kill some srun procs before freeze+stop
1145       map { $proc{$_} = {} } @_;
1146       while (%proc)
1147       {
1148         killem (keys %proc);
1149         select (undef, undef, undef, 0.1);
1150         my $died;
1151         while (($died = waitpid (-1, WNOHANG)) > 0)
1152         {
1153           delete $proc{$died};
1154         }
1155       }
1156     }
1157     freeze();
1158     collate_output();
1159     cleanup();
1160     save_meta();
1161     exit 0;
1162   }
1163 }
1164
1165
1166 sub freeze
1167 {
1168   Log (undef, "Freeze not implemented");
1169   return;
1170
1171   my $whc;                      # todo
1172   Log (undef, "freeze");
1173
1174   my $freezer = new Warehouse::Stream (whc => $whc);
1175   $freezer->clear;
1176   $freezer->name (".");
1177   $freezer->write_start ("state.txt");
1178
1179   $freezer->write_data (join ("\n",
1180                               "job $Job->{uuid}",
1181                               map
1182                               {
1183                                 $_ . "=" . freezequote($Job->{$_})
1184                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1185
1186   foreach my $Jobstep (@jobstep)
1187   {
1188     my $str = join ("\n",
1189                     map
1190                     {
1191                       $_ . "=" . freezequote ($Jobstep->{$_})
1192                     } grep {
1193                       $_ !~ /^stderr|slotindex|node_fail/
1194                     } keys %$Jobstep);
1195     $freezer->write_data ($str."\n\n");
1196   }
1197   if (@jobstep_tomerge)
1198   {
1199     $freezer->write_data
1200         ("merge $jobstep_tomerge_level "
1201          . freezequote (join ("\n",
1202                               map { freezequote ($_) } @jobstep_tomerge))
1203          . "\n\n");
1204   }
1205
1206   $freezer->write_finish;
1207   my $frozentokey = $freezer->as_key;
1208   undef $freezer;
1209   Log (undef, "frozento key is $frozentokey");
1210   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1211           $frozentokey, $job_id);
1212   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1213   Log (undef, "frozento+K key is $kfrozentokey");
1214   return $frozentokey;
1215 }
1216
1217
1218 sub thaw
1219 {
1220   croak ("Thaw not implemented");
1221
1222   my $whc;
1223   my $key = shift;
1224   Log (undef, "thaw from $key");
1225
1226   @jobstep = ();
1227   @jobstep_done = ();
1228   @jobstep_todo = ();
1229   @jobstep_tomerge = ();
1230   $jobstep_tomerge_level = 0;
1231   my $frozenjob = {};
1232
1233   my $stream = new Warehouse::Stream ( whc => $whc,
1234                                        hash => [split (",", $key)] );
1235   $stream->rewind;
1236   while (my $dataref = $stream->read_until (undef, "\n\n"))
1237   {
1238     if ($$dataref =~ /^job /)
1239     {
1240       foreach (split ("\n", $$dataref))
1241       {
1242         my ($k, $v) = split ("=", $_, 2);
1243         $frozenjob->{$k} = freezeunquote ($v);
1244       }
1245       next;
1246     }
1247
1248     if ($$dataref =~ /^merge (\d+) (.*)/)
1249     {
1250       $jobstep_tomerge_level = $1;
1251       @jobstep_tomerge
1252           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1253       next;
1254     }
1255
1256     my $Jobstep = { };
1257     foreach (split ("\n", $$dataref))
1258     {
1259       my ($k, $v) = split ("=", $_, 2);
1260       $Jobstep->{$k} = freezeunquote ($v) if $k;
1261     }
1262     $Jobstep->{attempts} = 0;
1263     push @jobstep, $Jobstep;
1264
1265     if ($Jobstep->{exitcode} eq "0")
1266     {
1267       push @jobstep_done, $#jobstep;
1268     }
1269     else
1270     {
1271       push @jobstep_todo, $#jobstep;
1272     }
1273   }
1274
1275   foreach (qw (script script_version script_parameters))
1276   {
1277     $Job->{$_} = $frozenjob->{$_};
1278   }
1279   $Job->save;
1280 }
1281
1282
1283 sub freezequote
1284 {
1285   my $s = shift;
1286   $s =~ s/\\/\\\\/g;
1287   $s =~ s/\n/\\n/g;
1288   return $s;
1289 }
1290
1291
1292 sub freezeunquote
1293 {
1294   my $s = shift;
1295   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1296   return $s;
1297 }
1298
1299
1300 sub srun
1301 {
1302   my $srunargs = shift;
1303   my $execargs = shift;
1304   my $opts = shift || {};
1305   my $stdin = shift;
1306   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1307   print STDERR (join (" ",
1308                       map { / / ? "'$_'" : $_ }
1309                       (@$args)),
1310                 "\n")
1311       if $ENV{CRUNCH_DEBUG};
1312
1313   if (defined $stdin) {
1314     my $child = open STDIN, "-|";
1315     defined $child or die "no fork: $!";
1316     if ($child == 0) {
1317       print $stdin or die $!;
1318       close STDOUT or die $!;
1319       exit 0;
1320     }
1321   }
1322
1323   return system (@$args) if $opts->{fork};
1324
1325   exec @$args;
1326   warn "ENV size is ".length(join(" ",%ENV));
1327   die "exec failed: $!: @$args";
1328 }
1329
1330
1331 sub ban_node_by_slot {
1332   # Don't start any new jobsteps on this node for 60 seconds
1333   my $slotid = shift;
1334   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1335   $slot[$slotid]->{node}->{hold_count}++;
1336   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1337 }
1338
1339 __DATA__
1340 #!/usr/bin/perl
1341
1342 # checkout-and-build
1343
1344 use Fcntl ':flock';
1345
1346 my $destdir = $ENV{"CRUNCH_SRC"};
1347 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1348 my $repo = $ENV{"CRUNCH_SRC_URL"};
1349
1350 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1351 flock L, LOCK_EX;
1352 if (readlink ("$destdir.commit") eq $commit) {
1353     exit 0;
1354 }
1355
1356 open STDOUT, ">", "$destdir.log";
1357 open STDERR, ">&STDOUT";
1358
1359 if (-d "$destdir/.git") {
1360     chdir $destdir or die "chdir $destdir: $!";
1361     if (0 != system (qw(git remote set-url origin), $repo)) {
1362         # awful... for old versions of git that don't know "remote set-url"
1363         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1364     }
1365 }
1366 elsif ($repo && $commit)
1367 {
1368     shell_or_die('git', 'clone', $repo, $destdir);
1369     chdir $destdir or die "chdir $destdir: $!";
1370     shell_or_die(qw(git config clean.requireForce false));
1371 }
1372 else {
1373     die "$destdir does not exist, and no repo/commit specified -- giving up";
1374 }
1375
1376 if ($commit) {
1377     unlink "$destdir.commit";
1378     shell_or_die (qw(git stash));
1379     shell_or_die (qw(git clean -d -x));
1380     shell_or_die (qw(git fetch origin));
1381     shell_or_die (qw(git checkout), $commit);
1382 }
1383
1384 my $pwd;
1385 chomp ($pwd = `pwd`);
1386 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1387 mkdir $install_dir;
1388 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1389     # Old version
1390     shell_or_die ("./tests/autotests.sh", $install_dir);
1391 } elsif (-e "./install.sh") {
1392     shell_or_die ("./install.sh", $install_dir);
1393 }
1394
1395 if ($commit) {
1396     unlink "$destdir.commit.new";
1397     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1398     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1399 }
1400
1401 close L;
1402
1403 exit 0;
1404
1405 sub shell_or_die
1406 {
1407   if ($ENV{"DEBUG"}) {
1408     print STDERR "@_\n";
1409   }
1410   system (@_) == 0
1411       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1412 }