72563e68adb0254eaf6f262ca8d2e076c8b1aa90
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
59
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
63 mkdir ($ENV{"CRUNCH_TMP"});
64
65 my $force_unlock;
66 my $jobspec;
67 my $resume_stash;
68 GetOptions('force-unlock' => \$force_unlock,
69            'job=s' => \$jobspec,
70            'resume-stash=s' => \$resume_stash,
71     );
72
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
75
76
77 $SIG{'HUP'} = sub
78 {
79   1;
80 };
81 $SIG{'USR1'} = sub
82 {
83   $main::ENV{CRUNCH_DEBUG} = 1;
84 };
85 $SIG{'USR2'} = sub
86 {
87   $main::ENV{CRUNCH_DEBUG} = 0;
88 };
89
90
91
92 my $arv = Arvados->new;
93 my $metastream = Warehouse::Stream->new;
94 $metastream->clear;
95 $metastream->write_start('log.txt');
96
97 my $User = {};
98 my $Job = {};
99 my $job_id;
100 my $dbh;
101 my $sth;
102 if ($job_has_uuid)
103 {
104   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105   $User = $arv->{'users'}->{'current'}->execute;
106   if (!$force_unlock) {
107     if ($Job->{'is_locked_by'}) {
108       croak("Job is locked: " . $Job->{'is_locked_by'});
109     }
110     if ($Job->{'success'} ne undef) {
111       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112     }
113     if ($Job->{'running'}) {
114       croak("Job 'running' flag is already set");
115     }
116     if ($Job->{'started_at'}) {
117       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
118     }
119   }
120 }
121 else
122 {
123   $Job = JSON::decode_json($jobspec);
124
125   if (!$resume_stash)
126   {
127     map { croak ("No $_ specified") unless $Job->{$_} }
128     qw(script script_version script_parameters);
129   }
130
131   if (!defined $Job->{'uuid'}) {
132     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
133   }
134 }
135 $job_id = $Job->{'uuid'};
136
137
138
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
142
143
144 Log (undef, "check slurm allocation");
145 my @slot;
146 my @node;
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
148 my @sinfo;
149 if (!$have_slurm)
150 {
151   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152   push @sinfo, "$localcpus localhost";
153 }
154 if (exists $ENV{SLURM_NODELIST})
155 {
156   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
157 }
158 foreach (@sinfo)
159 {
160   my ($ncpus, $slurm_nodelist) = split;
161   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
162
163   my @nodelist;
164   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
165   {
166     my $nodelist = $1;
167     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
168     {
169       my $ranges = $1;
170       foreach (split (",", $ranges))
171       {
172         my ($a, $b);
173         if (/(\d+)-(\d+)/)
174         {
175           $a = $1;
176           $b = $2;
177         }
178         else
179         {
180           $a = $_;
181           $b = $_;
182         }
183         push @nodelist, map {
184           my $n = $nodelist;
185           $n =~ s/\[[-,\d]+\]/$_/;
186           $n;
187         } ($a..$b);
188       }
189     }
190     else
191     {
192       push @nodelist, $nodelist;
193     }
194   }
195   foreach my $nodename (@nodelist)
196   {
197     Log (undef, "node $nodename - $ncpus slots");
198     my $node = { name => $nodename,
199                  ncpus => $ncpus,
200                  losing_streak => 0,
201                  hold_until => 0 };
202     foreach my $cpu (1..$ncpus)
203     {
204       push @slot, { node => $node,
205                     cpu => $cpu };
206     }
207   }
208   push @node, @nodelist;
209 }
210
211
212
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
215
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
217
218
219
220 my $jobmanager_id;
221 if ($job_has_uuid)
222 {
223   # Claim this job, and make sure nobody else does
224
225   $Job->{'is_locked_by'} = $User->{'uuid'};
226   $Job->{'started_at'} = time;
227   $Job->{'running'} = 1;
228   $Job->{'success'} = undef;
229   $Job->{'tasks_summary'} = { 'failed' => 0,
230                               'todo' => 1,
231                               'running' => 0,
232                               'done' => 0 };
233   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
234     croak("Error while updating / locking job");
235   }
236 }
237
238
239 Log (undef, "start");
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
250
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
254
255
256 my @jobstep;
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
261 my $squeue_checked;
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
264
265
266
267 if (defined $Job->{thawedfromkey})
268 {
269   thaw ($Job->{thawedfromkey});
270 }
271 else
272 {
273   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274     'job_uuid' => $Job->{'uuid'},
275     'sequence' => 0,
276     'qsequence' => 0,
277     'parameters' => {},
278                                                           });
279   push @jobstep, { 'level' => 0,
280                    'attempts' => 0,
281                    'arvados_task' => $first_task,
282                  };
283   push @jobstep_todo, 0;
284 }
285
286
287 my $build_script;
288 do {
289   local $/ = undef;
290   $build_script = <DATA>;
291 };
292
293
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
297 if ($skip_install)
298 {
299   $ENV{"CRUNCH_SRC"} = $Job->{revision};
300 }
301 else
302 {
303   Log (undef, "Install revision ".$Job->{revision});
304   my $nodelist = join(",", @node);
305
306   # Clean out crunch_tmp/work and crunch_tmp/opt
307
308   my $cleanpid = fork();
309   if ($cleanpid == 0)
310   {
311     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
313     exit (1);
314   }
315   while (1)
316   {
317     last if $cleanpid == waitpid (-1, WNOHANG);
318     freeze_if_want_freeze ($cleanpid);
319     select (undef, undef, undef, 0.1);
320   }
321   Log (undef, "Clean-work-dir exited $?");
322
323   # Install requested code version
324
325   my @execargs;
326   my @srunargs = ("srun",
327                   "--nodelist=$nodelist",
328                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329
330   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
333
334   my $commit;
335   my $treeish = $Job->{'script_version'};
336   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337   # Todo: let script_version specify alternate repo
338   $ENV{"CRUNCH_SRC_URL"} = $repo;
339
340   # Create/update our clone of the remote git repo
341
342   if (!-d $ENV{"CRUNCH_SRC"}) {
343     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344         or croak ("git clone $repo failed: exit ".($?>>8));
345     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346   }
347   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348
349   # If this looks like a subversion r#, look for it in git-svn commit messages
350
351   if ($treeish =~ m{^\d{1,4}$}) {
352     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353     chomp $gitlog;
354     if ($gitlog =~ /^[a-f0-9]{40}$/) {
355       $commit = $gitlog;
356       Log (undef, "Using commit $commit for revision $treeish");
357     }
358   }
359
360   # If that didn't work, try asking git to look it up as a tree-ish.
361
362   if (!defined $commit) {
363
364     my $cooked_treeish = $treeish;
365     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366       # Looks like a git branch name -- make sure git knows it's
367       # relative to the remote repo
368       $cooked_treeish = "origin/$treeish";
369     }
370
371     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372     chomp $found;
373     if ($found =~ /^[0-9a-f]{40}$/s) {
374       $commit = $found;
375       if ($commit ne $treeish) {
376         # Make sure we record the real commit id in the database,
377         # frozentokey, logs, etc. -- instead of an abbreviation or a
378         # branch name which can become ambiguous or point to a
379         # different commit in the future.
380         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381         Log (undef, "Using commit $commit for tree-ish $treeish");
382         if ($commit ne $treeish) {
383           $Job->{'script_version'} = $commit;
384           $Job->save() or croak("Error while updating job");
385         }
386       }
387     }
388   }
389
390   if (defined $commit) {
391     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
392     @execargs = ("sh", "-c",
393                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
394   }
395   else {
396     croak ("could not figure out commit id for $treeish");
397   }
398
399   my $installpid = fork();
400   if ($installpid == 0)
401   {
402     srun (\@srunargs, \@execargs, {}, $build_script);
403     exit (1);
404   }
405   while (1)
406   {
407     last if $installpid == waitpid (-1, WNOHANG);
408     freeze_if_want_freeze ($installpid);
409     select (undef, undef, undef, 0.1);
410   }
411   Log (undef, "Install exited $?");
412 }
413
414
415
416 foreach (qw (script script_version script_parameters resource_limits))
417 {
418   Log (undef, $_ . " " . $Job->{$_});
419 }
420 foreach (split (/\n/, $Job->{knobs}))
421 {
422   Log (undef, "knob " . $_);
423 }
424
425
426
427 my $success;
428
429
430
431 ONELEVEL:
432
433 my $thisround_succeeded = 0;
434 my $thisround_failed = 0;
435 my $thisround_failed_multiple = 0;
436
437 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
438                        or $a <=> $b } @jobstep_todo;
439 my $level = $jobstep[$jobstep_todo[0]]->{level};
440 Log (undef, "start level $level");
441
442
443
444 my %proc;
445 my @freeslot = (0..$#slot);
446 my @holdslot;
447 my %reader;
448 my $progress_is_dirty = 1;
449 my $progress_stats_updated = 0;
450
451 update_progress_stats();
452
453
454
455 THISROUND:
456 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
457 {
458   $main::please_continue = 0;
459
460   my $id = $jobstep_todo[$todo_ptr];
461   my $Jobstep = $jobstep[$id];
462   if ($Jobstep->{level} != $level)
463   {
464     next;
465   }
466   if ($Jobstep->{attempts} > 9)
467   {
468     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
469     $success = 0;
470     last THISROUND;
471   }
472
473   pipe $reader{$id}, "writer" or croak ($!);
474   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
475   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
476
477   my $childslot = $freeslot[0];
478   my $childnode = $slot[$childslot]->{node};
479   my $childslotname = join (".",
480                             $slot[$childslot]->{node}->{name},
481                             $slot[$childslot]->{cpu});
482   my $childpid = fork();
483   if ($childpid == 0)
484   {
485     $SIG{'INT'} = 'DEFAULT';
486     $SIG{'QUIT'} = 'DEFAULT';
487     $SIG{'TERM'} = 'DEFAULT';
488
489     foreach (values (%reader))
490     {
491       close($_);
492     }
493     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
494     open(STDOUT,">&writer");
495     open(STDERR,">&writer");
496
497     undef $dbh;
498     undef $sth;
499
500     delete $ENV{"GNUPGHOME"};
501     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
502     $ENV{"TASK_QSEQUENCE"} = $id;
503     $ENV{"TASK_SEQUENCE"} = $level;
504     $ENV{"JOB_SCRIPT"} = $Job->{script};
505     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
506       $param =~ tr/a-z/A-Z/;
507       $ENV{"JOB_PARAMETER_$param"} = $value;
508     }
509     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
510     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
511     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
512     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
513
514     $ENV{"GZIP"} = "-n";
515
516     my @srunargs = (
517       "srun",
518       "--nodelist=".$childnode->{name},
519       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
520       "--job-name=$job_id.$id.$$",
521         );
522     my @execargs = qw(sh);
523     my $build_script_to_send = "";
524     my $command =
525         "mkdir -p $ENV{CRUNCH_TMP}/revision "
526         ."&& cd $ENV{CRUNCH_TMP} ";
527     if ($build_script)
528     {
529       $build_script_to_send = $build_script;
530       $command .=
531           "&& perl -";
532     }
533     elsif (!$skip_install)
534     {
535       $command .=
536           "&& "
537           ."( "
538           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
539           ."|| "
540           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
541           ."    && ./installrevision "
542           ."  ) "
543           .") ";
544     }
545     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
546     $command .=
547         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
548     my @execargs = ('bash', '-c', $command);
549     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
550     exit (1);
551   }
552   close("writer");
553   if (!defined $childpid)
554   {
555     close $reader{$id};
556     delete $reader{$id};
557     next;
558   }
559   shift @freeslot;
560   $proc{$childpid} = { jobstep => $id,
561                        time => time,
562                        slot => $childslot,
563                        jobstepname => "$job_id.$id.$childpid",
564                      };
565   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
566   $slot[$childslot]->{pid} = $childpid;
567
568   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
569   Log ($id, "child $childpid started on $childslotname");
570   $Jobstep->{attempts} ++;
571   $Jobstep->{starttime} = time;
572   $Jobstep->{node} = $childnode->{name};
573   $Jobstep->{slotindex} = $childslot;
574   delete $Jobstep->{stderr};
575   delete $Jobstep->{finishtime};
576
577   splice @jobstep_todo, $todo_ptr, 1;
578   --$todo_ptr;
579
580   $progress_is_dirty = 1;
581
582   while (!@freeslot
583          ||
584          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
585   {
586     last THISROUND if $main::please_freeze;
587     if ($main::please_info)
588     {
589       $main::please_info = 0;
590       freeze();
591       collate_output();
592       save_meta(1);
593       update_progress_stats();
594     }
595     my $gotsome
596         = readfrompipes ()
597         + reapchildren ();
598     if (!$gotsome)
599     {
600       check_squeue();
601       update_progress_stats();
602       select (undef, undef, undef, 0.1);
603     }
604     elsif (time - $progress_stats_updated >= 30)
605     {
606       update_progress_stats();
607     }
608     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
609         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
610     {
611       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
612           .($thisround_failed+$thisround_succeeded)
613           .") -- giving up on this round";
614       Log (undef, $message);
615       last THISROUND;
616     }
617
618     # move slots from freeslot to holdslot (or back to freeslot) if necessary
619     for (my $i=$#freeslot; $i>=0; $i--) {
620       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
621         push @holdslot, (splice @freeslot, $i, 1);
622       }
623     }
624     for (my $i=$#holdslot; $i>=0; $i--) {
625       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
626         push @freeslot, (splice @holdslot, $i, 1);
627       }
628     }
629
630     # give up if no nodes are succeeding
631     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
632       my $message = "Every node has failed -- giving up on this round";
633       Log (undef, $message);
634       last THISROUND;
635     }
636   }
637 }
638
639
640 push @freeslot, splice @holdslot;
641 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
642
643
644 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
645 while (%proc)
646 {
647   goto THISROUND if $main::please_continue;
648   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
649   readfrompipes ();
650   if (!reapchildren())
651   {
652     check_squeue();
653     update_progress_stats();
654     select (undef, undef, undef, 0.1);
655     killem (keys %proc) if $main::please_freeze;
656   }
657 }
658
659 update_progress_stats();
660 freeze_if_want_freeze();
661
662
663 if (!defined $success)
664 {
665   if (@jobstep_todo &&
666       $thisround_succeeded == 0 &&
667       ($thisround_failed == 0 || $thisround_failed > 4))
668   {
669     my $message = "stop because $thisround_failed tasks failed and none succeeded";
670     Log (undef, $message);
671     $success = 0;
672   }
673   if (!@jobstep_todo)
674   {
675     $success = 1;
676   }
677 }
678
679 goto ONELEVEL if !defined $success;
680
681
682 release_allocation();
683 freeze();
684 $Job->{'output'} = &collate_output();
685 $Job->{'success'} = $Job->{'output'} && $success;
686 $Job->save;
687
688 if ($Job->{'output'})
689 {
690   eval {
691     my $manifest_text = capturex("whget", $Job->{'output'});
692     $arv->{'collections'}->{'create'}->execute('collection' => {
693       'uuid' => $Job->{'output'},
694       'manifest_text' => $manifest_text,
695     });
696   };
697   if ($@) {
698     Log (undef, "Failed to register output manifest: $@");
699   }
700 }
701
702 Log (undef, "finish");
703
704 save_meta();
705 exit 0;
706
707
708
709 sub update_progress_stats
710 {
711   $progress_stats_updated = time;
712   return if !$progress_is_dirty;
713   my ($todo, $done, $running) = (scalar @jobstep_todo,
714                                  scalar @jobstep_done,
715                                  scalar @slot - scalar @freeslot - scalar @holdslot);
716   $Job->{'tasks_summary'} ||= {};
717   $Job->{'tasks_summary'}->{'todo'} = $todo;
718   $Job->{'tasks_summary'}->{'done'} = $done;
719   $Job->{'tasks_summary'}->{'running'} = $running;
720   $Job->save;
721   Log (undef, "status: $done done, $running running, $todo todo");
722   $progress_is_dirty = 0;
723 }
724
725
726
727 sub reapchildren
728 {
729   my $pid = waitpid (-1, WNOHANG);
730   return 0 if $pid <= 0;
731
732   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
733                   . "."
734                   . $slot[$proc{$pid}->{slot}]->{cpu});
735   my $jobstepid = $proc{$pid}->{jobstep};
736   my $elapsed = time - $proc{$pid}->{time};
737   my $Jobstep = $jobstep[$jobstepid];
738
739   my $exitcode = $?;
740   my $exitinfo = "exit $exitcode";
741   $Jobstep->{'arvados_task'}->reload;
742   my $success = $Jobstep->{'arvados_task'}->{success};
743
744   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
745
746   if (!defined $success) {
747     # task did not indicate one way or the other --> fail
748     $Jobstep->{'arvados_task'}->{success} = 0;
749     $Jobstep->{'arvados_task'}->save;
750     $success = 0;
751   }
752
753   if (!$success)
754   {
755     --$Jobstep->{attempts} if $Jobstep->{node_fail};
756     ++$thisround_failed;
757     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
758
759     # Check for signs of a failed or misconfigured node
760     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
761         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
762       # Don't count this against jobstep failure thresholds if this
763       # node is already suspected faulty and srun exited quickly
764       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
765           $elapsed < 5 &&
766           $Jobstep->{attempts} > 1) {
767         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
768         --$Jobstep->{attempts};
769       }
770       ban_node_by_slot($proc{$pid}->{slot});
771     }
772
773     push @jobstep_todo, $jobstepid;
774     Log ($jobstepid, "failure in $elapsed seconds");
775     $Job->{'tasks_summary'}->{'failed'}++;
776   }
777   else
778   {
779     ++$thisround_succeeded;
780     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
781     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
782     push @jobstep_done, $jobstepid;
783     Log ($jobstepid, "success in $elapsed seconds");
784   }
785   $Jobstep->{exitcode} = $exitcode;
786   $Jobstep->{finishtime} = time;
787   process_stderr ($jobstepid, $success);
788   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
789
790   close $reader{$jobstepid};
791   delete $reader{$jobstepid};
792   delete $slot[$proc{$pid}->{slot}]->{pid};
793   push @freeslot, $proc{$pid}->{slot};
794   delete $proc{$pid};
795
796   # Load new tasks
797   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
798     'where' => {
799       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
800     },
801     'order' => 'qsequence'
802   );
803   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
804     my $jobstep = {
805       'level' => $arvados_task->{'sequence'},
806       'attempts' => 0,
807       'arvados_task' => $arvados_task
808     };
809     push @jobstep, $jobstep;
810     push @jobstep_todo, $#jobstep;
811   }
812
813   $progress_is_dirty = 1;
814   1;
815 }
816
817
818 sub check_squeue
819 {
820   # return if the kill list was checked <4 seconds ago
821   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
822   {
823     return;
824   }
825   $squeue_kill_checked = time;
826
827   # use killem() on procs whose killtime is reached
828   for (keys %proc)
829   {
830     if (exists $proc{$_}->{killtime}
831         && $proc{$_}->{killtime} <= time)
832     {
833       killem ($_);
834     }
835   }
836
837   # return if the squeue was checked <60 seconds ago
838   if (defined $squeue_checked && $squeue_checked > time - 60)
839   {
840     return;
841   }
842   $squeue_checked = time;
843
844   if (!$have_slurm)
845   {
846     # here is an opportunity to check for mysterious problems with local procs
847     return;
848   }
849
850   # get a list of steps still running
851   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
852   chop @squeue;
853   if ($squeue[-1] ne "ok")
854   {
855     return;
856   }
857   pop @squeue;
858
859   # which of my jobsteps are running, according to squeue?
860   my %ok;
861   foreach (@squeue)
862   {
863     if (/^(\d+)\.(\d+) (\S+)/)
864     {
865       if ($1 eq $ENV{SLURM_JOBID})
866       {
867         $ok{$3} = 1;
868       }
869     }
870   }
871
872   # which of my active child procs (>60s old) were not mentioned by squeue?
873   foreach (keys %proc)
874   {
875     if ($proc{$_}->{time} < time - 60
876         && !exists $ok{$proc{$_}->{jobstepname}}
877         && !exists $proc{$_}->{killtime})
878     {
879       # kill this proc if it hasn't exited in 30 seconds
880       $proc{$_}->{killtime} = time + 30;
881     }
882   }
883 }
884
885
886 sub release_allocation
887 {
888   if ($have_slurm)
889   {
890     Log (undef, "release job allocation");
891     system "scancel $ENV{SLURM_JOBID}";
892   }
893 }
894
895
896 sub readfrompipes
897 {
898   my $gotsome = 0;
899   foreach my $job (keys %reader)
900   {
901     my $buf;
902     while (0 < sysread ($reader{$job}, $buf, 8192))
903     {
904       print STDERR $buf if $ENV{CRUNCH_DEBUG};
905       $jobstep[$job]->{stderr} .= $buf;
906       preprocess_stderr ($job);
907       if (length ($jobstep[$job]->{stderr}) > 16384)
908       {
909         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
910       }
911       $gotsome = 1;
912     }
913   }
914   return $gotsome;
915 }
916
917
918 sub preprocess_stderr
919 {
920   my $job = shift;
921
922   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
923     my $line = $1;
924     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
925     Log ($job, "stderr $line");
926     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
927       # whoa.
928       $main::please_freeze = 1;
929     }
930     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
931       $jobstep[$job]->{node_fail} = 1;
932       ban_node_by_slot($jobstep[$job]->{slotindex});
933     }
934   }
935 }
936
937
938 sub process_stderr
939 {
940   my $job = shift;
941   my $success = shift;
942   preprocess_stderr ($job);
943
944   map {
945     Log ($job, "stderr $_");
946   } split ("\n", $jobstep[$job]->{stderr});
947 }
948
949
950 sub collate_output
951 {
952   my $whc = Warehouse->new;
953   Log (undef, "collate");
954   $whc->write_start (1);
955   my $joboutput;
956   for (@jobstep)
957   {
958     next if (!exists $_->{'arvados_task'}->{output} ||
959              !$_->{'arvados_task'}->{'success'} ||
960              $_->{'exitcode'} != 0);
961     my $output = $_->{'arvados_task'}->{output};
962     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
963     {
964       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
965       $whc->write_data ($output);
966     }
967     elsif (@jobstep == 1)
968     {
969       $joboutput = $output;
970       $whc->write_finish;
971     }
972     elsif (defined (my $outblock = $whc->fetch_block ($output)))
973     {
974       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
975       $whc->write_data ($outblock);
976     }
977     else
978     {
979       my $errstr = $whc->errstr;
980       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
981       $success = 0;
982     }
983   }
984   $joboutput = $whc->write_finish if !defined $joboutput;
985   if ($joboutput)
986   {
987     Log (undef, "output $joboutput");
988     $Job->{'output'} = $joboutput;
989     $Job->save;
990   }
991   else
992   {
993     Log (undef, "output undef");
994   }
995   return $joboutput;
996 }
997
998
999 sub killem
1000 {
1001   foreach (@_)
1002   {
1003     my $sig = 2;                # SIGINT first
1004     if (exists $proc{$_}->{"sent_$sig"} &&
1005         time - $proc{$_}->{"sent_$sig"} > 4)
1006     {
1007       $sig = 15;                # SIGTERM if SIGINT doesn't work
1008     }
1009     if (exists $proc{$_}->{"sent_$sig"} &&
1010         time - $proc{$_}->{"sent_$sig"} > 4)
1011     {
1012       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1013     }
1014     if (!exists $proc{$_}->{"sent_$sig"})
1015     {
1016       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1017       kill $sig, $_;
1018       select (undef, undef, undef, 0.1);
1019       if ($sig == 2)
1020       {
1021         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1022       }
1023       $proc{$_}->{"sent_$sig"} = time;
1024       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1025     }
1026   }
1027 }
1028
1029
1030 sub fhbits
1031 {
1032   my($bits);
1033   for (@_) {
1034     vec($bits,fileno($_),1) = 1;
1035   }
1036   $bits;
1037 }
1038
1039
1040 sub Log                         # ($jobstep_id, $logmessage)
1041 {
1042   if ($_[1] =~ /\n/) {
1043     for my $line (split (/\n/, $_[1])) {
1044       Log ($_[0], $line);
1045     }
1046     return;
1047   }
1048   my $fh = select STDERR; $|=1; select $fh;
1049   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1050   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1051   $message .= "\n";
1052   my $datetime;
1053   if ($metastream || -t STDERR) {
1054     my @gmtime = gmtime;
1055     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1056                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1057   }
1058   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1059
1060   return if !$metastream;
1061   $metastream->write_data ($datetime . " " . $message);
1062 }
1063
1064
1065 sub reconnect_database
1066 {
1067   return if !$job_has_uuid;
1068   return if ($dbh && $dbh->do ("select now()"));
1069   for (1..16)
1070   {
1071     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1072     if ($dbh) {
1073       $dbh->{InactiveDestroy} = 1;
1074       return;
1075     }
1076     warn ($DBI::errstr);
1077     sleep $_;
1078   }
1079   croak ($DBI::errstr) if !$dbh;
1080 }
1081
1082
1083 sub dbh_do
1084 {
1085   return 1 if !$job_has_uuid;
1086   my $ret = $dbh->do (@_);
1087   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1088   reconnect_database();
1089   return $dbh->do (@_);
1090 }
1091
1092
1093 sub croak
1094 {
1095   my ($package, $file, $line) = caller;
1096   my $message = "@_ at $file line $line\n";
1097   Log (undef, $message);
1098   freeze() if @jobstep_todo;
1099   collate_output() if @jobstep_todo;
1100   cleanup();
1101   save_meta() if $metastream;
1102   die;
1103 }
1104
1105
1106 sub cleanup
1107 {
1108   return if !$job_has_uuid;
1109   $Job->reload;
1110   $Job->{'running'} = 0;
1111   $Job->{'success'} = 0;
1112   $Job->{'finished_at'} = time;
1113   $Job->save;
1114 }
1115
1116
1117 sub save_meta
1118 {
1119   my $justcheckpoint = shift; # false if this will be the last meta saved
1120   my $m = $metastream;
1121   $m = $m->copy if $justcheckpoint;
1122   $m->write_finish;
1123   my $loglocator = $m->as_key;
1124   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1125   Log (undef, "meta key is $loglocator");
1126   $Job->{'log'} = $loglocator;
1127   $Job->save;
1128 }
1129
1130
1131 sub freeze_if_want_freeze
1132 {
1133   if ($main::please_freeze)
1134   {
1135     release_allocation();
1136     if (@_)
1137     {
1138       # kill some srun procs before freeze+stop
1139       map { $proc{$_} = {} } @_;
1140       while (%proc)
1141       {
1142         killem (keys %proc);
1143         select (undef, undef, undef, 0.1);
1144         my $died;
1145         while (($died = waitpid (-1, WNOHANG)) > 0)
1146         {
1147           delete $proc{$died};
1148         }
1149       }
1150     }
1151     freeze();
1152     collate_output();
1153     cleanup();
1154     save_meta();
1155     exit 0;
1156   }
1157 }
1158
1159
1160 sub freeze
1161 {
1162   Log (undef, "Freeze not implemented");
1163   return;
1164
1165   my $whc;                      # todo
1166   Log (undef, "freeze");
1167
1168   my $freezer = new Warehouse::Stream (whc => $whc);
1169   $freezer->clear;
1170   $freezer->name (".");
1171   $freezer->write_start ("state.txt");
1172
1173   $freezer->write_data (join ("\n",
1174                               "job $Job->{uuid}",
1175                               map
1176                               {
1177                                 $_ . "=" . freezequote($Job->{$_})
1178                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1179
1180   foreach my $Jobstep (@jobstep)
1181   {
1182     my $str = join ("\n",
1183                     map
1184                     {
1185                       $_ . "=" . freezequote ($Jobstep->{$_})
1186                     } grep {
1187                       $_ !~ /^stderr|slotindex|node_fail/
1188                     } keys %$Jobstep);
1189     $freezer->write_data ($str."\n\n");
1190   }
1191   if (@jobstep_tomerge)
1192   {
1193     $freezer->write_data
1194         ("merge $jobstep_tomerge_level "
1195          . freezequote (join ("\n",
1196                               map { freezequote ($_) } @jobstep_tomerge))
1197          . "\n\n");
1198   }
1199
1200   $freezer->write_finish;
1201   my $frozentokey = $freezer->as_key;
1202   undef $freezer;
1203   Log (undef, "frozento key is $frozentokey");
1204   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1205           $frozentokey, $job_id);
1206   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1207   Log (undef, "frozento+K key is $kfrozentokey");
1208   return $frozentokey;
1209 }
1210
1211
1212 sub thaw
1213 {
1214   croak ("Thaw not implemented");
1215
1216   my $whc;
1217   my $key = shift;
1218   Log (undef, "thaw from $key");
1219
1220   @jobstep = ();
1221   @jobstep_done = ();
1222   @jobstep_todo = ();
1223   @jobstep_tomerge = ();
1224   $jobstep_tomerge_level = 0;
1225   my $frozenjob = {};
1226
1227   my $stream = new Warehouse::Stream ( whc => $whc,
1228                                        hash => [split (",", $key)] );
1229   $stream->rewind;
1230   while (my $dataref = $stream->read_until (undef, "\n\n"))
1231   {
1232     if ($$dataref =~ /^job /)
1233     {
1234       foreach (split ("\n", $$dataref))
1235       {
1236         my ($k, $v) = split ("=", $_, 2);
1237         $frozenjob->{$k} = freezeunquote ($v);
1238       }
1239       next;
1240     }
1241
1242     if ($$dataref =~ /^merge (\d+) (.*)/)
1243     {
1244       $jobstep_tomerge_level = $1;
1245       @jobstep_tomerge
1246           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1247       next;
1248     }
1249
1250     my $Jobstep = { };
1251     foreach (split ("\n", $$dataref))
1252     {
1253       my ($k, $v) = split ("=", $_, 2);
1254       $Jobstep->{$k} = freezeunquote ($v) if $k;
1255     }
1256     $Jobstep->{attempts} = 0;
1257     push @jobstep, $Jobstep;
1258
1259     if ($Jobstep->{exitcode} eq "0")
1260     {
1261       push @jobstep_done, $#jobstep;
1262     }
1263     else
1264     {
1265       push @jobstep_todo, $#jobstep;
1266     }
1267   }
1268
1269   foreach (qw (script script_version script_parameters))
1270   {
1271     $Job->{$_} = $frozenjob->{$_};
1272   }
1273   $Job->save;
1274 }
1275
1276
1277 sub freezequote
1278 {
1279   my $s = shift;
1280   $s =~ s/\\/\\\\/g;
1281   $s =~ s/\n/\\n/g;
1282   return $s;
1283 }
1284
1285
1286 sub freezeunquote
1287 {
1288   my $s = shift;
1289   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1290   return $s;
1291 }
1292
1293
1294 sub srun
1295 {
1296   my $srunargs = shift;
1297   my $execargs = shift;
1298   my $opts = shift || {};
1299   my $stdin = shift;
1300   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1301   print STDERR (join (" ",
1302                       map { / / ? "'$_'" : $_ }
1303                       (@$args)),
1304                 "\n")
1305       if $ENV{CRUNCH_DEBUG};
1306
1307   if (defined $stdin) {
1308     my $child = open STDIN, "-|";
1309     defined $child or die "no fork: $!";
1310     if ($child == 0) {
1311       print $stdin or die $!;
1312       close STDOUT or die $!;
1313       exit 0;
1314     }
1315   }
1316
1317   return system (@$args) if $opts->{fork};
1318
1319   exec @$args;
1320   warn "ENV size is ".length(join(" ",%ENV));
1321   die "exec failed: $!: @$args";
1322 }
1323
1324
1325 sub ban_node_by_slot {
1326   # Don't start any new jobsteps on this node for 60 seconds
1327   my $slotid = shift;
1328   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1329   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1330 }
1331
1332 __DATA__
1333 #!/usr/bin/perl
1334
1335 # checkout-and-build
1336
1337 use Fcntl ':flock';
1338
1339 my $destdir = $ENV{"CRUNCH_SRC"};
1340 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1341 my $repo = $ENV{"CRUNCH_SRC_URL"};
1342
1343 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1344 flock L, LOCK_EX;
1345 if (readlink ("$destdir.commit") eq $commit) {
1346     exit 0;
1347 }
1348
1349 open STDOUT, ">", "$destdir.log";
1350 open STDERR, ">&STDOUT";
1351
1352 if (-d "$destdir/.git") {
1353     chdir $destdir or die "chdir $destdir: $!";
1354     if (0 != system (qw(git remote set-url origin), $repo)) {
1355         # awful... for old versions of git that don't know "remote set-url"
1356         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1357     }
1358 }
1359 elsif ($repo && $commit)
1360 {
1361     shell_or_die('git', 'clone', $repo, $destdir);
1362     chdir $destdir or die "chdir $destdir: $!";
1363     shell_or_die(qw(git config clean.requireForce false));
1364 }
1365 else {
1366     die "$destdir does not exist, and no repo/commit specified -- giving up";
1367 }
1368
1369 if ($commit) {
1370     unlink "$destdir.commit";
1371     shell_or_die (qw(git stash));
1372     shell_or_die (qw(git clean -d -x));
1373     shell_or_die (qw(git fetch origin));
1374     shell_or_die (qw(git checkout), $commit);
1375 }
1376
1377 my $pwd;
1378 chomp ($pwd = `pwd`);
1379 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1380 mkdir $install_dir;
1381 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1382     # Old version
1383     shell_or_die ("./tests/autotests.sh", $install_dir);
1384 } elsif (-e "./install.sh") {
1385     shell_or_die ("./install.sh", $install_dir);
1386 }
1387
1388 if ($commit) {
1389     unlink "$destdir.commit.new";
1390     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1391     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1392 }
1393
1394 close L;
1395
1396 exit 0;
1397
1398 sub shell_or_die
1399 {
1400   if ($ENV{"DEBUG"}) {
1401     print STDERR "@_\n";
1402   }
1403   system (@_) == 0
1404       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1405 }