fix logging of script_parameters and resource_limits
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58 use IPC::System::Simple qw(capturex);
59
60 $ENV{"TMPDIR"} ||= "/tmp";
61 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
62 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
63 mkdir ($ENV{"CRUNCH_TMP"});
64
65 my $force_unlock;
66 my $jobspec;
67 my $resume_stash;
68 GetOptions('force-unlock' => \$force_unlock,
69            'job=s' => \$jobspec,
70            'resume-stash=s' => \$resume_stash,
71     );
72
73 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
74 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
75
76
77 $SIG{'HUP'} = sub
78 {
79   1;
80 };
81 $SIG{'USR1'} = sub
82 {
83   $main::ENV{CRUNCH_DEBUG} = 1;
84 };
85 $SIG{'USR2'} = sub
86 {
87   $main::ENV{CRUNCH_DEBUG} = 0;
88 };
89
90
91
92 my $arv = Arvados->new;
93 my $metastream = Warehouse::Stream->new;
94 $metastream->clear;
95 $metastream->write_start('log.txt');
96
97 my $User = {};
98 my $Job = {};
99 my $job_id;
100 my $dbh;
101 my $sth;
102 if ($job_has_uuid)
103 {
104   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
105   $User = $arv->{'users'}->{'current'}->execute;
106   if (!$force_unlock) {
107     if ($Job->{'is_locked_by'}) {
108       croak("Job is locked: " . $Job->{'is_locked_by'});
109     }
110     if ($Job->{'success'} ne undef) {
111       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
112     }
113     if ($Job->{'running'}) {
114       croak("Job 'running' flag is already set");
115     }
116     if ($Job->{'started_at'}) {
117       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
118     }
119   }
120 }
121 else
122 {
123   $Job = JSON::decode_json($jobspec);
124
125   if (!$resume_stash)
126   {
127     map { croak ("No $_ specified") unless $Job->{$_} }
128     qw(script script_version script_parameters);
129   }
130
131   if (!defined $Job->{'uuid'}) {
132     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
133   }
134 }
135 $job_id = $Job->{'uuid'};
136
137
138
139 $Job->{'resource_limits'} ||= {};
140 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
141 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
142
143
144 Log (undef, "check slurm allocation");
145 my @slot;
146 my @node;
147 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
148 my @sinfo;
149 if (!$have_slurm)
150 {
151   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
152   push @sinfo, "$localcpus localhost";
153 }
154 if (exists $ENV{SLURM_NODELIST})
155 {
156   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
157 }
158 foreach (@sinfo)
159 {
160   my ($ncpus, $slurm_nodelist) = split;
161   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
162
163   my @nodelist;
164   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
165   {
166     my $nodelist = $1;
167     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
168     {
169       my $ranges = $1;
170       foreach (split (",", $ranges))
171       {
172         my ($a, $b);
173         if (/(\d+)-(\d+)/)
174         {
175           $a = $1;
176           $b = $2;
177         }
178         else
179         {
180           $a = $_;
181           $b = $_;
182         }
183         push @nodelist, map {
184           my $n = $nodelist;
185           $n =~ s/\[[-,\d]+\]/$_/;
186           $n;
187         } ($a..$b);
188       }
189     }
190     else
191     {
192       push @nodelist, $nodelist;
193     }
194   }
195   foreach my $nodename (@nodelist)
196   {
197     Log (undef, "node $nodename - $ncpus slots");
198     my $node = { name => $nodename,
199                  ncpus => $ncpus,
200                  losing_streak => 0,
201                  hold_until => 0 };
202     foreach my $cpu (1..$ncpus)
203     {
204       push @slot, { node => $node,
205                     cpu => $cpu };
206     }
207   }
208   push @node, @nodelist;
209 }
210
211
212
213 # Ensure that we get one jobstep running on each allocated node before
214 # we start overloading nodes with concurrent steps
215
216 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
217
218
219
220 my $jobmanager_id;
221 if ($job_has_uuid)
222 {
223   # Claim this job, and make sure nobody else does
224
225   $Job->{'is_locked_by'} = $User->{'uuid'};
226   $Job->{'started_at'} = gmtime;
227   $Job->{'running'} = 1;
228   $Job->{'success'} = undef;
229   $Job->{'tasks_summary'} = { 'failed' => 0,
230                               'todo' => 1,
231                               'running' => 0,
232                               'done' => 0 };
233   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
234     croak("Error while updating / locking job");
235   }
236 }
237
238
239 Log (undef, "start");
240 $SIG{'INT'} = sub { $main::please_freeze = 1; };
241 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
242 $SIG{'TERM'} = \&croak;
243 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
244 $SIG{'ALRM'} = sub { $main::please_info = 1; };
245 $SIG{'CONT'} = sub { $main::please_continue = 1; };
246 $main::please_freeze = 0;
247 $main::please_info = 0;
248 $main::please_continue = 0;
249 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
250
251 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
252 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
253 $ENV{"JOB_UUID"} = $job_id;
254
255
256 my @jobstep;
257 my @jobstep_todo = ();
258 my @jobstep_done = ();
259 my @jobstep_tomerge = ();
260 my $jobstep_tomerge_level = 0;
261 my $squeue_checked;
262 my $squeue_kill_checked;
263 my $output_in_keep = 0;
264
265
266
267 if (defined $Job->{thawedfromkey})
268 {
269   thaw ($Job->{thawedfromkey});
270 }
271 else
272 {
273   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
274     'job_uuid' => $Job->{'uuid'},
275     'sequence' => 0,
276     'qsequence' => 0,
277     'parameters' => {},
278                                                           });
279   push @jobstep, { 'level' => 0,
280                    'attempts' => 0,
281                    'arvados_task' => $first_task,
282                  };
283   push @jobstep_todo, 0;
284 }
285
286
287 my $build_script;
288 do {
289   local $/ = undef;
290   $build_script = <DATA>;
291 };
292
293
294 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
295
296 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
297 if ($skip_install)
298 {
299   $ENV{"CRUNCH_SRC"} = $Job->{revision};
300 }
301 else
302 {
303   Log (undef, "Install revision ".$Job->{revision});
304   my $nodelist = join(",", @node);
305
306   # Clean out crunch_tmp/work and crunch_tmp/opt
307
308   my $cleanpid = fork();
309   if ($cleanpid == 0)
310   {
311     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
312           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
313     exit (1);
314   }
315   while (1)
316   {
317     last if $cleanpid == waitpid (-1, WNOHANG);
318     freeze_if_want_freeze ($cleanpid);
319     select (undef, undef, undef, 0.1);
320   }
321   Log (undef, "Clean-work-dir exited $?");
322
323   # Install requested code version
324
325   my @execargs;
326   my @srunargs = ("srun",
327                   "--nodelist=$nodelist",
328                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
329
330   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
331   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
332   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
333
334   my $commit;
335   my $treeish = $Job->{'script_version'};
336   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
337   # Todo: let script_version specify alternate repo
338   $ENV{"CRUNCH_SRC_URL"} = $repo;
339
340   # Create/update our clone of the remote git repo
341
342   if (!-d $ENV{"CRUNCH_SRC"}) {
343     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
344         or croak ("git clone $repo failed: exit ".($?>>8));
345     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
346   }
347   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
348
349   # If this looks like a subversion r#, look for it in git-svn commit messages
350
351   if ($treeish =~ m{^\d{1,4}$}) {
352     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
353     chomp $gitlog;
354     if ($gitlog =~ /^[a-f0-9]{40}$/) {
355       $commit = $gitlog;
356       Log (undef, "Using commit $commit for revision $treeish");
357     }
358   }
359
360   # If that didn't work, try asking git to look it up as a tree-ish.
361
362   if (!defined $commit) {
363
364     my $cooked_treeish = $treeish;
365     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
366       # Looks like a git branch name -- make sure git knows it's
367       # relative to the remote repo
368       $cooked_treeish = "origin/$treeish";
369     }
370
371     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
372     chomp $found;
373     if ($found =~ /^[0-9a-f]{40}$/s) {
374       $commit = $found;
375       if ($commit ne $treeish) {
376         # Make sure we record the real commit id in the database,
377         # frozentokey, logs, etc. -- instead of an abbreviation or a
378         # branch name which can become ambiguous or point to a
379         # different commit in the future.
380         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
381         Log (undef, "Using commit $commit for tree-ish $treeish");
382         if ($commit ne $treeish) {
383           $Job->{'script_version'} = $commit;
384           $Job->save() or croak("Error while updating job");
385         }
386       }
387     }
388   }
389
390   if (defined $commit) {
391     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
392     @execargs = ("sh", "-c",
393                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
394   }
395   else {
396     croak ("could not figure out commit id for $treeish");
397   }
398
399   my $installpid = fork();
400   if ($installpid == 0)
401   {
402     srun (\@srunargs, \@execargs, {}, $build_script);
403     exit (1);
404   }
405   while (1)
406   {
407     last if $installpid == waitpid (-1, WNOHANG);
408     freeze_if_want_freeze ($installpid);
409     select (undef, undef, undef, 0.1);
410   }
411   Log (undef, "Install exited $?");
412 }
413
414
415
416 foreach (qw (script script_version script_parameters resource_limits))
417 {
418   Log (undef,
419        "$_ " .
420        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
421 }
422 foreach (split (/\n/, $Job->{knobs}))
423 {
424   Log (undef, "knob " . $_);
425 }
426
427
428
429 my $success;
430
431
432
433 ONELEVEL:
434
435 my $thisround_succeeded = 0;
436 my $thisround_failed = 0;
437 my $thisround_failed_multiple = 0;
438
439 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
440                        or $a <=> $b } @jobstep_todo;
441 my $level = $jobstep[$jobstep_todo[0]]->{level};
442 Log (undef, "start level $level");
443
444
445
446 my %proc;
447 my @freeslot = (0..$#slot);
448 my @holdslot;
449 my %reader;
450 my $progress_is_dirty = 1;
451 my $progress_stats_updated = 0;
452
453 update_progress_stats();
454
455
456
457 THISROUND:
458 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
459 {
460   $main::please_continue = 0;
461
462   my $id = $jobstep_todo[$todo_ptr];
463   my $Jobstep = $jobstep[$id];
464   if ($Jobstep->{level} != $level)
465   {
466     next;
467   }
468   if ($Jobstep->{attempts} > 9)
469   {
470     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
471     $success = 0;
472     last THISROUND;
473   }
474
475   pipe $reader{$id}, "writer" or croak ($!);
476   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
477   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
478
479   my $childslot = $freeslot[0];
480   my $childnode = $slot[$childslot]->{node};
481   my $childslotname = join (".",
482                             $slot[$childslot]->{node}->{name},
483                             $slot[$childslot]->{cpu});
484   my $childpid = fork();
485   if ($childpid == 0)
486   {
487     $SIG{'INT'} = 'DEFAULT';
488     $SIG{'QUIT'} = 'DEFAULT';
489     $SIG{'TERM'} = 'DEFAULT';
490
491     foreach (values (%reader))
492     {
493       close($_);
494     }
495     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
496     open(STDOUT,">&writer");
497     open(STDERR,">&writer");
498
499     undef $dbh;
500     undef $sth;
501
502     delete $ENV{"GNUPGHOME"};
503     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
504     $ENV{"TASK_QSEQUENCE"} = $id;
505     $ENV{"TASK_SEQUENCE"} = $level;
506     $ENV{"JOB_SCRIPT"} = $Job->{script};
507     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
508       $param =~ tr/a-z/A-Z/;
509       $ENV{"JOB_PARAMETER_$param"} = $value;
510     }
511     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
512     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
513     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
514     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
515
516     $ENV{"GZIP"} = "-n";
517
518     my @srunargs = (
519       "srun",
520       "--nodelist=".$childnode->{name},
521       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
522       "--job-name=$job_id.$id.$$",
523         );
524     my @execargs = qw(sh);
525     my $build_script_to_send = "";
526     my $command =
527         "mkdir -p $ENV{CRUNCH_TMP}/revision "
528         ."&& cd $ENV{CRUNCH_TMP} ";
529     if ($build_script)
530     {
531       $build_script_to_send = $build_script;
532       $command .=
533           "&& perl -";
534     }
535     elsif (!$skip_install)
536     {
537       $command .=
538           "&& "
539           ."( "
540           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
541           ."|| "
542           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
543           ."    && ./installrevision "
544           ."  ) "
545           .") ";
546     }
547     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
548     $command .=
549         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
550     my @execargs = ('bash', '-c', $command);
551     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
552     exit (1);
553   }
554   close("writer");
555   if (!defined $childpid)
556   {
557     close $reader{$id};
558     delete $reader{$id};
559     next;
560   }
561   shift @freeslot;
562   $proc{$childpid} = { jobstep => $id,
563                        time => time,
564                        slot => $childslot,
565                        jobstepname => "$job_id.$id.$childpid",
566                      };
567   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
568   $slot[$childslot]->{pid} = $childpid;
569
570   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
571   Log ($id, "child $childpid started on $childslotname");
572   $Jobstep->{attempts} ++;
573   $Jobstep->{starttime} = time;
574   $Jobstep->{node} = $childnode->{name};
575   $Jobstep->{slotindex} = $childslot;
576   delete $Jobstep->{stderr};
577   delete $Jobstep->{finishtime};
578
579   splice @jobstep_todo, $todo_ptr, 1;
580   --$todo_ptr;
581
582   $progress_is_dirty = 1;
583
584   while (!@freeslot
585          ||
586          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
587   {
588     last THISROUND if $main::please_freeze;
589     if ($main::please_info)
590     {
591       $main::please_info = 0;
592       freeze();
593       collate_output();
594       save_meta(1);
595       update_progress_stats();
596     }
597     my $gotsome
598         = readfrompipes ()
599         + reapchildren ();
600     if (!$gotsome)
601     {
602       check_squeue();
603       update_progress_stats();
604       select (undef, undef, undef, 0.1);
605     }
606     elsif (time - $progress_stats_updated >= 30)
607     {
608       update_progress_stats();
609     }
610     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
611         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
612     {
613       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
614           .($thisround_failed+$thisround_succeeded)
615           .") -- giving up on this round";
616       Log (undef, $message);
617       last THISROUND;
618     }
619
620     # move slots from freeslot to holdslot (or back to freeslot) if necessary
621     for (my $i=$#freeslot; $i>=0; $i--) {
622       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
623         push @holdslot, (splice @freeslot, $i, 1);
624       }
625     }
626     for (my $i=$#holdslot; $i>=0; $i--) {
627       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
628         push @freeslot, (splice @holdslot, $i, 1);
629       }
630     }
631
632     # give up if no nodes are succeeding
633     if (!grep { $_->{node}->{losing_streak} == 0 &&
634                     $_->{node}->{hold_count} < 4 } @slot) {
635       my $message = "Every node has failed -- giving up on this round";
636       Log (undef, $message);
637       last THISROUND;
638     }
639   }
640 }
641
642
643 push @freeslot, splice @holdslot;
644 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
645
646
647 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
648 while (%proc)
649 {
650   goto THISROUND if $main::please_continue;
651   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
652   readfrompipes ();
653   if (!reapchildren())
654   {
655     check_squeue();
656     update_progress_stats();
657     select (undef, undef, undef, 0.1);
658     killem (keys %proc) if $main::please_freeze;
659   }
660 }
661
662 update_progress_stats();
663 freeze_if_want_freeze();
664
665
666 if (!defined $success)
667 {
668   if (@jobstep_todo &&
669       $thisround_succeeded == 0 &&
670       ($thisround_failed == 0 || $thisround_failed > 4))
671   {
672     my $message = "stop because $thisround_failed tasks failed and none succeeded";
673     Log (undef, $message);
674     $success = 0;
675   }
676   if (!@jobstep_todo)
677   {
678     $success = 1;
679   }
680 }
681
682 goto ONELEVEL if !defined $success;
683
684
685 release_allocation();
686 freeze();
687 $Job->{'output'} = &collate_output();
688 $Job->{'success'} = $Job->{'output'} && $success;
689 $Job->save;
690
691 if ($Job->{'output'})
692 {
693   eval {
694     my $manifest_text = capturex("whget", $Job->{'output'});
695     $arv->{'collections'}->{'create'}->execute('collection' => {
696       'uuid' => $Job->{'output'},
697       'manifest_text' => $manifest_text,
698     });
699   };
700   if ($@) {
701     Log (undef, "Failed to register output manifest: $@");
702   }
703 }
704
705 Log (undef, "finish");
706
707 save_meta();
708 exit 0;
709
710
711
712 sub update_progress_stats
713 {
714   $progress_stats_updated = time;
715   return if !$progress_is_dirty;
716   my ($todo, $done, $running) = (scalar @jobstep_todo,
717                                  scalar @jobstep_done,
718                                  scalar @slot - scalar @freeslot - scalar @holdslot);
719   $Job->{'tasks_summary'} ||= {};
720   $Job->{'tasks_summary'}->{'todo'} = $todo;
721   $Job->{'tasks_summary'}->{'done'} = $done;
722   $Job->{'tasks_summary'}->{'running'} = $running;
723   $Job->save;
724   Log (undef, "status: $done done, $running running, $todo todo");
725   $progress_is_dirty = 0;
726 }
727
728
729
730 sub reapchildren
731 {
732   my $pid = waitpid (-1, WNOHANG);
733   return 0 if $pid <= 0;
734
735   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
736                   . "."
737                   . $slot[$proc{$pid}->{slot}]->{cpu});
738   my $jobstepid = $proc{$pid}->{jobstep};
739   my $elapsed = time - $proc{$pid}->{time};
740   my $Jobstep = $jobstep[$jobstepid];
741
742   my $exitcode = $?;
743   my $exitinfo = "exit $exitcode";
744   $Jobstep->{'arvados_task'}->reload;
745   my $success = $Jobstep->{'arvados_task'}->{success};
746
747   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
748
749   if (!defined $success) {
750     # task did not indicate one way or the other --> fail
751     $Jobstep->{'arvados_task'}->{success} = 0;
752     $Jobstep->{'arvados_task'}->save;
753     $success = 0;
754   }
755
756   if (!$success)
757   {
758     my $no_incr_attempts;
759     $no_incr_attempts = 1 if $Jobstep->{node_fail};
760
761     ++$thisround_failed;
762     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
763
764     # Check for signs of a failed or misconfigured node
765     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
766         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
767       # Don't count this against jobstep failure thresholds if this
768       # node is already suspected faulty and srun exited quickly
769       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
770           $elapsed < 5 &&
771           $Jobstep->{attempts} > 1) {
772         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
773         $no_incr_attempts = 1;
774         --$Jobstep->{attempts};
775       }
776       ban_node_by_slot($proc{$pid}->{slot});
777     }
778
779     push @jobstep_todo, $jobstepid;
780     Log ($jobstepid, "failure in $elapsed seconds");
781
782     --$Jobstep->{attempts} if $no_incr_attempts;
783     $Job->{'tasks_summary'}->{'failed'}++;
784   }
785   else
786   {
787     ++$thisround_succeeded;
788     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
789     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
790     push @jobstep_done, $jobstepid;
791     Log ($jobstepid, "success in $elapsed seconds");
792   }
793   $Jobstep->{exitcode} = $exitcode;
794   $Jobstep->{finishtime} = time;
795   process_stderr ($jobstepid, $success);
796   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
797
798   close $reader{$jobstepid};
799   delete $reader{$jobstepid};
800   delete $slot[$proc{$pid}->{slot}]->{pid};
801   push @freeslot, $proc{$pid}->{slot};
802   delete $proc{$pid};
803
804   # Load new tasks
805   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
806     'where' => {
807       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
808     },
809     'order' => 'qsequence'
810   );
811   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
812     my $jobstep = {
813       'level' => $arvados_task->{'sequence'},
814       'attempts' => 0,
815       'arvados_task' => $arvados_task
816     };
817     push @jobstep, $jobstep;
818     push @jobstep_todo, $#jobstep;
819   }
820
821   $progress_is_dirty = 1;
822   1;
823 }
824
825
826 sub check_squeue
827 {
828   # return if the kill list was checked <4 seconds ago
829   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
830   {
831     return;
832   }
833   $squeue_kill_checked = time;
834
835   # use killem() on procs whose killtime is reached
836   for (keys %proc)
837   {
838     if (exists $proc{$_}->{killtime}
839         && $proc{$_}->{killtime} <= time)
840     {
841       killem ($_);
842     }
843   }
844
845   # return if the squeue was checked <60 seconds ago
846   if (defined $squeue_checked && $squeue_checked > time - 60)
847   {
848     return;
849   }
850   $squeue_checked = time;
851
852   if (!$have_slurm)
853   {
854     # here is an opportunity to check for mysterious problems with local procs
855     return;
856   }
857
858   # get a list of steps still running
859   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
860   chop @squeue;
861   if ($squeue[-1] ne "ok")
862   {
863     return;
864   }
865   pop @squeue;
866
867   # which of my jobsteps are running, according to squeue?
868   my %ok;
869   foreach (@squeue)
870   {
871     if (/^(\d+)\.(\d+) (\S+)/)
872     {
873       if ($1 eq $ENV{SLURM_JOBID})
874       {
875         $ok{$3} = 1;
876       }
877     }
878   }
879
880   # which of my active child procs (>60s old) were not mentioned by squeue?
881   foreach (keys %proc)
882   {
883     if ($proc{$_}->{time} < time - 60
884         && !exists $ok{$proc{$_}->{jobstepname}}
885         && !exists $proc{$_}->{killtime})
886     {
887       # kill this proc if it hasn't exited in 30 seconds
888       $proc{$_}->{killtime} = time + 30;
889     }
890   }
891 }
892
893
894 sub release_allocation
895 {
896   if ($have_slurm)
897   {
898     Log (undef, "release job allocation");
899     system "scancel $ENV{SLURM_JOBID}";
900   }
901 }
902
903
904 sub readfrompipes
905 {
906   my $gotsome = 0;
907   foreach my $job (keys %reader)
908   {
909     my $buf;
910     while (0 < sysread ($reader{$job}, $buf, 8192))
911     {
912       print STDERR $buf if $ENV{CRUNCH_DEBUG};
913       $jobstep[$job]->{stderr} .= $buf;
914       preprocess_stderr ($job);
915       if (length ($jobstep[$job]->{stderr}) > 16384)
916       {
917         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
918       }
919       $gotsome = 1;
920     }
921   }
922   return $gotsome;
923 }
924
925
926 sub preprocess_stderr
927 {
928   my $job = shift;
929
930   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
931     my $line = $1;
932     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
933     Log ($job, "stderr $line");
934     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
935       # whoa.
936       $main::please_freeze = 1;
937     }
938     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
939       $jobstep[$job]->{node_fail} = 1;
940       ban_node_by_slot($jobstep[$job]->{slotindex});
941     }
942   }
943 }
944
945
946 sub process_stderr
947 {
948   my $job = shift;
949   my $success = shift;
950   preprocess_stderr ($job);
951
952   map {
953     Log ($job, "stderr $_");
954   } split ("\n", $jobstep[$job]->{stderr});
955 }
956
957
958 sub collate_output
959 {
960   my $whc = Warehouse->new;
961   Log (undef, "collate");
962   $whc->write_start (1);
963   my $joboutput;
964   for (@jobstep)
965   {
966     next if (!exists $_->{'arvados_task'}->{output} ||
967              !$_->{'arvados_task'}->{'success'} ||
968              $_->{'exitcode'} != 0);
969     my $output = $_->{'arvados_task'}->{output};
970     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
971     {
972       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
973       $whc->write_data ($output);
974     }
975     elsif (@jobstep == 1)
976     {
977       $joboutput = $output;
978       $whc->write_finish;
979     }
980     elsif (defined (my $outblock = $whc->fetch_block ($output)))
981     {
982       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
983       $whc->write_data ($outblock);
984     }
985     else
986     {
987       my $errstr = $whc->errstr;
988       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
989       $success = 0;
990     }
991   }
992   $joboutput = $whc->write_finish if !defined $joboutput;
993   if ($joboutput)
994   {
995     Log (undef, "output $joboutput");
996     $Job->{'output'} = $joboutput;
997     $Job->save;
998   }
999   else
1000   {
1001     Log (undef, "output undef");
1002   }
1003   return $joboutput;
1004 }
1005
1006
1007 sub killem
1008 {
1009   foreach (@_)
1010   {
1011     my $sig = 2;                # SIGINT first
1012     if (exists $proc{$_}->{"sent_$sig"} &&
1013         time - $proc{$_}->{"sent_$sig"} > 4)
1014     {
1015       $sig = 15;                # SIGTERM if SIGINT doesn't work
1016     }
1017     if (exists $proc{$_}->{"sent_$sig"} &&
1018         time - $proc{$_}->{"sent_$sig"} > 4)
1019     {
1020       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1021     }
1022     if (!exists $proc{$_}->{"sent_$sig"})
1023     {
1024       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1025       kill $sig, $_;
1026       select (undef, undef, undef, 0.1);
1027       if ($sig == 2)
1028       {
1029         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1030       }
1031       $proc{$_}->{"sent_$sig"} = time;
1032       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1033     }
1034   }
1035 }
1036
1037
1038 sub fhbits
1039 {
1040   my($bits);
1041   for (@_) {
1042     vec($bits,fileno($_),1) = 1;
1043   }
1044   $bits;
1045 }
1046
1047
1048 sub Log                         # ($jobstep_id, $logmessage)
1049 {
1050   if ($_[1] =~ /\n/) {
1051     for my $line (split (/\n/, $_[1])) {
1052       Log ($_[0], $line);
1053     }
1054     return;
1055   }
1056   my $fh = select STDERR; $|=1; select $fh;
1057   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1058   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1059   $message .= "\n";
1060   my $datetime;
1061   if ($metastream || -t STDERR) {
1062     my @gmtime = gmtime;
1063     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1064                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1065   }
1066   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1067
1068   return if !$metastream;
1069   $metastream->write_data ($datetime . " " . $message);
1070 }
1071
1072
1073 sub reconnect_database
1074 {
1075   return if !$job_has_uuid;
1076   return if ($dbh && $dbh->do ("select now()"));
1077   for (1..16)
1078   {
1079     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1080     if ($dbh) {
1081       $dbh->{InactiveDestroy} = 1;
1082       return;
1083     }
1084     warn ($DBI::errstr);
1085     sleep $_;
1086   }
1087   croak ($DBI::errstr) if !$dbh;
1088 }
1089
1090
1091 sub dbh_do
1092 {
1093   return 1 if !$job_has_uuid;
1094   my $ret = $dbh->do (@_);
1095   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1096   reconnect_database();
1097   return $dbh->do (@_);
1098 }
1099
1100
1101 sub croak
1102 {
1103   my ($package, $file, $line) = caller;
1104   my $message = "@_ at $file line $line\n";
1105   Log (undef, $message);
1106   freeze() if @jobstep_todo;
1107   collate_output() if @jobstep_todo;
1108   cleanup();
1109   save_meta() if $metastream;
1110   die;
1111 }
1112
1113
1114 sub cleanup
1115 {
1116   return if !$job_has_uuid;
1117   $Job->reload;
1118   $Job->{'running'} = 0;
1119   $Job->{'success'} = 0;
1120   $Job->{'finished_at'} = gmtime;
1121   $Job->save;
1122 }
1123
1124
1125 sub save_meta
1126 {
1127   my $justcheckpoint = shift; # false if this will be the last meta saved
1128   my $m = $metastream;
1129   $m = $m->copy if $justcheckpoint;
1130   $m->write_finish;
1131   my $loglocator = $m->as_key;
1132   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1133   Log (undef, "meta key is $loglocator");
1134   $Job->{'log'} = $loglocator;
1135   $Job->save;
1136 }
1137
1138
1139 sub freeze_if_want_freeze
1140 {
1141   if ($main::please_freeze)
1142   {
1143     release_allocation();
1144     if (@_)
1145     {
1146       # kill some srun procs before freeze+stop
1147       map { $proc{$_} = {} } @_;
1148       while (%proc)
1149       {
1150         killem (keys %proc);
1151         select (undef, undef, undef, 0.1);
1152         my $died;
1153         while (($died = waitpid (-1, WNOHANG)) > 0)
1154         {
1155           delete $proc{$died};
1156         }
1157       }
1158     }
1159     freeze();
1160     collate_output();
1161     cleanup();
1162     save_meta();
1163     exit 0;
1164   }
1165 }
1166
1167
1168 sub freeze
1169 {
1170   Log (undef, "Freeze not implemented");
1171   return;
1172
1173   my $whc;                      # todo
1174   Log (undef, "freeze");
1175
1176   my $freezer = new Warehouse::Stream (whc => $whc);
1177   $freezer->clear;
1178   $freezer->name (".");
1179   $freezer->write_start ("state.txt");
1180
1181   $freezer->write_data (join ("\n",
1182                               "job $Job->{uuid}",
1183                               map
1184                               {
1185                                 $_ . "=" . freezequote($Job->{$_})
1186                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1187
1188   foreach my $Jobstep (@jobstep)
1189   {
1190     my $str = join ("\n",
1191                     map
1192                     {
1193                       $_ . "=" . freezequote ($Jobstep->{$_})
1194                     } grep {
1195                       $_ !~ /^stderr|slotindex|node_fail/
1196                     } keys %$Jobstep);
1197     $freezer->write_data ($str."\n\n");
1198   }
1199   if (@jobstep_tomerge)
1200   {
1201     $freezer->write_data
1202         ("merge $jobstep_tomerge_level "
1203          . freezequote (join ("\n",
1204                               map { freezequote ($_) } @jobstep_tomerge))
1205          . "\n\n");
1206   }
1207
1208   $freezer->write_finish;
1209   my $frozentokey = $freezer->as_key;
1210   undef $freezer;
1211   Log (undef, "frozento key is $frozentokey");
1212   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1213           $frozentokey, $job_id);
1214   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1215   Log (undef, "frozento+K key is $kfrozentokey");
1216   return $frozentokey;
1217 }
1218
1219
1220 sub thaw
1221 {
1222   croak ("Thaw not implemented");
1223
1224   my $whc;
1225   my $key = shift;
1226   Log (undef, "thaw from $key");
1227
1228   @jobstep = ();
1229   @jobstep_done = ();
1230   @jobstep_todo = ();
1231   @jobstep_tomerge = ();
1232   $jobstep_tomerge_level = 0;
1233   my $frozenjob = {};
1234
1235   my $stream = new Warehouse::Stream ( whc => $whc,
1236                                        hash => [split (",", $key)] );
1237   $stream->rewind;
1238   while (my $dataref = $stream->read_until (undef, "\n\n"))
1239   {
1240     if ($$dataref =~ /^job /)
1241     {
1242       foreach (split ("\n", $$dataref))
1243       {
1244         my ($k, $v) = split ("=", $_, 2);
1245         $frozenjob->{$k} = freezeunquote ($v);
1246       }
1247       next;
1248     }
1249
1250     if ($$dataref =~ /^merge (\d+) (.*)/)
1251     {
1252       $jobstep_tomerge_level = $1;
1253       @jobstep_tomerge
1254           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1255       next;
1256     }
1257
1258     my $Jobstep = { };
1259     foreach (split ("\n", $$dataref))
1260     {
1261       my ($k, $v) = split ("=", $_, 2);
1262       $Jobstep->{$k} = freezeunquote ($v) if $k;
1263     }
1264     $Jobstep->{attempts} = 0;
1265     push @jobstep, $Jobstep;
1266
1267     if ($Jobstep->{exitcode} eq "0")
1268     {
1269       push @jobstep_done, $#jobstep;
1270     }
1271     else
1272     {
1273       push @jobstep_todo, $#jobstep;
1274     }
1275   }
1276
1277   foreach (qw (script script_version script_parameters))
1278   {
1279     $Job->{$_} = $frozenjob->{$_};
1280   }
1281   $Job->save;
1282 }
1283
1284
1285 sub freezequote
1286 {
1287   my $s = shift;
1288   $s =~ s/\\/\\\\/g;
1289   $s =~ s/\n/\\n/g;
1290   return $s;
1291 }
1292
1293
1294 sub freezeunquote
1295 {
1296   my $s = shift;
1297   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1298   return $s;
1299 }
1300
1301
1302 sub srun
1303 {
1304   my $srunargs = shift;
1305   my $execargs = shift;
1306   my $opts = shift || {};
1307   my $stdin = shift;
1308   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1309   print STDERR (join (" ",
1310                       map { / / ? "'$_'" : $_ }
1311                       (@$args)),
1312                 "\n")
1313       if $ENV{CRUNCH_DEBUG};
1314
1315   if (defined $stdin) {
1316     my $child = open STDIN, "-|";
1317     defined $child or die "no fork: $!";
1318     if ($child == 0) {
1319       print $stdin or die $!;
1320       close STDOUT or die $!;
1321       exit 0;
1322     }
1323   }
1324
1325   return system (@$args) if $opts->{fork};
1326
1327   exec @$args;
1328   warn "ENV size is ".length(join(" ",%ENV));
1329   die "exec failed: $!: @$args";
1330 }
1331
1332
1333 sub ban_node_by_slot {
1334   # Don't start any new jobsteps on this node for 60 seconds
1335   my $slotid = shift;
1336   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1337   $slot[$slotid]->{node}->{hold_count}++;
1338   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1339 }
1340
1341 __DATA__
1342 #!/usr/bin/perl
1343
1344 # checkout-and-build
1345
1346 use Fcntl ':flock';
1347
1348 my $destdir = $ENV{"CRUNCH_SRC"};
1349 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1350 my $repo = $ENV{"CRUNCH_SRC_URL"};
1351
1352 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1353 flock L, LOCK_EX;
1354 if (readlink ("$destdir.commit") eq $commit) {
1355     exit 0;
1356 }
1357
1358 open STDOUT, ">", "$destdir.log";
1359 open STDERR, ">&STDOUT";
1360
1361 if (-d "$destdir/.git") {
1362     chdir $destdir or die "chdir $destdir: $!";
1363     if (0 != system (qw(git remote set-url origin), $repo)) {
1364         # awful... for old versions of git that don't know "remote set-url"
1365         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1366     }
1367 }
1368 elsif ($repo && $commit)
1369 {
1370     shell_or_die('git', 'clone', $repo, $destdir);
1371     chdir $destdir or die "chdir $destdir: $!";
1372     shell_or_die(qw(git config clean.requireForce false));
1373 }
1374 else {
1375     die "$destdir does not exist, and no repo/commit specified -- giving up";
1376 }
1377
1378 if ($commit) {
1379     unlink "$destdir.commit";
1380     shell_or_die (qw(git stash));
1381     shell_or_die (qw(git clean -d -x));
1382     shell_or_die (qw(git fetch origin));
1383     shell_or_die (qw(git checkout), $commit);
1384 }
1385
1386 my $pwd;
1387 chomp ($pwd = `pwd`);
1388 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1389 mkdir $install_dir;
1390 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1391     # Old version
1392     shell_or_die ("./tests/autotests.sh", $install_dir);
1393 } elsif (-e "./install.sh") {
1394     shell_or_die ("./install.sh", $install_dir);
1395 }
1396
1397 if ($commit) {
1398     unlink "$destdir.commit.new";
1399     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1400     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1401 }
1402
1403 close L;
1404
1405 exit 0;
1406
1407 sub shell_or_die
1408 {
1409   if ($ENV{"DEBUG"}) {
1410     print STDERR "@_\n";
1411   }
1412   system (@_) == 0
1413       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1414 }