861ac6c8e2ae77786adae00b1eec17be6a44188d
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
63
64 my $force_unlock;
65 my $jobspec;
66 my $resume_stash;
67 GetOptions('force-unlock' => \$force_unlock,
68            'job=s' => \$jobspec,
69            'resume-stash=s' => \$resume_stash,
70     );
71
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
74
75
76 $SIG{'HUP'} = sub
77 {
78   1;
79 };
80 $SIG{'USR1'} = sub
81 {
82   $main::ENV{CRUNCH_DEBUG} = 1;
83 };
84 $SIG{'USR2'} = sub
85 {
86   $main::ENV{CRUNCH_DEBUG} = 0;
87 };
88
89
90
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
93 $metastream->clear;
94 $metastream->write_start('log.txt');
95
96 my $User = {};
97 my $Job = {};
98 my $job_id;
99 my $dbh;
100 my $sth;
101 if ($job_has_uuid)
102 {
103   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104   $User = $arv->{'users'}->{'current'}->execute;
105   if (!$force_unlock) {
106     if ($Job->{'is_locked_by'}) {
107       croak("Job is locked: " . $Job->{'is_locked_by'});
108     }
109     if ($Job->{'success'} ne undef) {
110       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
111     }
112     if ($Job->{'running'}) {
113       croak("Job 'running' flag is already set");
114     }
115     if ($Job->{'started_at'}) {
116       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
117     }
118   }
119 }
120 else
121 {
122   $Job = JSON::decode_json($jobspec);
123
124   if (!$resume_stash)
125   {
126     map { croak ("No $_ specified") unless $Job->{$_} }
127     qw(script script_version script_parameters);
128   }
129
130   if (!defined $Job->{'uuid'}) {
131     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
132   }
133 }
134 $job_id = $Job->{'uuid'};
135
136
137
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
141
142
143 Log (undef, "check slurm allocation");
144 my @slot;
145 my @node;
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
147 my @sinfo;
148 if (!$have_slurm)
149 {
150   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151   push @sinfo, "$localcpus localhost";
152 }
153 if (exists $ENV{SLURM_NODELIST})
154 {
155   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
156 }
157 foreach (@sinfo)
158 {
159   my ($ncpus, $slurm_nodelist) = split;
160   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
161
162   my @nodelist;
163   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
164   {
165     my $nodelist = $1;
166     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
167     {
168       my $ranges = $1;
169       foreach (split (",", $ranges))
170       {
171         my ($a, $b);
172         if (/(\d+)-(\d+)/)
173         {
174           $a = $1;
175           $b = $2;
176         }
177         else
178         {
179           $a = $_;
180           $b = $_;
181         }
182         push @nodelist, map {
183           my $n = $nodelist;
184           $n =~ s/\[[-,\d]+\]/$_/;
185           $n;
186         } ($a..$b);
187       }
188     }
189     else
190     {
191       push @nodelist, $nodelist;
192     }
193   }
194   foreach my $nodename (@nodelist)
195   {
196     Log (undef, "node $nodename - $ncpus slots");
197     my $node = { name => $nodename,
198                  ncpus => $ncpus,
199                  losing_streak => 0,
200                  hold_until => 0 };
201     foreach my $cpu (1..$ncpus)
202     {
203       push @slot, { node => $node,
204                     cpu => $cpu };
205     }
206   }
207   push @node, @nodelist;
208 }
209
210
211
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
214
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
216
217
218
219 my $jobmanager_id;
220 if ($job_has_uuid)
221 {
222   # Claim this job, and make sure nobody else does
223
224   $Job->{'is_locked_by'} = $User->{'uuid'};
225   $Job->{'started_at'} = time;
226   $Job->{'running'} = 1;
227   $Job->{'success'} = undef;
228   $Job->{'tasks_summary'} = { 'failed' => 0,
229                               'todo' => 1,
230                               'running' => 0,
231                               'done' => 0 };
232   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
233     croak("Error while updating / locking job");
234   }
235 }
236
237
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
249
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
253
254
255 my @jobstep;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
260 my $squeue_checked;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
263
264
265
266 if (defined $Job->{thawedfromkey})
267 {
268   thaw ($Job->{thawedfromkey});
269 }
270 else
271 {
272   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273     'job_uuid' => $Job->{'uuid'},
274     'sequence' => 0,
275     'qsequence' => 0,
276     'parameters' => {},
277                                                           });
278   push @jobstep, { level => 0,
279                    attempts => 0,
280                    arvados_task => $first_task,
281                  };
282   push @jobstep_todo, 0;
283 }
284
285
286 my $build_script;
287 do {
288   local $/ = undef;
289   $build_script = <DATA>;
290 };
291
292
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
294
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
296 if ($skip_install)
297 {
298   $ENV{"CRUNCH_SRC"} = $Job->{revision};
299 }
300 else
301 {
302   Log (undef, "Install revision ".$Job->{revision});
303   my $nodelist = join(",", @node);
304
305   # Clean out crunch_tmp/work and crunch_tmp/opt
306
307   my $cleanpid = fork();
308   if ($cleanpid == 0)
309   {
310     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
312     exit (1);
313   }
314   while (1)
315   {
316     last if $cleanpid == waitpid (-1, WNOHANG);
317     freeze_if_want_freeze ($cleanpid);
318     select (undef, undef, undef, 0.1);
319   }
320   Log (undef, "Clean-work-dir exited $?");
321
322   # Install requested code version
323
324   my @execargs;
325   my @srunargs = ("srun",
326                   "--nodelist=$nodelist",
327                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
328
329   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
332
333   my $commit;
334   my $treeish = $Job->{'script_version'};
335   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336   # Todo: let script_version specify alternate repo
337   $ENV{"CRUNCH_SRC_URL"} = $repo;
338
339   # Create/update our clone of the remote git repo
340
341   if (!-d $ENV{"CRUNCH_SRC"}) {
342     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343         or croak ("git clone $repo failed: exit ".($?>>8));
344     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
345   }
346   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
347
348   # If this looks like a subversion r#, look for it in git-svn commit messages
349
350   if ($treeish =~ m{^\d{1,4}$}) {
351     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
352     chomp $gitlog;
353     if ($gitlog =~ /^[a-f0-9]{40}$/) {
354       $commit = $gitlog;
355       Log (undef, "Using commit $commit for revision $treeish");
356     }
357   }
358
359   # If that didn't work, try asking git to look it up as a tree-ish.
360
361   if (!defined $commit) {
362
363     my $cooked_treeish = $treeish;
364     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365       # Looks like a git branch name -- make sure git knows it's
366       # relative to the remote repo
367       $cooked_treeish = "origin/$treeish";
368     }
369
370     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
371     chomp $found;
372     if ($found =~ /^[0-9a-f]{40}$/s) {
373       $commit = $found;
374       if ($commit ne $treeish) {
375         # Make sure we record the real commit id in the database,
376         # frozentokey, logs, etc. -- instead of an abbreviation or a
377         # branch name which can become ambiguous or point to a
378         # different commit in the future.
379         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380         Log (undef, "Using commit $commit for tree-ish $treeish");
381         if ($commit ne $treeish) {
382           $Job->{'script_version'} = $commit;
383           $Job->save() or croak("Error while updating job");
384         }
385       }
386     }
387   }
388
389   if (defined $commit) {
390     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391     @execargs = ("sh", "-c",
392                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
393   }
394   else {
395     croak ("could not figure out commit id for $treeish");
396   }
397
398   my $installpid = fork();
399   if ($installpid == 0)
400   {
401     srun (\@srunargs, \@execargs, {}, $build_script);
402     exit (1);
403   }
404   while (1)
405   {
406     last if $installpid == waitpid (-1, WNOHANG);
407     freeze_if_want_freeze ($installpid);
408     select (undef, undef, undef, 0.1);
409   }
410   Log (undef, "Install exited $?");
411 }
412
413
414
415 foreach (qw (script script_version script_parameters resource_limits))
416 {
417   Log (undef, $_ . " " . $Job->{$_});
418 }
419 foreach (split (/\n/, $Job->{knobs}))
420 {
421   Log (undef, "knob " . $_);
422 }
423
424
425
426 my $success;
427
428
429
430 ONELEVEL:
431
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
435
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437                        or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
440
441
442
443 my %proc;
444 my @freeslot = (0..$#slot);
445 my @holdslot;
446 my %reader;
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
449
450 update_progress_stats();
451
452
453
454 THISROUND:
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
456 {
457   $main::please_continue = 0;
458
459   my $id = $jobstep_todo[$todo_ptr];
460   my $Jobstep = $jobstep[$id];
461   if ($Jobstep->{level} != $level)
462   {
463     next;
464   }
465   if ($Jobstep->{attempts} > 9)
466   {
467     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
468     $success = 0;
469     last THISROUND;
470   }
471
472   pipe $reader{$id}, "writer" or croak ($!);
473   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
475
476   my $childslot = $freeslot[0];
477   my $childnode = $slot[$childslot]->{node};
478   my $childslotname = join (".",
479                             $slot[$childslot]->{node}->{name},
480                             $slot[$childslot]->{cpu});
481   my $childpid = fork();
482   if ($childpid == 0)
483   {
484     $SIG{'INT'} = 'DEFAULT';
485     $SIG{'QUIT'} = 'DEFAULT';
486     $SIG{'TERM'} = 'DEFAULT';
487
488     foreach (values (%reader))
489     {
490       close($_);
491     }
492     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493     open(STDOUT,">&writer");
494     open(STDERR,">&writer");
495
496     undef $dbh;
497     undef $sth;
498
499     delete $ENV{"GNUPGHOME"};
500     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501     $ENV{"TASK_QSEQUENCE"} = $id;
502     $ENV{"TASK_SEQUENCE"} = $level;
503     $ENV{"JOB_SCRIPT"} = $Job->{script};
504     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505       $param =~ tr/a-z/A-Z/;
506       $ENV{"JOB_PARAMETER_$param"} = $value;
507     }
508     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
512
513     $ENV{"GZIP"} = "-n";
514
515     my @srunargs = (
516       "srun",
517       "--nodelist=".$childnode->{name},
518       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519       "--job-name=$job_id.$id.$$",
520         );
521     my @execargs = qw(sh);
522     my $build_script_to_send = "";
523     my $command =
524         "mkdir -p $ENV{CRUNCH_TMP}/revision "
525         ."&& cd $ENV{CRUNCH_TMP} ";
526     if ($build_script)
527     {
528       $build_script_to_send = $build_script;
529       $command .=
530           "&& perl -";
531     }
532     elsif (!$skip_install)
533     {
534       $command .=
535           "&& "
536           ."( "
537           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
538           ."|| "
539           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540           ."    && ./installrevision "
541           ."  ) "
542           .") ";
543     }
544     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
545     $command .=
546         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547     my @execargs = ('bash', '-c', $command);
548     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
549     exit (1);
550   }
551   close("writer");
552   if (!defined $childpid)
553   {
554     close $reader{$id};
555     delete $reader{$id};
556     next;
557   }
558   shift @freeslot;
559   $proc{$childpid} = { jobstep => $id,
560                        time => time,
561                        slot => $childslot,
562                        jobstepname => "$job_id.$id.$childpid",
563                      };
564   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565   $slot[$childslot]->{pid} = $childpid;
566
567   Log ($id, "child $childpid started on $childslotname");
568   $Jobstep->{attempts} ++;
569   $Jobstep->{starttime} = time;
570   $Jobstep->{node} = $childnode->{name};
571   $Jobstep->{slotindex} = $childslot;
572   delete $Jobstep->{stderr};
573   delete $Jobstep->{finishtime};
574
575   splice @jobstep_todo, $todo_ptr, 1;
576   --$todo_ptr;
577
578   $progress_is_dirty = 1;
579
580   while (!@freeslot
581          ||
582          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
583   {
584     last THISROUND if $main::please_freeze;
585     if ($main::please_info)
586     {
587       $main::please_info = 0;
588       freeze();
589       collate_output();
590       save_meta(1);
591       update_progress_stats();
592     }
593     my $gotsome
594         = readfrompipes ()
595         + reapchildren ();
596     if (!$gotsome)
597     {
598       check_squeue();
599       update_progress_stats();
600       select (undef, undef, undef, 0.1);
601     }
602     elsif (time - $progress_stats_updated >= 30)
603     {
604       update_progress_stats();
605     }
606     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
607         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
608     {
609       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
610           .($thisround_failed+$thisround_succeeded)
611           .") -- giving up on this round";
612       Log (undef, $message);
613       last THISROUND;
614     }
615
616     # move slots from freeslot to holdslot (or back to freeslot) if necessary
617     for (my $i=$#freeslot; $i>=0; $i--) {
618       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
619         push @holdslot, (splice @freeslot, $i, 1);
620       }
621     }
622     for (my $i=$#holdslot; $i>=0; $i--) {
623       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
624         push @freeslot, (splice @holdslot, $i, 1);
625       }
626     }
627
628     # give up if no nodes are succeeding
629     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
630       my $message = "Every node has failed -- giving up on this round";
631       Log (undef, $message);
632       last THISROUND;
633     }
634   }
635 }
636
637
638 push @freeslot, splice @holdslot;
639 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
640
641
642 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
643 while (%proc)
644 {
645   goto THISROUND if $main::please_continue;
646   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
647   readfrompipes ();
648   if (!reapchildren())
649   {
650     check_squeue();
651     update_progress_stats();
652     select (undef, undef, undef, 0.1);
653     killem (keys %proc) if $main::please_freeze;
654   }
655 }
656
657 update_progress_stats();
658 freeze_if_want_freeze();
659
660
661 if (!defined $success)
662 {
663   if (@jobstep_todo &&
664       $thisround_succeeded == 0 &&
665       ($thisround_failed == 0 || $thisround_failed > 4))
666   {
667     my $message = "stop because $thisround_failed tasks failed and none succeeded";
668     Log (undef, $message);
669     $success = 0;
670   }
671   if (!@jobstep_todo)
672   {
673     $success = 1;
674   }
675 }
676
677 goto ONELEVEL if !defined $success;
678
679
680 release_allocation();
681 freeze();
682 $Job->{'output'} = &collate_output();
683 $Job->{'success'} = 0 if !$Job->{'output'};
684 $Job->save;
685
686 if ($Job->{'output'})
687 {
688   $arv->{'collections'}->{'create'}->execute('collection' => {
689     'uuid' => $Job->{'output'},
690     'manifest_text' => system("whget", $Job->{arvados_task}->{output}),
691                                              });;
692 }
693
694
695 Log (undef, "finish");
696
697 $Job->{'success'} = $Job->{'output'} && $success;
698 $Job->save;
699
700 save_meta();
701 exit 0;
702
703
704
705 sub update_progress_stats
706 {
707   $progress_stats_updated = time;
708   return if !$progress_is_dirty;
709   my ($todo, $done, $running) = (scalar @jobstep_todo,
710                                  scalar @jobstep_done,
711                                  scalar @slot - scalar @freeslot - scalar @holdslot);
712   $Job->{'tasks_summary'} ||= {};
713   $Job->{'tasks_summary'}->{'todo'} = $todo;
714   $Job->{'tasks_summary'}->{'done'} = $done;
715   $Job->{'tasks_summary'}->{'running'} = $running;
716   $Job->save;
717   Log (undef, "status: $done done, $running running, $todo todo");
718   $progress_is_dirty = 0;
719 }
720
721
722
723 sub reapchildren
724 {
725   my $pid = waitpid (-1, WNOHANG);
726   return 0 if $pid <= 0;
727
728   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
729                   . "."
730                   . $slot[$proc{$pid}->{slot}]->{cpu});
731   my $jobstepid = $proc{$pid}->{jobstep};
732   my $elapsed = time - $proc{$pid}->{time};
733   my $Jobstep = $jobstep[$jobstepid];
734
735   my $exitcode = $?;
736   my $exitinfo = "exit $exitcode";
737   $Jobstep->{arvados_task}->reload;
738   my $success = $Jobstep->{arvados_task}->{success};
739
740   if (!defined $success) {
741     # task did not indicate one way or the other --> fail
742     $Jobstep->{arvados_task}->{success} = 0;
743     $Jobstep->{arvados_task}->save;
744     $success = 0;
745   }
746
747   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
748
749   if (!$success)
750   {
751     --$Jobstep->{attempts} if $Jobstep->{node_fail};
752     ++$thisround_failed;
753     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
754
755     # Check for signs of a failed or misconfigured node
756     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
757         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
758       # Don't count this against jobstep failure thresholds if this
759       # node is already suspected faulty and srun exited quickly
760       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
761           $elapsed < 5 &&
762           $Jobstep->{attempts} > 1) {
763         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
764         --$Jobstep->{attempts};
765       }
766       ban_node_by_slot($proc{$pid}->{slot});
767     }
768
769     push @jobstep_todo, $jobstepid;
770     Log ($jobstepid, "failure in $elapsed seconds");
771     $Job->{'tasks_summary'}->{'failed'}++;
772   }
773   else
774   {
775     ++$thisround_succeeded;
776     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
777     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
778     push @jobstep_done, $jobstepid;
779     Log ($jobstepid, "success in $elapsed seconds");
780   }
781   $Jobstep->{exitcode} = $exitcode;
782   $Jobstep->{finishtime} = time;
783   process_stderr ($jobstepid, $success);
784   Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
785
786   close $reader{$jobstepid};
787   delete $reader{$jobstepid};
788   delete $slot[$proc{$pid}->{slot}]->{pid};
789   push @freeslot, $proc{$pid}->{slot};
790   delete $proc{$pid};
791
792   # Load new tasks
793   my $newtask_list = $arv->{'job_tasks'}->{'index'}->execute('where' => {
794     'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
795                                                              });
796   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
797     my $jobstep = {
798       'level' => $arvados_task->{'sequence'},
799       'attempts' => 0,
800       'arvados_task' => $arvados_task
801     };
802     push @jobstep, $jobstep;
803     push @jobstep_todo, $#jobstep;
804   }
805
806   $progress_is_dirty = 1;
807   1;
808 }
809
810
811 sub check_squeue
812 {
813   # return if the kill list was checked <4 seconds ago
814   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
815   {
816     return;
817   }
818   $squeue_kill_checked = time;
819
820   # use killem() on procs whose killtime is reached
821   for (keys %proc)
822   {
823     if (exists $proc{$_}->{killtime}
824         && $proc{$_}->{killtime} <= time)
825     {
826       killem ($_);
827     }
828   }
829
830   # return if the squeue was checked <60 seconds ago
831   if (defined $squeue_checked && $squeue_checked > time - 60)
832   {
833     return;
834   }
835   $squeue_checked = time;
836
837   if (!$have_slurm)
838   {
839     # here is an opportunity to check for mysterious problems with local procs
840     return;
841   }
842
843   # get a list of steps still running
844   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
845   chop @squeue;
846   if ($squeue[-1] ne "ok")
847   {
848     return;
849   }
850   pop @squeue;
851
852   # which of my jobsteps are running, according to squeue?
853   my %ok;
854   foreach (@squeue)
855   {
856     if (/^(\d+)\.(\d+) (\S+)/)
857     {
858       if ($1 eq $ENV{SLURM_JOBID})
859       {
860         $ok{$3} = 1;
861       }
862     }
863   }
864
865   # which of my active child procs (>60s old) were not mentioned by squeue?
866   foreach (keys %proc)
867   {
868     if ($proc{$_}->{time} < time - 60
869         && !exists $ok{$proc{$_}->{jobstepname}}
870         && !exists $proc{$_}->{killtime})
871     {
872       # kill this proc if it hasn't exited in 30 seconds
873       $proc{$_}->{killtime} = time + 30;
874     }
875   }
876 }
877
878
879 sub release_allocation
880 {
881   if ($have_slurm)
882   {
883     Log (undef, "release job allocation");
884     system "scancel $ENV{SLURM_JOBID}";
885   }
886 }
887
888
889 sub readfrompipes
890 {
891   my $gotsome = 0;
892   foreach my $job (keys %reader)
893   {
894     my $buf;
895     while (0 < sysread ($reader{$job}, $buf, 8192))
896     {
897       print STDERR $buf if $ENV{CRUNCH_DEBUG};
898       $jobstep[$job]->{stderr} .= $buf;
899       preprocess_stderr ($job);
900       if (length ($jobstep[$job]->{stderr}) > 16384)
901       {
902         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
903       }
904       $gotsome = 1;
905     }
906   }
907   return $gotsome;
908 }
909
910
911 sub preprocess_stderr
912 {
913   my $job = shift;
914
915   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
916     my $line = $1;
917     if ($line =~ /\+\+\+mr/) {
918       last;
919     }
920     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
921     Log ($job, "stderr $line");
922     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
923       # whoa.
924       $main::please_freeze = 1;
925     }
926     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
927       $jobstep[$job]->{node_fail} = 1;
928       ban_node_by_slot($jobstep[$job]->{slotindex});
929     }
930   }
931 }
932
933
934 sub process_stderr
935 {
936   my $job = shift;
937   my $success = shift;
938   preprocess_stderr ($job);
939
940   map {
941     Log ($job, "stderr $_");
942   } split ("\n", $jobstep[$job]->{stderr});
943 }
944
945
946 sub collate_output
947 {
948   my $whc = Warehouse->new;
949   Log (undef, "collate");
950   $whc->write_start (1);
951   my $joboutput;
952   for (@jobstep)
953   {
954     next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
955     my $output = $_->{arvados_task}->{output};
956     if ($output !~ /^[0-9a-f]{32}/)
957     {
958       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
959       $whc->write_data ($output);
960     }
961     elsif (@jobstep == 1)
962     {
963       $joboutput = $output;
964       $whc->write_finish;
965     }
966     elsif (defined (my $outblock = $whc->fetch_block ($output)))
967     {
968       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
969       $whc->write_data ($outblock);
970     }
971     else
972     {
973       my $errstr = $whc->errstr;
974       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
975       $success = 0;
976     }
977   }
978   $joboutput = $whc->write_finish if !defined $joboutput;
979   if ($joboutput)
980   {
981     Log (undef, "outputkey $joboutput");
982     $Job->{'output'} = $joboutput;
983     $Job->save;
984   }
985   else
986   {
987     Log (undef, "outputkey undef");
988   }
989   return $joboutput;
990 }
991
992
993 sub killem
994 {
995   foreach (@_)
996   {
997     my $sig = 2;                # SIGINT first
998     if (exists $proc{$_}->{"sent_$sig"} &&
999         time - $proc{$_}->{"sent_$sig"} > 4)
1000     {
1001       $sig = 15;                # SIGTERM if SIGINT doesn't work
1002     }
1003     if (exists $proc{$_}->{"sent_$sig"} &&
1004         time - $proc{$_}->{"sent_$sig"} > 4)
1005     {
1006       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1007     }
1008     if (!exists $proc{$_}->{"sent_$sig"})
1009     {
1010       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1011       kill $sig, $_;
1012       select (undef, undef, undef, 0.1);
1013       if ($sig == 2)
1014       {
1015         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1016       }
1017       $proc{$_}->{"sent_$sig"} = time;
1018       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1019     }
1020   }
1021 }
1022
1023
1024 sub fhbits
1025 {
1026   my($bits);
1027   for (@_) {
1028     vec($bits,fileno($_),1) = 1;
1029   }
1030   $bits;
1031 }
1032
1033
1034 sub Log                         # ($jobstep_id, $logmessage)
1035 {
1036   if ($_[1] =~ /\n/) {
1037     for my $line (split (/\n/, $_[1])) {
1038       Log ($_[0], $line);
1039     }
1040     return;
1041   }
1042   my $fh = select STDERR; $|=1; select $fh;
1043   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1044   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1045   $message .= "\n";
1046   my $datetime;
1047   if ($metastream || -t STDERR) {
1048     my @gmtime = gmtime;
1049     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1050                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1051   }
1052   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1053
1054   return if !$metastream;
1055   $metastream->write_data ($datetime . " " . $message);
1056 }
1057
1058
1059 sub reconnect_database
1060 {
1061   return if !$job_has_uuid;
1062   return if ($dbh && $dbh->do ("select now()"));
1063   for (1..16)
1064   {
1065     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1066     if ($dbh) {
1067       $dbh->{InactiveDestroy} = 1;
1068       return;
1069     }
1070     warn ($DBI::errstr);
1071     sleep $_;
1072   }
1073   croak ($DBI::errstr) if !$dbh;
1074 }
1075
1076
1077 sub dbh_do
1078 {
1079   return 1 if !$job_has_uuid;
1080   my $ret = $dbh->do (@_);
1081   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1082   reconnect_database();
1083   return $dbh->do (@_);
1084 }
1085
1086
1087 sub croak
1088 {
1089   my ($package, $file, $line) = caller;
1090   my $message = "@_ at $file line $line\n";
1091   Log (undef, $message);
1092   freeze() if @jobstep_todo;
1093   collate_output() if @jobstep_todo;
1094   cleanup();
1095   save_meta() if $metastream;
1096   die;
1097 }
1098
1099
1100 sub cleanup
1101 {
1102   return if !$job_has_uuid || !$dbh;
1103
1104   reconnect_database();
1105   my $sth;
1106   $sth = $dbh->prepare ("update mrjobmanager set finishtime=now() where id=?");
1107   $sth->execute ($jobmanager_id);
1108   $sth = $dbh->prepare ("update mrjob set success=0, finishtime=now() where id=? and jobmanager_id=? and finishtime is null");
1109   $sth->execute ($job_id, $jobmanager_id);
1110 }
1111
1112
1113 sub save_meta
1114 {
1115   my $justcheckpoint = shift; # false if this will be the last meta saved
1116   my $m = $metastream;
1117   $m = $m->copy if $justcheckpoint;
1118   $m->write_finish;
1119   my $loglocator = $m->as_key;
1120   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1121   Log (undef, "meta key is $loglocator");
1122   $Job->{'log'} = $loglocator;
1123   $Job->save;
1124 }
1125
1126
1127 sub freeze_if_want_freeze
1128 {
1129   if ($main::please_freeze)
1130   {
1131     release_allocation();
1132     if (@_)
1133     {
1134       # kill some srun procs before freeze+stop
1135       map { $proc{$_} = {} } @_;
1136       while (%proc)
1137       {
1138         killem (keys %proc);
1139         select (undef, undef, undef, 0.1);
1140         my $died;
1141         while (($died = waitpid (-1, WNOHANG)) > 0)
1142         {
1143           delete $proc{$died};
1144         }
1145       }
1146     }
1147     freeze();
1148     collate_output();
1149     cleanup();
1150     save_meta();
1151     exit 0;
1152   }
1153 }
1154
1155
1156 sub freeze
1157 {
1158   Log (undef, "Freeze not implemented");
1159   return;
1160
1161   my $whc;                      # todo
1162   Log (undef, "freeze");
1163
1164   my $freezer = new Warehouse::Stream (whc => $whc);
1165   $freezer->clear;
1166   $freezer->name (".");
1167   $freezer->write_start ("state.txt");
1168
1169   $freezer->write_data (join ("\n",
1170                               "job $Job->{uuid}",
1171                               map
1172                               {
1173                                 $_ . "=" . freezequote($Job->{$_})
1174                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1175
1176   foreach my $Jobstep (@jobstep)
1177   {
1178     my $str = join ("\n",
1179                     map
1180                     {
1181                       $_ . "=" . freezequote ($Jobstep->{$_})
1182                     } grep {
1183                       $_ !~ /^stderr|slotindex|node_fail/
1184                     } keys %$Jobstep);
1185     $freezer->write_data ($str."\n\n");
1186   }
1187   if (@jobstep_tomerge)
1188   {
1189     $freezer->write_data
1190         ("merge $jobstep_tomerge_level "
1191          . freezequote (join ("\n",
1192                               map { freezequote ($_) } @jobstep_tomerge))
1193          . "\n\n");
1194   }
1195
1196   $freezer->write_finish;
1197   my $frozentokey = $freezer->as_key;
1198   undef $freezer;
1199   Log (undef, "frozento key is $frozentokey");
1200   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1201           $frozentokey, $job_id);
1202   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1203   Log (undef, "frozento+K key is $kfrozentokey");
1204   return $frozentokey;
1205 }
1206
1207
1208 sub thaw
1209 {
1210   croak ("Thaw not implemented");
1211
1212   my $whc;
1213   my $key = shift;
1214   Log (undef, "thaw from $key");
1215
1216   @jobstep = ();
1217   @jobstep_done = ();
1218   @jobstep_todo = ();
1219   @jobstep_tomerge = ();
1220   $jobstep_tomerge_level = 0;
1221   my $frozenjob = {};
1222
1223   my $stream = new Warehouse::Stream ( whc => $whc,
1224                                        hash => [split (",", $key)] );
1225   $stream->rewind;
1226   while (my $dataref = $stream->read_until (undef, "\n\n"))
1227   {
1228     if ($$dataref =~ /^job /)
1229     {
1230       foreach (split ("\n", $$dataref))
1231       {
1232         my ($k, $v) = split ("=", $_, 2);
1233         $frozenjob->{$k} = freezeunquote ($v);
1234       }
1235       next;
1236     }
1237
1238     if ($$dataref =~ /^merge (\d+) (.*)/)
1239     {
1240       $jobstep_tomerge_level = $1;
1241       @jobstep_tomerge
1242           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1243       next;
1244     }
1245
1246     my $Jobstep = { };
1247     foreach (split ("\n", $$dataref))
1248     {
1249       my ($k, $v) = split ("=", $_, 2);
1250       $Jobstep->{$k} = freezeunquote ($v) if $k;
1251     }
1252     $Jobstep->{attempts} = 0;
1253     push @jobstep, $Jobstep;
1254
1255     if ($Jobstep->{exitcode} eq "0")
1256     {
1257       push @jobstep_done, $#jobstep;
1258     }
1259     else
1260     {
1261       push @jobstep_todo, $#jobstep;
1262     }
1263   }
1264
1265   foreach (qw (script script_version script_parameters))
1266   {
1267     $Job->{$_} = $frozenjob->{$_};
1268   }
1269   $Job->save;
1270 }
1271
1272
1273 sub freezequote
1274 {
1275   my $s = shift;
1276   $s =~ s/\\/\\\\/g;
1277   $s =~ s/\n/\\n/g;
1278   return $s;
1279 }
1280
1281
1282 sub freezeunquote
1283 {
1284   my $s = shift;
1285   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1286   return $s;
1287 }
1288
1289
1290 sub srun
1291 {
1292   my $srunargs = shift;
1293   my $execargs = shift;
1294   my $opts = shift || {};
1295   my $stdin = shift;
1296   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1297   print STDERR (join (" ",
1298                       map { / / ? "'$_'" : $_ }
1299                       (@$args)),
1300                 "\n")
1301       if $ENV{CRUNCH_DEBUG};
1302
1303   if (defined $stdin) {
1304     my $child = open STDIN, "-|";
1305     defined $child or die "no fork: $!";
1306     if ($child == 0) {
1307       print $stdin or die $!;
1308       close STDOUT or die $!;
1309       exit 0;
1310     }
1311   }
1312
1313   return system (@$args) if $opts->{fork};
1314
1315   exec @$args;
1316   warn "ENV size is ".length(join(" ",%ENV));
1317   die "exec failed: $!: @$args";
1318 }
1319
1320
1321 sub ban_node_by_slot {
1322   # Don't start any new jobsteps on this node for 60 seconds
1323   my $slotid = shift;
1324   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1325   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1326 }
1327
1328 __DATA__
1329 #!/usr/bin/perl
1330
1331 # checkout-and-build
1332
1333 use Fcntl ':flock';
1334
1335 my $destdir = $ENV{"CRUNCH_SRC"};
1336 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1337 my $repo = $ENV{"CRUNCH_SRC_URL"};
1338
1339 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1340 flock L, LOCK_EX;
1341 if (readlink ("$destdir.commit") eq $commit) {
1342     exit 0;
1343 }
1344
1345 open STDOUT, ">", "$destdir.log";
1346 open STDERR, ">&STDOUT";
1347
1348 if (-d "$destdir/.git") {
1349     chdir $destdir or die "chdir $destdir: $!";
1350     if (0 != system (qw(git remote set-url origin), $repo)) {
1351         # awful... for old versions of git that don't know "remote set-url"
1352         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1353     }
1354 }
1355 elsif ($repo && $commit)
1356 {
1357     shell_or_die('git', 'clone', $repo, $destdir);
1358     chdir $destdir or die "chdir $destdir: $!";
1359     shell_or_die(qw(git config clean.requireForce false));
1360 }
1361 else {
1362     die "$destdir does not exist, and no repo/commit specified -- giving up";
1363 }
1364
1365 if ($commit) {
1366     unlink "$destdir.commit";
1367     shell_or_die (qw(git stash));
1368     shell_or_die (qw(git clean -d -x));
1369     shell_or_die (qw(git fetch origin));
1370     shell_or_die (qw(git checkout), $commit);
1371 }
1372
1373 my $pwd;
1374 chomp ($pwd = `pwd`);
1375 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1376 mkdir $install_dir;
1377 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1378     # Old version
1379     shell_or_die ("./tests/autotests.sh", $install_dir);
1380 } elsif (-e "./install.sh") {
1381     shell_or_die ("./install.sh", $install_dir);
1382 }
1383
1384 if ($commit) {
1385     unlink "$destdir.commit.new";
1386     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1387     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1388 }
1389
1390 close L;
1391
1392 exit 0;
1393
1394 sub shell_or_die
1395 {
1396   if ($ENV{"DEBUG"}) {
1397     print STDERR "@_\n";
1398   }
1399   system (@_) == 0
1400       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1401 }