crunch-job fixes
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
63
64 my $force_unlock;
65 my $jobspec;
66 my $resume_stash;
67 GetOptions('force-unlock' => \$force_unlock,
68            'job=s' => \$jobspec,
69            'resume-stash=s' => \$resume_stash,
70     );
71
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
74
75
76 $SIG{'HUP'} = sub
77 {
78   1;
79 };
80 $SIG{'USR1'} = sub
81 {
82   $main::ENV{CRUNCH_DEBUG} = 1;
83 };
84 $SIG{'USR2'} = sub
85 {
86   $main::ENV{CRUNCH_DEBUG} = 0;
87 };
88
89
90
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
93 $metastream->clear;
94 $metastream->write_start('log.txt');
95
96 my $User = {};
97 my $Job = {};
98 my $job_id;
99 my $dbh;
100 my $sth;
101 if ($job_has_uuid)
102 {
103   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104   $User = $arv->{'users'}->{'current'}->execute;
105   if (!$force_unlock) {
106     if ($Job->{'is_locked_by'}) {
107       croak("Job is locked: " . $Job->{'is_locked_by'});
108     }
109     if ($Job->{'success'} ne undef) {
110       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
111     }
112     if ($Job->{'running'}) {
113       croak("Job 'running' flag is already set");
114     }
115     if ($Job->{'started_at'}) {
116       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
117     }
118   }
119 }
120 else
121 {
122   $Job = JSON::decode_json($jobspec);
123
124   if (!$resume_stash)
125   {
126     map { croak ("No $_ specified") unless $Job->{$_} }
127     qw(script script_version script_parameters);
128   }
129
130   if (!defined $Job->{'uuid'}) {
131     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
132   }
133 }
134 $job_id = $Job->{'uuid'};
135
136
137
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
141
142
143 Log (undef, "check slurm allocation");
144 my @slot;
145 my @node;
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
147 my @sinfo;
148 if (!$have_slurm)
149 {
150   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151   push @sinfo, "$localcpus localhost";
152 }
153 if (exists $ENV{SLURM_NODELIST})
154 {
155   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
156 }
157 foreach (@sinfo)
158 {
159   my ($ncpus, $slurm_nodelist) = split;
160   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
161
162   my @nodelist;
163   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
164   {
165     my $nodelist = $1;
166     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
167     {
168       my $ranges = $1;
169       foreach (split (",", $ranges))
170       {
171         my ($a, $b);
172         if (/(\d+)-(\d+)/)
173         {
174           $a = $1;
175           $b = $2;
176         }
177         else
178         {
179           $a = $_;
180           $b = $_;
181         }
182         push @nodelist, map {
183           my $n = $nodelist;
184           $n =~ s/\[[-,\d]+\]/$_/;
185           $n;
186         } ($a..$b);
187       }
188     }
189     else
190     {
191       push @nodelist, $nodelist;
192     }
193   }
194   foreach my $nodename (@nodelist)
195   {
196     Log (undef, "node $nodename - $ncpus slots");
197     my $node = { name => $nodename,
198                  ncpus => $ncpus,
199                  losing_streak => 0,
200                  hold_until => 0 };
201     foreach my $cpu (1..$ncpus)
202     {
203       push @slot, { node => $node,
204                     cpu => $cpu };
205     }
206   }
207   push @node, @nodelist;
208 }
209
210
211
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
214
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
216
217
218
219 my $jobmanager_id;
220 if ($job_has_uuid)
221 {
222   # Claim this job, and make sure nobody else does
223
224   $Job->{'is_locked_by'} = $User->{'uuid'};
225   $Job->{'started_at'} = time;
226   $Job->{'running'} = 1;
227   $Job->{'success'} = undef;
228   $Job->{'tasks_summary'} = { 'failed' => 0,
229                               'todo' => 1,
230                               'running' => 0,
231                               'done' => 0 };
232   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
233     croak("Error while updating / locking job");
234   }
235 }
236
237
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
249
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
253
254
255 my @jobstep;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
260 my $squeue_checked;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
263
264
265
266 if (defined $Job->{thawedfromkey})
267 {
268   thaw ($Job->{thawedfromkey});
269 }
270 else
271 {
272   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273     'job_uuid' => $Job->{'uuid'},
274     'sequence' => 0,
275     'qsequence' => 0,
276     'parameters' => {},
277                                                           });
278   push @jobstep, { level => 0,
279                    attempts => 0,
280                    arvados_task => $first_task,
281                  };
282   push @jobstep_todo, 0;
283 }
284
285
286 my $build_script;
287 do {
288   local $/ = undef;
289   $build_script = <DATA>;
290 };
291
292
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
294
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
296 if ($skip_install)
297 {
298   $ENV{"CRUNCH_SRC"} = $Job->{revision};
299 }
300 else
301 {
302   Log (undef, "Install revision ".$Job->{revision});
303   my $nodelist = join(",", @node);
304
305   # Clean out crunch_tmp/work and crunch_tmp/opt
306
307   my $cleanpid = fork();
308   if ($cleanpid == 0)
309   {
310     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
312     exit (1);
313   }
314   while (1)
315   {
316     last if $cleanpid == waitpid (-1, WNOHANG);
317     freeze_if_want_freeze ($cleanpid);
318     select (undef, undef, undef, 0.1);
319   }
320   Log (undef, "Clean-work-dir exited $?");
321
322   # Install requested code version
323
324   my @execargs;
325   my @srunargs = ("srun",
326                   "--nodelist=$nodelist",
327                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
328
329   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
332
333   my $commit;
334   my $treeish = $Job->{'script_version'};
335   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336   # Todo: let script_version specify alternate repo
337   $ENV{"CRUNCH_SRC_URL"} = $repo;
338
339   # Create/update our clone of the remote git repo
340
341   if (!-d $ENV{"CRUNCH_SRC"}) {
342     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343         or croak ("git clone $repo failed: exit ".($?>>8));
344     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
345   }
346   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
347
348   # If this looks like a subversion r#, look for it in git-svn commit messages
349
350   if ($treeish =~ m{^\d{1,4}$}) {
351     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
352     chomp $gitlog;
353     if ($gitlog =~ /^[a-f0-9]{40}$/) {
354       $commit = $gitlog;
355       Log (undef, "Using commit $commit for revision $treeish");
356     }
357   }
358
359   # If that didn't work, try asking git to look it up as a tree-ish.
360
361   if (!defined $commit) {
362
363     my $cooked_treeish = $treeish;
364     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365       # Looks like a git branch name -- make sure git knows it's
366       # relative to the remote repo
367       $cooked_treeish = "origin/$treeish";
368     }
369
370     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
371     chomp $found;
372     if ($found =~ /^[0-9a-f]{40}$/s) {
373       $commit = $found;
374       if ($commit ne $treeish) {
375         # Make sure we record the real commit id in the database,
376         # frozentokey, logs, etc. -- instead of an abbreviation or a
377         # branch name which can become ambiguous or point to a
378         # different commit in the future.
379         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380         Log (undef, "Using commit $commit for tree-ish $treeish");
381         if ($commit ne $treeish) {
382           $Job->{'script_version'} = $commit;
383           $Job->save() or croak("Error while updating job");
384         }
385       }
386     }
387   }
388
389   if (defined $commit) {
390     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391     @execargs = ("sh", "-c",
392                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
393   }
394   else {
395     croak ("could not figure out commit id for $treeish");
396   }
397
398   my $installpid = fork();
399   if ($installpid == 0)
400   {
401     srun (\@srunargs, \@execargs, {}, $build_script);
402     exit (1);
403   }
404   while (1)
405   {
406     last if $installpid == waitpid (-1, WNOHANG);
407     freeze_if_want_freeze ($installpid);
408     select (undef, undef, undef, 0.1);
409   }
410   Log (undef, "Install exited $?");
411 }
412
413
414
415 foreach (qw (script script_version script_parameters resource_limits))
416 {
417   Log (undef, $_ . " " . $Job->{$_});
418 }
419 foreach (split (/\n/, $Job->{knobs}))
420 {
421   Log (undef, "knob " . $_);
422 }
423
424
425
426 my $success;
427
428
429
430 ONELEVEL:
431
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
435
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437                        or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
440
441
442
443 my %proc;
444 my @freeslot = (0..$#slot);
445 my @holdslot;
446 my %reader;
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
449
450 update_progress_stats();
451
452
453
454 THISROUND:
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
456 {
457   $main::please_continue = 0;
458
459   my $id = $jobstep_todo[$todo_ptr];
460   my $Jobstep = $jobstep[$id];
461   if ($Jobstep->{level} != $level)
462   {
463     next;
464   }
465   if ($Jobstep->{attempts} > 9)
466   {
467     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
468     $success = 0;
469     last THISROUND;
470   }
471
472   pipe $reader{$id}, "writer" or croak ($!);
473   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
475
476   my $childslot = $freeslot[0];
477   my $childnode = $slot[$childslot]->{node};
478   my $childslotname = join (".",
479                             $slot[$childslot]->{node}->{name},
480                             $slot[$childslot]->{cpu});
481   my $childpid = fork();
482   if ($childpid == 0)
483   {
484     $SIG{'INT'} = 'DEFAULT';
485     $SIG{'QUIT'} = 'DEFAULT';
486     $SIG{'TERM'} = 'DEFAULT';
487
488     foreach (values (%reader))
489     {
490       close($_);
491     }
492     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493     open(STDOUT,">&writer");
494     open(STDERR,">&writer");
495
496     undef $dbh;
497     undef $sth;
498
499     delete $ENV{"GNUPGHOME"};
500     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501     $ENV{"TASK_QSEQUENCE"} = $id;
502     $ENV{"TASK_SEQUENCE"} = $level;
503     $ENV{"JOB_SCRIPT"} = $Job->{script};
504     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505       $param =~ tr/a-z/A-Z/;
506       $ENV{"JOB_PARAMETER_$param"} = $value;
507     }
508     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
512
513     $ENV{"GZIP"} = "-n";
514
515     my @srunargs = (
516       "srun",
517       "--nodelist=".$childnode->{name},
518       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519       "--job-name=$job_id.$id.$$",
520         );
521     my @execargs = qw(sh);
522     my $build_script_to_send = "";
523     my $command =
524         "mkdir -p $ENV{CRUNCH_TMP}/revision "
525         ."&& cd $ENV{CRUNCH_TMP} ";
526     if ($build_script)
527     {
528       $build_script_to_send = $build_script;
529       $command .=
530           "&& perl -";
531     }
532     elsif (!$skip_install)
533     {
534       $command .=
535           "&& "
536           ."( "
537           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
538           ."|| "
539           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540           ."    && ./installrevision "
541           ."  ) "
542           .") ";
543     }
544     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
545     $command .=
546         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547     my @execargs = ('bash', '-c', $command);
548     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
549     exit (1);
550   }
551   close("writer");
552   if (!defined $childpid)
553   {
554     close $reader{$id};
555     delete $reader{$id};
556     next;
557   }
558   shift @freeslot;
559   $proc{$childpid} = { jobstep => $id,
560                        time => time,
561                        slot => $childslot,
562                        jobstepname => "$job_id.$id.$childpid",
563                      };
564   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565   $slot[$childslot]->{pid} = $childpid;
566
567   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568   Log ($id, "child $childpid started on $childslotname");
569   $Jobstep->{attempts} ++;
570   $Jobstep->{starttime} = time;
571   $Jobstep->{node} = $childnode->{name};
572   $Jobstep->{slotindex} = $childslot;
573   delete $Jobstep->{stderr};
574   delete $Jobstep->{finishtime};
575
576   splice @jobstep_todo, $todo_ptr, 1;
577   --$todo_ptr;
578
579   $progress_is_dirty = 1;
580
581   while (!@freeslot
582          ||
583          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
584   {
585     last THISROUND if $main::please_freeze;
586     if ($main::please_info)
587     {
588       $main::please_info = 0;
589       freeze();
590       collate_output();
591       save_meta(1);
592       update_progress_stats();
593     }
594     my $gotsome
595         = readfrompipes ()
596         + reapchildren ();
597     if (!$gotsome)
598     {
599       check_squeue();
600       update_progress_stats();
601       select (undef, undef, undef, 0.1);
602     }
603     elsif (time - $progress_stats_updated >= 30)
604     {
605       update_progress_stats();
606     }
607     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
609     {
610       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611           .($thisround_failed+$thisround_succeeded)
612           .") -- giving up on this round";
613       Log (undef, $message);
614       last THISROUND;
615     }
616
617     # move slots from freeslot to holdslot (or back to freeslot) if necessary
618     for (my $i=$#freeslot; $i>=0; $i--) {
619       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620         push @holdslot, (splice @freeslot, $i, 1);
621       }
622     }
623     for (my $i=$#holdslot; $i>=0; $i--) {
624       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625         push @freeslot, (splice @holdslot, $i, 1);
626       }
627     }
628
629     # give up if no nodes are succeeding
630     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631       my $message = "Every node has failed -- giving up on this round";
632       Log (undef, $message);
633       last THISROUND;
634     }
635   }
636 }
637
638
639 push @freeslot, splice @holdslot;
640 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
641
642
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
644 while (%proc)
645 {
646   goto THISROUND if $main::please_continue;
647   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
648   readfrompipes ();
649   if (!reapchildren())
650   {
651     check_squeue();
652     update_progress_stats();
653     select (undef, undef, undef, 0.1);
654     killem (keys %proc) if $main::please_freeze;
655   }
656 }
657
658 update_progress_stats();
659 freeze_if_want_freeze();
660
661
662 if (!defined $success)
663 {
664   if (@jobstep_todo &&
665       $thisround_succeeded == 0 &&
666       ($thisround_failed == 0 || $thisround_failed > 4))
667   {
668     my $message = "stop because $thisround_failed tasks failed and none succeeded";
669     Log (undef, $message);
670     $success = 0;
671   }
672   if (!@jobstep_todo)
673   {
674     $success = 1;
675   }
676 }
677
678 goto ONELEVEL if !defined $success;
679
680
681 release_allocation();
682 freeze();
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = 0 if !$Job->{'output'};
685 $Job->save;
686
687 if ($Job->{'output'})
688 {
689   $arv->{'collections'}->{'create'}->execute('collection' => {
690     'uuid' => $Job->{'output'},
691     'manifest_text' => system("whget", $Job->{arvados_task}->{output}),
692                                              });;
693 }
694
695
696 Log (undef, "finish");
697
698 $Job->{'success'} = $Job->{'output'} && $success;
699 $Job->save;
700
701 save_meta();
702 exit 0;
703
704
705
706 sub update_progress_stats
707 {
708   $progress_stats_updated = time;
709   return if !$progress_is_dirty;
710   my ($todo, $done, $running) = (scalar @jobstep_todo,
711                                  scalar @jobstep_done,
712                                  scalar @slot - scalar @freeslot - scalar @holdslot);
713   $Job->{'tasks_summary'} ||= {};
714   $Job->{'tasks_summary'}->{'todo'} = $todo;
715   $Job->{'tasks_summary'}->{'done'} = $done;
716   $Job->{'tasks_summary'}->{'running'} = $running;
717   $Job->save;
718   Log (undef, "status: $done done, $running running, $todo todo");
719   $progress_is_dirty = 0;
720 }
721
722
723
724 sub reapchildren
725 {
726   my $pid = waitpid (-1, WNOHANG);
727   return 0 if $pid <= 0;
728
729   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
730                   . "."
731                   . $slot[$proc{$pid}->{slot}]->{cpu});
732   my $jobstepid = $proc{$pid}->{jobstep};
733   my $elapsed = time - $proc{$pid}->{time};
734   my $Jobstep = $jobstep[$jobstepid];
735
736   my $exitcode = $?;
737   my $exitinfo = "exit $exitcode";
738   $Jobstep->{arvados_task}->reload;
739   my $success = $Jobstep->{arvados_task}->{success};
740
741   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
742
743   if (!defined $success) {
744     # task did not indicate one way or the other --> fail
745     $Jobstep->{arvados_task}->{success} = 0;
746     $Jobstep->{arvados_task}->save;
747     $success = 0;
748   }
749
750   if (!$success)
751   {
752     --$Jobstep->{attempts} if $Jobstep->{node_fail};
753     ++$thisround_failed;
754     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
755
756     # Check for signs of a failed or misconfigured node
757     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
758         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
759       # Don't count this against jobstep failure thresholds if this
760       # node is already suspected faulty and srun exited quickly
761       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
762           $elapsed < 5 &&
763           $Jobstep->{attempts} > 1) {
764         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
765         --$Jobstep->{attempts};
766       }
767       ban_node_by_slot($proc{$pid}->{slot});
768     }
769
770     push @jobstep_todo, $jobstepid;
771     Log ($jobstepid, "failure in $elapsed seconds");
772     $Job->{'tasks_summary'}->{'failed'}++;
773   }
774   else
775   {
776     ++$thisround_succeeded;
777     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
778     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
779     push @jobstep_done, $jobstepid;
780     Log ($jobstepid, "success in $elapsed seconds");
781   }
782   $Jobstep->{exitcode} = $exitcode;
783   $Jobstep->{finishtime} = time;
784   process_stderr ($jobstepid, $success);
785   Log ($jobstepid, "output " . $Jobstep->{arvados_task}->{output});
786
787   close $reader{$jobstepid};
788   delete $reader{$jobstepid};
789   delete $slot[$proc{$pid}->{slot}]->{pid};
790   push @freeslot, $proc{$pid}->{slot};
791   delete $proc{$pid};
792
793   # Load new tasks
794   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute('where' => {
795     'created_by_job_task' => $Jobstep->{arvados_task}->{uuid}
796                                                              });
797   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
798     my $jobstep = {
799       'level' => $arvados_task->{'sequence'},
800       'attempts' => 0,
801       'arvados_task' => $arvados_task
802     };
803     push @jobstep, $jobstep;
804     push @jobstep_todo, $#jobstep;
805   }
806
807   $progress_is_dirty = 1;
808   1;
809 }
810
811
812 sub check_squeue
813 {
814   # return if the kill list was checked <4 seconds ago
815   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
816   {
817     return;
818   }
819   $squeue_kill_checked = time;
820
821   # use killem() on procs whose killtime is reached
822   for (keys %proc)
823   {
824     if (exists $proc{$_}->{killtime}
825         && $proc{$_}->{killtime} <= time)
826     {
827       killem ($_);
828     }
829   }
830
831   # return if the squeue was checked <60 seconds ago
832   if (defined $squeue_checked && $squeue_checked > time - 60)
833   {
834     return;
835   }
836   $squeue_checked = time;
837
838   if (!$have_slurm)
839   {
840     # here is an opportunity to check for mysterious problems with local procs
841     return;
842   }
843
844   # get a list of steps still running
845   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
846   chop @squeue;
847   if ($squeue[-1] ne "ok")
848   {
849     return;
850   }
851   pop @squeue;
852
853   # which of my jobsteps are running, according to squeue?
854   my %ok;
855   foreach (@squeue)
856   {
857     if (/^(\d+)\.(\d+) (\S+)/)
858     {
859       if ($1 eq $ENV{SLURM_JOBID})
860       {
861         $ok{$3} = 1;
862       }
863     }
864   }
865
866   # which of my active child procs (>60s old) were not mentioned by squeue?
867   foreach (keys %proc)
868   {
869     if ($proc{$_}->{time} < time - 60
870         && !exists $ok{$proc{$_}->{jobstepname}}
871         && !exists $proc{$_}->{killtime})
872     {
873       # kill this proc if it hasn't exited in 30 seconds
874       $proc{$_}->{killtime} = time + 30;
875     }
876   }
877 }
878
879
880 sub release_allocation
881 {
882   if ($have_slurm)
883   {
884     Log (undef, "release job allocation");
885     system "scancel $ENV{SLURM_JOBID}";
886   }
887 }
888
889
890 sub readfrompipes
891 {
892   my $gotsome = 0;
893   foreach my $job (keys %reader)
894   {
895     my $buf;
896     while (0 < sysread ($reader{$job}, $buf, 8192))
897     {
898       print STDERR $buf if $ENV{CRUNCH_DEBUG};
899       $jobstep[$job]->{stderr} .= $buf;
900       preprocess_stderr ($job);
901       if (length ($jobstep[$job]->{stderr}) > 16384)
902       {
903         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
904       }
905       $gotsome = 1;
906     }
907   }
908   return $gotsome;
909 }
910
911
912 sub preprocess_stderr
913 {
914   my $job = shift;
915
916   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
917     my $line = $1;
918     if ($line =~ /\+\+\+mr/) {
919       last;
920     }
921     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
922     Log ($job, "stderr $line");
923     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
924       # whoa.
925       $main::please_freeze = 1;
926     }
927     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
928       $jobstep[$job]->{node_fail} = 1;
929       ban_node_by_slot($jobstep[$job]->{slotindex});
930     }
931   }
932 }
933
934
935 sub process_stderr
936 {
937   my $job = shift;
938   my $success = shift;
939   preprocess_stderr ($job);
940
941   map {
942     Log ($job, "stderr $_");
943   } split ("\n", $jobstep[$job]->{stderr});
944 }
945
946
947 sub collate_output
948 {
949   my $whc = Warehouse->new;
950   Log (undef, "collate");
951   $whc->write_start (1);
952   my $joboutput;
953   for (@jobstep)
954   {
955     next if !exists $_->{arvados_task}->{output} || $_->{exitcode} != 0;
956     my $output = $_->{arvados_task}->{output};
957     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
958     {
959       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
960       $whc->write_data ($output);
961     }
962     elsif (@jobstep == 1)
963     {
964       $joboutput = $output;
965       $whc->write_finish;
966     }
967     elsif (defined (my $outblock = $whc->fetch_block ($output)))
968     {
969       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
970       $whc->write_data ($outblock);
971     }
972     else
973     {
974       my $errstr = $whc->errstr;
975       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
976       $success = 0;
977     }
978   }
979   $joboutput = $whc->write_finish if !defined $joboutput;
980   if ($joboutput)
981   {
982     Log (undef, "outputkey $joboutput");
983     $Job->{'output'} = $joboutput;
984     $Job->save;
985   }
986   else
987   {
988     Log (undef, "outputkey undef");
989   }
990   return $joboutput;
991 }
992
993
994 sub killem
995 {
996   foreach (@_)
997   {
998     my $sig = 2;                # SIGINT first
999     if (exists $proc{$_}->{"sent_$sig"} &&
1000         time - $proc{$_}->{"sent_$sig"} > 4)
1001     {
1002       $sig = 15;                # SIGTERM if SIGINT doesn't work
1003     }
1004     if (exists $proc{$_}->{"sent_$sig"} &&
1005         time - $proc{$_}->{"sent_$sig"} > 4)
1006     {
1007       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1008     }
1009     if (!exists $proc{$_}->{"sent_$sig"})
1010     {
1011       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1012       kill $sig, $_;
1013       select (undef, undef, undef, 0.1);
1014       if ($sig == 2)
1015       {
1016         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1017       }
1018       $proc{$_}->{"sent_$sig"} = time;
1019       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1020     }
1021   }
1022 }
1023
1024
1025 sub fhbits
1026 {
1027   my($bits);
1028   for (@_) {
1029     vec($bits,fileno($_),1) = 1;
1030   }
1031   $bits;
1032 }
1033
1034
1035 sub Log                         # ($jobstep_id, $logmessage)
1036 {
1037   if ($_[1] =~ /\n/) {
1038     for my $line (split (/\n/, $_[1])) {
1039       Log ($_[0], $line);
1040     }
1041     return;
1042   }
1043   my $fh = select STDERR; $|=1; select $fh;
1044   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1045   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1046   $message .= "\n";
1047   my $datetime;
1048   if ($metastream || -t STDERR) {
1049     my @gmtime = gmtime;
1050     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1051                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1052   }
1053   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1054
1055   return if !$metastream;
1056   $metastream->write_data ($datetime . " " . $message);
1057 }
1058
1059
1060 sub reconnect_database
1061 {
1062   return if !$job_has_uuid;
1063   return if ($dbh && $dbh->do ("select now()"));
1064   for (1..16)
1065   {
1066     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1067     if ($dbh) {
1068       $dbh->{InactiveDestroy} = 1;
1069       return;
1070     }
1071     warn ($DBI::errstr);
1072     sleep $_;
1073   }
1074   croak ($DBI::errstr) if !$dbh;
1075 }
1076
1077
1078 sub dbh_do
1079 {
1080   return 1 if !$job_has_uuid;
1081   my $ret = $dbh->do (@_);
1082   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1083   reconnect_database();
1084   return $dbh->do (@_);
1085 }
1086
1087
1088 sub croak
1089 {
1090   my ($package, $file, $line) = caller;
1091   my $message = "@_ at $file line $line\n";
1092   Log (undef, $message);
1093   freeze() if @jobstep_todo;
1094   collate_output() if @jobstep_todo;
1095   cleanup();
1096   save_meta() if $metastream;
1097   die;
1098 }
1099
1100
1101 sub cleanup
1102 {
1103   return if !$job_has_uuid;
1104   $Job->reload;
1105   $Job->{'running'} = 0;
1106   $Job->{'success'} = 0;
1107   $Job->{'finished_at'} = time;
1108   $Job->save;
1109 }
1110
1111
1112 sub save_meta
1113 {
1114   my $justcheckpoint = shift; # false if this will be the last meta saved
1115   my $m = $metastream;
1116   $m = $m->copy if $justcheckpoint;
1117   $m->write_finish;
1118   my $loglocator = $m->as_key;
1119   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1120   Log (undef, "meta key is $loglocator");
1121   $Job->{'log'} = $loglocator;
1122   $Job->save;
1123 }
1124
1125
1126 sub freeze_if_want_freeze
1127 {
1128   if ($main::please_freeze)
1129   {
1130     release_allocation();
1131     if (@_)
1132     {
1133       # kill some srun procs before freeze+stop
1134       map { $proc{$_} = {} } @_;
1135       while (%proc)
1136       {
1137         killem (keys %proc);
1138         select (undef, undef, undef, 0.1);
1139         my $died;
1140         while (($died = waitpid (-1, WNOHANG)) > 0)
1141         {
1142           delete $proc{$died};
1143         }
1144       }
1145     }
1146     freeze();
1147     collate_output();
1148     cleanup();
1149     save_meta();
1150     exit 0;
1151   }
1152 }
1153
1154
1155 sub freeze
1156 {
1157   Log (undef, "Freeze not implemented");
1158   return;
1159
1160   my $whc;                      # todo
1161   Log (undef, "freeze");
1162
1163   my $freezer = new Warehouse::Stream (whc => $whc);
1164   $freezer->clear;
1165   $freezer->name (".");
1166   $freezer->write_start ("state.txt");
1167
1168   $freezer->write_data (join ("\n",
1169                               "job $Job->{uuid}",
1170                               map
1171                               {
1172                                 $_ . "=" . freezequote($Job->{$_})
1173                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1174
1175   foreach my $Jobstep (@jobstep)
1176   {
1177     my $str = join ("\n",
1178                     map
1179                     {
1180                       $_ . "=" . freezequote ($Jobstep->{$_})
1181                     } grep {
1182                       $_ !~ /^stderr|slotindex|node_fail/
1183                     } keys %$Jobstep);
1184     $freezer->write_data ($str."\n\n");
1185   }
1186   if (@jobstep_tomerge)
1187   {
1188     $freezer->write_data
1189         ("merge $jobstep_tomerge_level "
1190          . freezequote (join ("\n",
1191                               map { freezequote ($_) } @jobstep_tomerge))
1192          . "\n\n");
1193   }
1194
1195   $freezer->write_finish;
1196   my $frozentokey = $freezer->as_key;
1197   undef $freezer;
1198   Log (undef, "frozento key is $frozentokey");
1199   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1200           $frozentokey, $job_id);
1201   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1202   Log (undef, "frozento+K key is $kfrozentokey");
1203   return $frozentokey;
1204 }
1205
1206
1207 sub thaw
1208 {
1209   croak ("Thaw not implemented");
1210
1211   my $whc;
1212   my $key = shift;
1213   Log (undef, "thaw from $key");
1214
1215   @jobstep = ();
1216   @jobstep_done = ();
1217   @jobstep_todo = ();
1218   @jobstep_tomerge = ();
1219   $jobstep_tomerge_level = 0;
1220   my $frozenjob = {};
1221
1222   my $stream = new Warehouse::Stream ( whc => $whc,
1223                                        hash => [split (",", $key)] );
1224   $stream->rewind;
1225   while (my $dataref = $stream->read_until (undef, "\n\n"))
1226   {
1227     if ($$dataref =~ /^job /)
1228     {
1229       foreach (split ("\n", $$dataref))
1230       {
1231         my ($k, $v) = split ("=", $_, 2);
1232         $frozenjob->{$k} = freezeunquote ($v);
1233       }
1234       next;
1235     }
1236
1237     if ($$dataref =~ /^merge (\d+) (.*)/)
1238     {
1239       $jobstep_tomerge_level = $1;
1240       @jobstep_tomerge
1241           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1242       next;
1243     }
1244
1245     my $Jobstep = { };
1246     foreach (split ("\n", $$dataref))
1247     {
1248       my ($k, $v) = split ("=", $_, 2);
1249       $Jobstep->{$k} = freezeunquote ($v) if $k;
1250     }
1251     $Jobstep->{attempts} = 0;
1252     push @jobstep, $Jobstep;
1253
1254     if ($Jobstep->{exitcode} eq "0")
1255     {
1256       push @jobstep_done, $#jobstep;
1257     }
1258     else
1259     {
1260       push @jobstep_todo, $#jobstep;
1261     }
1262   }
1263
1264   foreach (qw (script script_version script_parameters))
1265   {
1266     $Job->{$_} = $frozenjob->{$_};
1267   }
1268   $Job->save;
1269 }
1270
1271
1272 sub freezequote
1273 {
1274   my $s = shift;
1275   $s =~ s/\\/\\\\/g;
1276   $s =~ s/\n/\\n/g;
1277   return $s;
1278 }
1279
1280
1281 sub freezeunquote
1282 {
1283   my $s = shift;
1284   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1285   return $s;
1286 }
1287
1288
1289 sub srun
1290 {
1291   my $srunargs = shift;
1292   my $execargs = shift;
1293   my $opts = shift || {};
1294   my $stdin = shift;
1295   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1296   print STDERR (join (" ",
1297                       map { / / ? "'$_'" : $_ }
1298                       (@$args)),
1299                 "\n")
1300       if $ENV{CRUNCH_DEBUG};
1301
1302   if (defined $stdin) {
1303     my $child = open STDIN, "-|";
1304     defined $child or die "no fork: $!";
1305     if ($child == 0) {
1306       print $stdin or die $!;
1307       close STDOUT or die $!;
1308       exit 0;
1309     }
1310   }
1311
1312   return system (@$args) if $opts->{fork};
1313
1314   exec @$args;
1315   warn "ENV size is ".length(join(" ",%ENV));
1316   die "exec failed: $!: @$args";
1317 }
1318
1319
1320 sub ban_node_by_slot {
1321   # Don't start any new jobsteps on this node for 60 seconds
1322   my $slotid = shift;
1323   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1324   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1325 }
1326
1327 __DATA__
1328 #!/usr/bin/perl
1329
1330 # checkout-and-build
1331
1332 use Fcntl ':flock';
1333
1334 my $destdir = $ENV{"CRUNCH_SRC"};
1335 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1336 my $repo = $ENV{"CRUNCH_SRC_URL"};
1337
1338 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1339 flock L, LOCK_EX;
1340 if (readlink ("$destdir.commit") eq $commit) {
1341     exit 0;
1342 }
1343
1344 open STDOUT, ">", "$destdir.log";
1345 open STDERR, ">&STDOUT";
1346
1347 if (-d "$destdir/.git") {
1348     chdir $destdir or die "chdir $destdir: $!";
1349     if (0 != system (qw(git remote set-url origin), $repo)) {
1350         # awful... for old versions of git that don't know "remote set-url"
1351         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1352     }
1353 }
1354 elsif ($repo && $commit)
1355 {
1356     shell_or_die('git', 'clone', $repo, $destdir);
1357     chdir $destdir or die "chdir $destdir: $!";
1358     shell_or_die(qw(git config clean.requireForce false));
1359 }
1360 else {
1361     die "$destdir does not exist, and no repo/commit specified -- giving up";
1362 }
1363
1364 if ($commit) {
1365     unlink "$destdir.commit";
1366     shell_or_die (qw(git stash));
1367     shell_or_die (qw(git clean -d -x));
1368     shell_or_die (qw(git fetch origin));
1369     shell_or_die (qw(git checkout), $commit);
1370 }
1371
1372 my $pwd;
1373 chomp ($pwd = `pwd`);
1374 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1375 mkdir $install_dir;
1376 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1377     # Old version
1378     shell_or_die ("./tests/autotests.sh", $install_dir);
1379 } elsif (-e "./install.sh") {
1380     shell_or_die ("./install.sh", $install_dir);
1381 }
1382
1383 if ($commit) {
1384     unlink "$destdir.commit.new";
1385     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1386     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1387 }
1388
1389 close L;
1390
1391 exit 0;
1392
1393 sub shell_or_die
1394 {
1395   if ($ENV{"DEBUG"}) {
1396     print STDERR "@_\n";
1397   }
1398   system (@_) == 0
1399       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1400 }