run job tasks in qsequence order in first pass
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
63
64 my $force_unlock;
65 my $jobspec;
66 my $resume_stash;
67 GetOptions('force-unlock' => \$force_unlock,
68            'job=s' => \$jobspec,
69            'resume-stash=s' => \$resume_stash,
70     );
71
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
74
75
76 $SIG{'HUP'} = sub
77 {
78   1;
79 };
80 $SIG{'USR1'} = sub
81 {
82   $main::ENV{CRUNCH_DEBUG} = 1;
83 };
84 $SIG{'USR2'} = sub
85 {
86   $main::ENV{CRUNCH_DEBUG} = 0;
87 };
88
89
90
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
93 $metastream->clear;
94 $metastream->write_start('log.txt');
95
96 my $User = {};
97 my $Job = {};
98 my $job_id;
99 my $dbh;
100 my $sth;
101 if ($job_has_uuid)
102 {
103   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104   $User = $arv->{'users'}->{'current'}->execute;
105   if (!$force_unlock) {
106     if ($Job->{'is_locked_by'}) {
107       croak("Job is locked: " . $Job->{'is_locked_by'});
108     }
109     if ($Job->{'success'} ne undef) {
110       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
111     }
112     if ($Job->{'running'}) {
113       croak("Job 'running' flag is already set");
114     }
115     if ($Job->{'started_at'}) {
116       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
117     }
118   }
119 }
120 else
121 {
122   $Job = JSON::decode_json($jobspec);
123
124   if (!$resume_stash)
125   {
126     map { croak ("No $_ specified") unless $Job->{$_} }
127     qw(script script_version script_parameters);
128   }
129
130   if (!defined $Job->{'uuid'}) {
131     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
132   }
133 }
134 $job_id = $Job->{'uuid'};
135
136
137
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
141
142
143 Log (undef, "check slurm allocation");
144 my @slot;
145 my @node;
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
147 my @sinfo;
148 if (!$have_slurm)
149 {
150   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151   push @sinfo, "$localcpus localhost";
152 }
153 if (exists $ENV{SLURM_NODELIST})
154 {
155   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
156 }
157 foreach (@sinfo)
158 {
159   my ($ncpus, $slurm_nodelist) = split;
160   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
161
162   my @nodelist;
163   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
164   {
165     my $nodelist = $1;
166     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
167     {
168       my $ranges = $1;
169       foreach (split (",", $ranges))
170       {
171         my ($a, $b);
172         if (/(\d+)-(\d+)/)
173         {
174           $a = $1;
175           $b = $2;
176         }
177         else
178         {
179           $a = $_;
180           $b = $_;
181         }
182         push @nodelist, map {
183           my $n = $nodelist;
184           $n =~ s/\[[-,\d]+\]/$_/;
185           $n;
186         } ($a..$b);
187       }
188     }
189     else
190     {
191       push @nodelist, $nodelist;
192     }
193   }
194   foreach my $nodename (@nodelist)
195   {
196     Log (undef, "node $nodename - $ncpus slots");
197     my $node = { name => $nodename,
198                  ncpus => $ncpus,
199                  losing_streak => 0,
200                  hold_until => 0 };
201     foreach my $cpu (1..$ncpus)
202     {
203       push @slot, { node => $node,
204                     cpu => $cpu };
205     }
206   }
207   push @node, @nodelist;
208 }
209
210
211
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
214
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
216
217
218
219 my $jobmanager_id;
220 if ($job_has_uuid)
221 {
222   # Claim this job, and make sure nobody else does
223
224   $Job->{'is_locked_by'} = $User->{'uuid'};
225   $Job->{'started_at'} = time;
226   $Job->{'running'} = 1;
227   $Job->{'success'} = undef;
228   $Job->{'tasks_summary'} = { 'failed' => 0,
229                               'todo' => 1,
230                               'running' => 0,
231                               'done' => 0 };
232   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
233     croak("Error while updating / locking job");
234   }
235 }
236
237
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
249
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
253
254
255 my @jobstep;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
260 my $squeue_checked;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
263
264
265
266 if (defined $Job->{thawedfromkey})
267 {
268   thaw ($Job->{thawedfromkey});
269 }
270 else
271 {
272   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273     'job_uuid' => $Job->{'uuid'},
274     'sequence' => 0,
275     'qsequence' => 0,
276     'parameters' => {},
277                                                           });
278   push @jobstep, { level => 0,
279                    attempts => 0,
280                    arvados_task => $first_task,
281                  };
282   push @jobstep_todo, 0;
283 }
284
285
286 my $build_script;
287 do {
288   local $/ = undef;
289   $build_script = <DATA>;
290 };
291
292
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
294
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
296 if ($skip_install)
297 {
298   $ENV{"CRUNCH_SRC"} = $Job->{revision};
299 }
300 else
301 {
302   Log (undef, "Install revision ".$Job->{revision});
303   my $nodelist = join(",", @node);
304
305   # Clean out crunch_tmp/work and crunch_tmp/opt
306
307   my $cleanpid = fork();
308   if ($cleanpid == 0)
309   {
310     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
312     exit (1);
313   }
314   while (1)
315   {
316     last if $cleanpid == waitpid (-1, WNOHANG);
317     freeze_if_want_freeze ($cleanpid);
318     select (undef, undef, undef, 0.1);
319   }
320   Log (undef, "Clean-work-dir exited $?");
321
322   # Install requested code version
323
324   my @execargs;
325   my @srunargs = ("srun",
326                   "--nodelist=$nodelist",
327                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
328
329   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
332
333   my $commit;
334   my $treeish = $Job->{'script_version'};
335   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336   # Todo: let script_version specify alternate repo
337   $ENV{"CRUNCH_SRC_URL"} = $repo;
338
339   # Create/update our clone of the remote git repo
340
341   if (!-d $ENV{"CRUNCH_SRC"}) {
342     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343         or croak ("git clone $repo failed: exit ".($?>>8));
344     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
345   }
346   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
347
348   # If this looks like a subversion r#, look for it in git-svn commit messages
349
350   if ($treeish =~ m{^\d{1,4}$}) {
351     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
352     chomp $gitlog;
353     if ($gitlog =~ /^[a-f0-9]{40}$/) {
354       $commit = $gitlog;
355       Log (undef, "Using commit $commit for revision $treeish");
356     }
357   }
358
359   # If that didn't work, try asking git to look it up as a tree-ish.
360
361   if (!defined $commit) {
362
363     my $cooked_treeish = $treeish;
364     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365       # Looks like a git branch name -- make sure git knows it's
366       # relative to the remote repo
367       $cooked_treeish = "origin/$treeish";
368     }
369
370     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
371     chomp $found;
372     if ($found =~ /^[0-9a-f]{40}$/s) {
373       $commit = $found;
374       if ($commit ne $treeish) {
375         # Make sure we record the real commit id in the database,
376         # frozentokey, logs, etc. -- instead of an abbreviation or a
377         # branch name which can become ambiguous or point to a
378         # different commit in the future.
379         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380         Log (undef, "Using commit $commit for tree-ish $treeish");
381         if ($commit ne $treeish) {
382           $Job->{'script_version'} = $commit;
383           $Job->save() or croak("Error while updating job");
384         }
385       }
386     }
387   }
388
389   if (defined $commit) {
390     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391     @execargs = ("sh", "-c",
392                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
393   }
394   else {
395     croak ("could not figure out commit id for $treeish");
396   }
397
398   my $installpid = fork();
399   if ($installpid == 0)
400   {
401     srun (\@srunargs, \@execargs, {}, $build_script);
402     exit (1);
403   }
404   while (1)
405   {
406     last if $installpid == waitpid (-1, WNOHANG);
407     freeze_if_want_freeze ($installpid);
408     select (undef, undef, undef, 0.1);
409   }
410   Log (undef, "Install exited $?");
411 }
412
413
414
415 foreach (qw (script script_version script_parameters resource_limits))
416 {
417   Log (undef, $_ . " " . $Job->{$_});
418 }
419 foreach (split (/\n/, $Job->{knobs}))
420 {
421   Log (undef, "knob " . $_);
422 }
423
424
425
426 my $success;
427
428
429
430 ONELEVEL:
431
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
435
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437                        or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
440
441
442
443 my %proc;
444 my @freeslot = (0..$#slot);
445 my @holdslot;
446 my %reader;
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
449
450 update_progress_stats();
451
452
453
454 THISROUND:
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
456 {
457   $main::please_continue = 0;
458
459   my $id = $jobstep_todo[$todo_ptr];
460   my $Jobstep = $jobstep[$id];
461   if ($Jobstep->{level} != $level)
462   {
463     next;
464   }
465   if ($Jobstep->{attempts} > 9)
466   {
467     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
468     $success = 0;
469     last THISROUND;
470   }
471
472   pipe $reader{$id}, "writer" or croak ($!);
473   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
475
476   my $childslot = $freeslot[0];
477   my $childnode = $slot[$childslot]->{node};
478   my $childslotname = join (".",
479                             $slot[$childslot]->{node}->{name},
480                             $slot[$childslot]->{cpu});
481   my $childpid = fork();
482   if ($childpid == 0)
483   {
484     $SIG{'INT'} = 'DEFAULT';
485     $SIG{'QUIT'} = 'DEFAULT';
486     $SIG{'TERM'} = 'DEFAULT';
487
488     foreach (values (%reader))
489     {
490       close($_);
491     }
492     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493     open(STDOUT,">&writer");
494     open(STDERR,">&writer");
495
496     undef $dbh;
497     undef $sth;
498
499     delete $ENV{"GNUPGHOME"};
500     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501     $ENV{"TASK_QSEQUENCE"} = $id;
502     $ENV{"TASK_SEQUENCE"} = $level;
503     $ENV{"JOB_SCRIPT"} = $Job->{script};
504     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505       $param =~ tr/a-z/A-Z/;
506       $ENV{"JOB_PARAMETER_$param"} = $value;
507     }
508     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
512
513     $ENV{"GZIP"} = "-n";
514
515     my @srunargs = (
516       "srun",
517       "--nodelist=".$childnode->{name},
518       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519       "--job-name=$job_id.$id.$$",
520         );
521     my @execargs = qw(sh);
522     my $build_script_to_send = "";
523     my $command =
524         "mkdir -p $ENV{CRUNCH_TMP}/revision "
525         ."&& cd $ENV{CRUNCH_TMP} ";
526     if ($build_script)
527     {
528       $build_script_to_send = $build_script;
529       $command .=
530           "&& perl -";
531     }
532     elsif (!$skip_install)
533     {
534       $command .=
535           "&& "
536           ."( "
537           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
538           ."|| "
539           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540           ."    && ./installrevision "
541           ."  ) "
542           .") ";
543     }
544     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
545     $command .=
546         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547     my @execargs = ('bash', '-c', $command);
548     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
549     exit (1);
550   }
551   close("writer");
552   if (!defined $childpid)
553   {
554     close $reader{$id};
555     delete $reader{$id};
556     next;
557   }
558   shift @freeslot;
559   $proc{$childpid} = { jobstep => $id,
560                        time => time,
561                        slot => $childslot,
562                        jobstepname => "$job_id.$id.$childpid",
563                      };
564   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565   $slot[$childslot]->{pid} = $childpid;
566
567   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568   Log ($id, "child $childpid started on $childslotname");
569   $Jobstep->{attempts} ++;
570   $Jobstep->{starttime} = time;
571   $Jobstep->{node} = $childnode->{name};
572   $Jobstep->{slotindex} = $childslot;
573   delete $Jobstep->{stderr};
574   delete $Jobstep->{finishtime};
575
576   splice @jobstep_todo, $todo_ptr, 1;
577   --$todo_ptr;
578
579   $progress_is_dirty = 1;
580
581   while (!@freeslot
582          ||
583          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
584   {
585     last THISROUND if $main::please_freeze;
586     if ($main::please_info)
587     {
588       $main::please_info = 0;
589       freeze();
590       collate_output();
591       save_meta(1);
592       update_progress_stats();
593     }
594     my $gotsome
595         = readfrompipes ()
596         + reapchildren ();
597     if (!$gotsome)
598     {
599       check_squeue();
600       update_progress_stats();
601       select (undef, undef, undef, 0.1);
602     }
603     elsif (time - $progress_stats_updated >= 30)
604     {
605       update_progress_stats();
606     }
607     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
609     {
610       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611           .($thisround_failed+$thisround_succeeded)
612           .") -- giving up on this round";
613       Log (undef, $message);
614       last THISROUND;
615     }
616
617     # move slots from freeslot to holdslot (or back to freeslot) if necessary
618     for (my $i=$#freeslot; $i>=0; $i--) {
619       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620         push @holdslot, (splice @freeslot, $i, 1);
621       }
622     }
623     for (my $i=$#holdslot; $i>=0; $i--) {
624       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625         push @freeslot, (splice @holdslot, $i, 1);
626       }
627     }
628
629     # give up if no nodes are succeeding
630     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631       my $message = "Every node has failed -- giving up on this round";
632       Log (undef, $message);
633       last THISROUND;
634     }
635   }
636 }
637
638
639 push @freeslot, splice @holdslot;
640 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
641
642
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
644 while (%proc)
645 {
646   goto THISROUND if $main::please_continue;
647   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
648   readfrompipes ();
649   if (!reapchildren())
650   {
651     check_squeue();
652     update_progress_stats();
653     select (undef, undef, undef, 0.1);
654     killem (keys %proc) if $main::please_freeze;
655   }
656 }
657
658 update_progress_stats();
659 freeze_if_want_freeze();
660
661
662 if (!defined $success)
663 {
664   if (@jobstep_todo &&
665       $thisround_succeeded == 0 &&
666       ($thisround_failed == 0 || $thisround_failed > 4))
667   {
668     my $message = "stop because $thisround_failed tasks failed and none succeeded";
669     Log (undef, $message);
670     $success = 0;
671   }
672   if (!@jobstep_todo)
673   {
674     $success = 1;
675   }
676 }
677
678 goto ONELEVEL if !defined $success;
679
680
681 release_allocation();
682 freeze();
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = 0 if !$Job->{'output'};
685 $Job->save;
686
687 if ($Job->{'output'})
688 {
689   $arv->{'collections'}->{'create'}->execute('collection' => {
690     'uuid' => $Job->{'output'},
691     'manifest_text' => system("whget", $Job->{arvados_task}->{output}),
692                                              });;
693 }
694
695
696 Log (undef, "finish");
697
698 $Job->{'success'} = $Job->{'output'} && $success;
699 $Job->save;
700
701 save_meta();
702 exit 0;
703
704
705
706 sub update_progress_stats
707 {
708   $progress_stats_updated = time;
709   return if !$progress_is_dirty;
710   my ($todo, $done, $running) = (scalar @jobstep_todo,
711                                  scalar @jobstep_done,
712                                  scalar @slot - scalar @freeslot - scalar @holdslot);
713   $Job->{'tasks_summary'} ||= {};
714   $Job->{'tasks_summary'}->{'todo'} = $todo;
715   $Job->{'tasks_summary'}->{'done'} = $done;
716   $Job->{'tasks_summary'}->{'running'} = $running;
717   $Job->save;
718   Log (undef, "status: $done done, $running running, $todo todo");
719   $progress_is_dirty = 0;
720 }
721
722
723
724 sub reapchildren
725 {
726   my $pid = waitpid (-1, WNOHANG);
727   return 0 if $pid <= 0;
728
729   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
730                   . "."
731                   . $slot[$proc{$pid}->{slot}]->{cpu});
732   my $jobstepid = $proc{$pid}->{jobstep};
733   my $elapsed = time - $proc{$pid}->{time};
734   my $Jobstep = $jobstep[$jobstepid];
735
736   my $exitcode = $?;
737   my $exitinfo = "exit $exitcode";
738   $Jobstep->{arvados_task}->reload;
739   my $success = $Jobstep->{arvados_task}->{success};
740
741   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
742
743   if (!defined $success) {
744     # task did not indicate one way or the other --> fail
745     $Jobstep->{arvados_task}->{success} = 0;
746     $Jobstep->{arvados_task}->save;
747     $success = 0;
748   }
749
750   if (!$success)
751   {
752     --$Jobstep->{attempts} if $Jobstep->{node_fail};
753     ++$thisround_failed;
754     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
755
756     # Check for signs of a failed or misconfigured node
757     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
758         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
759       # Don't count this against jobstep failure thresholds if this
760       # node is already suspected faulty and srun exited quickly
761       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
762           $elapsed < 5 &&
763           $Jobstep->{attempts} > 1) {
764         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
765         --$Jobstep->{attempts};
766       }
767       ban_node_by_slot($proc{$pid}->{slot});
768     }
769
770     push @jobstep_todo, $jobstepid;
771     Log ($jobstepid, "failure in $elapsed seconds");
772     $Job->{'tasks_summary'}->{'failed'}++;
773   }
774   else
775   {
776     ++$thisround_succeeded;
777     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
778     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
779     push @jobstep_done, $jobstepid;
780     Log ($jobstepid, "success in $elapsed seconds");
781   }
782   $Jobstep->{exitcode} = $exitcode;
783   $Jobstep->{finishtime} = time;
784   process_stderr ($jobstepid, $success);
785   Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
786
787   close $reader{$jobstepid};
788   delete $reader{$jobstepid};
789   delete $slot[$proc{$pid}->{slot}]->{pid};
790   push @freeslot, $proc{$pid}->{slot};
791   delete $proc{$pid};
792
793   # Load new tasks
794   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
795     'where' => {
796       'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
797     },
798     'order' => 'qsequence'
799   );
800   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
801     my $jobstep = {
802       'level' => $arvados_task->{'sequence'},
803       'attempts' => 0,
804       'arvados_task' => $arvados_task
805     };
806     push @jobstep, $jobstep;
807     push @jobstep_todo, $#jobstep;
808   }
809
810   $progress_is_dirty = 1;
811   1;
812 }
813
814
815 sub check_squeue
816 {
817   # return if the kill list was checked <4 seconds ago
818   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
819   {
820     return;
821   }
822   $squeue_kill_checked = time;
823
824   # use killem() on procs whose killtime is reached
825   for (keys %proc)
826   {
827     if (exists $proc{$_}->{killtime}
828         && $proc{$_}->{killtime} <= time)
829     {
830       killem ($_);
831     }
832   }
833
834   # return if the squeue was checked <60 seconds ago
835   if (defined $squeue_checked && $squeue_checked > time - 60)
836   {
837     return;
838   }
839   $squeue_checked = time;
840
841   if (!$have_slurm)
842   {
843     # here is an opportunity to check for mysterious problems with local procs
844     return;
845   }
846
847   # get a list of steps still running
848   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
849   chop @squeue;
850   if ($squeue[-1] ne "ok")
851   {
852     return;
853   }
854   pop @squeue;
855
856   # which of my jobsteps are running, according to squeue?
857   my %ok;
858   foreach (@squeue)
859   {
860     if (/^(\d+)\.(\d+) (\S+)/)
861     {
862       if ($1 eq $ENV{SLURM_JOBID})
863       {
864         $ok{$3} = 1;
865       }
866     }
867   }
868
869   # which of my active child procs (>60s old) were not mentioned by squeue?
870   foreach (keys %proc)
871   {
872     if ($proc{$_}->{time} < time - 60
873         && !exists $ok{$proc{$_}->{jobstepname}}
874         && !exists $proc{$_}->{killtime})
875     {
876       # kill this proc if it hasn't exited in 30 seconds
877       $proc{$_}->{killtime} = time + 30;
878     }
879   }
880 }
881
882
883 sub release_allocation
884 {
885   if ($have_slurm)
886   {
887     Log (undef, "release job allocation");
888     system "scancel $ENV{SLURM_JOBID}";
889   }
890 }
891
892
893 sub readfrompipes
894 {
895   my $gotsome = 0;
896   foreach my $job (keys %reader)
897   {
898     my $buf;
899     while (0 < sysread ($reader{$job}, $buf, 8192))
900     {
901       print STDERR $buf if $ENV{CRUNCH_DEBUG};
902       $jobstep[$job]->{stderr} .= $buf;
903       preprocess_stderr ($job);
904       if (length ($jobstep[$job]->{stderr}) > 16384)
905       {
906         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
907       }
908       $gotsome = 1;
909     }
910   }
911   return $gotsome;
912 }
913
914
915 sub preprocess_stderr
916 {
917   my $job = shift;
918
919   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
920     my $line = $1;
921     if ($line =~ /\+\+\+mr/) {
922       last;
923     }
924     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
925     Log ($job, "stderr $line");
926     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
927       # whoa.
928       $main::please_freeze = 1;
929     }
930     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
931       $jobstep[$job]->{node_fail} = 1;
932       ban_node_by_slot($jobstep[$job]->{slotindex});
933     }
934   }
935 }
936
937
938 sub process_stderr
939 {
940   my $job = shift;
941   my $success = shift;
942   preprocess_stderr ($job);
943
944   map {
945     Log ($job, "stderr $_");
946   } split ("\n", $jobstep[$job]->{stderr});
947 }
948
949
950 sub collate_output
951 {
952   my $whc = Warehouse->new;
953   Log (undef, "collate");
954   $whc->write_start (1);
955   my $joboutput;
956   for (@jobstep)
957   {
958     next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
959     my $output = $_->{arvados_task}->{output};
960     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
961     {
962       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
963       $whc->write_data ($output);
964     }
965     elsif (@jobstep == 1)
966     {
967       $joboutput = $output;
968       $whc->write_finish;
969     }
970     elsif (defined (my $outblock = $whc->fetch_block ($output)))
971     {
972       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
973       $whc->write_data ($outblock);
974     }
975     else
976     {
977       my $errstr = $whc->errstr;
978       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
979       $success = 0;
980     }
981   }
982   $joboutput = $whc->write_finish if !defined $joboutput;
983   if ($joboutput)
984   {
985     Log (undef, "output $joboutput");
986     $Job->{'output'} = $joboutput;
987     $Job->save;
988   }
989   else
990   {
991     Log (undef, "output undef");
992   }
993   return $joboutput;
994 }
995
996
997 sub killem
998 {
999   foreach (@_)
1000   {
1001     my $sig = 2;                # SIGINT first
1002     if (exists $proc{$_}->{"sent_$sig"} &&
1003         time - $proc{$_}->{"sent_$sig"} > 4)
1004     {
1005       $sig = 15;                # SIGTERM if SIGINT doesn't work
1006     }
1007     if (exists $proc{$_}->{"sent_$sig"} &&
1008         time - $proc{$_}->{"sent_$sig"} > 4)
1009     {
1010       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1011     }
1012     if (!exists $proc{$_}->{"sent_$sig"})
1013     {
1014       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1015       kill $sig, $_;
1016       select (undef, undef, undef, 0.1);
1017       if ($sig == 2)
1018       {
1019         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1020       }
1021       $proc{$_}->{"sent_$sig"} = time;
1022       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1023     }
1024   }
1025 }
1026
1027
1028 sub fhbits
1029 {
1030   my($bits);
1031   for (@_) {
1032     vec($bits,fileno($_),1) = 1;
1033   }
1034   $bits;
1035 }
1036
1037
1038 sub Log                         # ($jobstep_id, $logmessage)
1039 {
1040   if ($_[1] =~ /\n/) {
1041     for my $line (split (/\n/, $_[1])) {
1042       Log ($_[0], $line);
1043     }
1044     return;
1045   }
1046   my $fh = select STDERR; $|=1; select $fh;
1047   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1048   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1049   $message .= "\n";
1050   my $datetime;
1051   if ($metastream || -t STDERR) {
1052     my @gmtime = gmtime;
1053     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1054                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1055   }
1056   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1057
1058   return if !$metastream;
1059   $metastream->write_data ($datetime . " " . $message);
1060 }
1061
1062
1063 sub reconnect_database
1064 {
1065   return if !$job_has_uuid;
1066   return if ($dbh && $dbh->do ("select now()"));
1067   for (1..16)
1068   {
1069     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1070     if ($dbh) {
1071       $dbh->{InactiveDestroy} = 1;
1072       return;
1073     }
1074     warn ($DBI::errstr);
1075     sleep $_;
1076   }
1077   croak ($DBI::errstr) if !$dbh;
1078 }
1079
1080
1081 sub dbh_do
1082 {
1083   return 1 if !$job_has_uuid;
1084   my $ret = $dbh->do (@_);
1085   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1086   reconnect_database();
1087   return $dbh->do (@_);
1088 }
1089
1090
1091 sub croak
1092 {
1093   my ($package, $file, $line) = caller;
1094   my $message = "@_ at $file line $line\n";
1095   Log (undef, $message);
1096   freeze() if @jobstep_todo;
1097   collate_output() if @jobstep_todo;
1098   cleanup();
1099   save_meta() if $metastream;
1100   die;
1101 }
1102
1103
1104 sub cleanup
1105 {
1106   return if !$job_has_uuid;
1107   $Job->reload;
1108   $Job->{'running'} = 0;
1109   $Job->{'success'} = 0;
1110   $Job->{'finished_at'} = time;
1111   $Job->save;
1112 }
1113
1114
1115 sub save_meta
1116 {
1117   my $justcheckpoint = shift; # false if this will be the last meta saved
1118   my $m = $metastream;
1119   $m = $m->copy if $justcheckpoint;
1120   $m->write_finish;
1121   my $loglocator = $m->as_key;
1122   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1123   Log (undef, "meta key is $loglocator");
1124   $Job->{'log'} = $loglocator;
1125   $Job->save;
1126 }
1127
1128
1129 sub freeze_if_want_freeze
1130 {
1131   if ($main::please_freeze)
1132   {
1133     release_allocation();
1134     if (@_)
1135     {
1136       # kill some srun procs before freeze+stop
1137       map { $proc{$_} = {} } @_;
1138       while (%proc)
1139       {
1140         killem (keys %proc);
1141         select (undef, undef, undef, 0.1);
1142         my $died;
1143         while (($died = waitpid (-1, WNOHANG)) > 0)
1144         {
1145           delete $proc{$died};
1146         }
1147       }
1148     }
1149     freeze();
1150     collate_output();
1151     cleanup();
1152     save_meta();
1153     exit 0;
1154   }
1155 }
1156
1157
1158 sub freeze
1159 {
1160   Log (undef, "Freeze not implemented");
1161   return;
1162
1163   my $whc;                      # todo
1164   Log (undef, "freeze");
1165
1166   my $freezer = new Warehouse::Stream (whc => $whc);
1167   $freezer->clear;
1168   $freezer->name (".");
1169   $freezer->write_start ("state.txt");
1170
1171   $freezer->write_data (join ("\n",
1172                               "job $Job->{uuid}",
1173                               map
1174                               {
1175                                 $_ . "=" . freezequote($Job->{$_})
1176                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1177
1178   foreach my $Jobstep (@jobstep)
1179   {
1180     my $str = join ("\n",
1181                     map
1182                     {
1183                       $_ . "=" . freezequote ($Jobstep->{$_})
1184                     } grep {
1185                       $_ !~ /^stderr|slotindex|node_fail/
1186                     } keys %$Jobstep);
1187     $freezer->write_data ($str."\n\n");
1188   }
1189   if (@jobstep_tomerge)
1190   {
1191     $freezer->write_data
1192         ("merge $jobstep_tomerge_level "
1193          . freezequote (join ("\n",
1194                               map { freezequote ($_) } @jobstep_tomerge))
1195          . "\n\n");
1196   }
1197
1198   $freezer->write_finish;
1199   my $frozentokey = $freezer->as_key;
1200   undef $freezer;
1201   Log (undef, "frozento key is $frozentokey");
1202   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1203           $frozentokey, $job_id);
1204   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1205   Log (undef, "frozento+K key is $kfrozentokey");
1206   return $frozentokey;
1207 }
1208
1209
1210 sub thaw
1211 {
1212   croak ("Thaw not implemented");
1213
1214   my $whc;
1215   my $key = shift;
1216   Log (undef, "thaw from $key");
1217
1218   @jobstep = ();
1219   @jobstep_done = ();
1220   @jobstep_todo = ();
1221   @jobstep_tomerge = ();
1222   $jobstep_tomerge_level = 0;
1223   my $frozenjob = {};
1224
1225   my $stream = new Warehouse::Stream ( whc => $whc,
1226                                        hash => [split (",", $key)] );
1227   $stream->rewind;
1228   while (my $dataref = $stream->read_until (undef, "\n\n"))
1229   {
1230     if ($$dataref =~ /^job /)
1231     {
1232       foreach (split ("\n", $$dataref))
1233       {
1234         my ($k, $v) = split ("=", $_, 2);
1235         $frozenjob->{$k} = freezeunquote ($v);
1236       }
1237       next;
1238     }
1239
1240     if ($$dataref =~ /^merge (\d+) (.*)/)
1241     {
1242       $jobstep_tomerge_level = $1;
1243       @jobstep_tomerge
1244           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1245       next;
1246     }
1247
1248     my $Jobstep = { };
1249     foreach (split ("\n", $$dataref))
1250     {
1251       my ($k, $v) = split ("=", $_, 2);
1252       $Jobstep->{$k} = freezeunquote ($v) if $k;
1253     }
1254     $Jobstep->{attempts} = 0;
1255     push @jobstep, $Jobstep;
1256
1257     if ($Jobstep->{exitcode} eq "0")
1258     {
1259       push @jobstep_done, $#jobstep;
1260     }
1261     else
1262     {
1263       push @jobstep_todo, $#jobstep;
1264     }
1265   }
1266
1267   foreach (qw (script script_version script_parameters))
1268   {
1269     $Job->{$_} = $frozenjob->{$_};
1270   }
1271   $Job->save;
1272 }
1273
1274
1275 sub freezequote
1276 {
1277   my $s = shift;
1278   $s =~ s/\\/\\\\/g;
1279   $s =~ s/\n/\\n/g;
1280   return $s;
1281 }
1282
1283
1284 sub freezeunquote
1285 {
1286   my $s = shift;
1287   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1288   return $s;
1289 }
1290
1291
1292 sub srun
1293 {
1294   my $srunargs = shift;
1295   my $execargs = shift;
1296   my $opts = shift || {};
1297   my $stdin = shift;
1298   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1299   print STDERR (join (" ",
1300                       map { / / ? "'$_'" : $_ }
1301                       (@$args)),
1302                 "\n")
1303       if $ENV{CRUNCH_DEBUG};
1304
1305   if (defined $stdin) {
1306     my $child = open STDIN, "-|";
1307     defined $child or die "no fork: $!";
1308     if ($child == 0) {
1309       print $stdin or die $!;
1310       close STDOUT or die $!;
1311       exit 0;
1312     }
1313   }
1314
1315   return system (@$args) if $opts->{fork};
1316
1317   exec @$args;
1318   warn "ENV size is ".length(join(" ",%ENV));
1319   die "exec failed: $!: @$args";
1320 }
1321
1322
1323 sub ban_node_by_slot {
1324   # Don't start any new jobsteps on this node for 60 seconds
1325   my $slotid = shift;
1326   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1327   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1328 }
1329
1330 __DATA__
1331 #!/usr/bin/perl
1332
1333 # checkout-and-build
1334
1335 use Fcntl ':flock';
1336
1337 my $destdir = $ENV{"CRUNCH_SRC"};
1338 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1339 my $repo = $ENV{"CRUNCH_SRC_URL"};
1340
1341 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1342 flock L, LOCK_EX;
1343 if (readlink ("$destdir.commit") eq $commit) {
1344     exit 0;
1345 }
1346
1347 open STDOUT, ">", "$destdir.log";
1348 open STDERR, ">&STDOUT";
1349
1350 if (-d "$destdir/.git") {
1351     chdir $destdir or die "chdir $destdir: $!";
1352     if (0 != system (qw(git remote set-url origin), $repo)) {
1353         # awful... for old versions of git that don't know "remote set-url"
1354         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1355     }
1356 }
1357 elsif ($repo && $commit)
1358 {
1359     shell_or_die('git', 'clone', $repo, $destdir);
1360     chdir $destdir or die "chdir $destdir: $!";
1361     shell_or_die(qw(git config clean.requireForce false));
1362 }
1363 else {
1364     die "$destdir does not exist, and no repo/commit specified -- giving up";
1365 }
1366
1367 if ($commit) {
1368     unlink "$destdir.commit";
1369     shell_or_die (qw(git stash));
1370     shell_or_die (qw(git clean -d -x));
1371     shell_or_die (qw(git fetch origin));
1372     shell_or_die (qw(git checkout), $commit);
1373 }
1374
1375 my $pwd;
1376 chomp ($pwd = `pwd`);
1377 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1378 mkdir $install_dir;
1379 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1380     # Old version
1381     shell_or_die ("./tests/autotests.sh", $install_dir);
1382 } elsif (-e "./install.sh") {
1383     shell_or_die ("./install.sh", $install_dir);
1384 }
1385
1386 if ($commit) {
1387     unlink "$destdir.commit.new";
1388     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1389     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1390 }
1391
1392 close L;
1393
1394 exit 0;
1395
1396 sub shell_or_die
1397 {
1398   if ($ENV{"DEBUG"}) {
1399     print STDERR "@_\n";
1400   }
1401   system (@_) == 0
1402       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1403 }