fix up quoting
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --uuid x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 RUNNING JOBS LOCALLY
21
22 crunch-job's log messages appear on stderr along with the job tasks'
23 stderr streams. The log is saved in Keep at each checkpoint and when
24 the job finishes.
25
26 If the job succeeds, the job's output locator is printed on stdout.
27
28 While the job is running, the following signals are accepted:
29
30 =over
31
32 =item control-C, SIGINT, SIGQUIT
33
34 Save a checkpoint, terminate any job tasks that are running, and stop.
35
36 =item SIGALRM
37
38 Save a checkpoint and continue.
39
40 =item SIGHUP
41
42 Refresh node allocation (i.e., check whether any nodes have been added
43 or unallocated). Currently this is a no-op.
44
45 =back
46
47 =cut
48
49
50 use strict;
51 use DBI;
52 use POSIX ':sys_wait_h';
53 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
54 use Arvados;
55 use Getopt::Long;
56 use Warehouse;
57 use Warehouse::Stream;
58
59 $ENV{"TMPDIR"} ||= "/tmp";
60 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
61 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
62 mkdir ($ENV{"CRUNCH_TMP"});
63
64 my $force_unlock;
65 my $jobspec;
66 my $resume_stash;
67 GetOptions('force-unlock' => \$force_unlock,
68            'job=s' => \$jobspec,
69            'resume-stash=s' => \$resume_stash,
70     );
71
72 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
73 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
74
75
76 $SIG{'HUP'} = sub
77 {
78   1;
79 };
80 $SIG{'USR1'} = sub
81 {
82   $main::ENV{CRUNCH_DEBUG} = 1;
83 };
84 $SIG{'USR2'} = sub
85 {
86   $main::ENV{CRUNCH_DEBUG} = 0;
87 };
88
89
90
91 my $arv = Arvados->new;
92 my $metastream = Warehouse::Stream->new;
93 $metastream->clear;
94 $metastream->write_start('log.txt');
95
96 my $User = {};
97 my $Job = {};
98 my $job_id;
99 my $dbh;
100 my $sth;
101 if ($job_has_uuid)
102 {
103   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
104   $User = $arv->{'users'}->{'current'}->execute;
105   if (!$force_unlock) {
106     if ($Job->{'is_locked_by'}) {
107       croak("Job is locked: " . $Job->{'is_locked_by'});
108     }
109     if ($Job->{'success'} ne undef) {
110       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
111     }
112     if ($Job->{'running'}) {
113       croak("Job 'running' flag is already set");
114     }
115     if ($Job->{'started_at'}) {
116       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
117     }
118   }
119 }
120 else
121 {
122   $Job = JSON::decode_json($jobspec);
123
124   if (!$resume_stash)
125   {
126     map { croak ("No $_ specified") unless $Job->{$_} }
127     qw(script script_version script_parameters);
128   }
129
130   if (!defined $Job->{'uuid'}) {
131     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
132   }
133 }
134 $job_id = $Job->{'uuid'};
135
136
137
138 $Job->{'resource_limits'} ||= {};
139 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
140 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
141
142
143 Log (undef, "check slurm allocation");
144 my @slot;
145 my @node;
146 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
147 my @sinfo;
148 if (!$have_slurm)
149 {
150   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
151   push @sinfo, "$localcpus localhost";
152 }
153 if (exists $ENV{SLURM_NODELIST})
154 {
155   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
156 }
157 foreach (@sinfo)
158 {
159   my ($ncpus, $slurm_nodelist) = split;
160   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
161
162   my @nodelist;
163   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
164   {
165     my $nodelist = $1;
166     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
167     {
168       my $ranges = $1;
169       foreach (split (",", $ranges))
170       {
171         my ($a, $b);
172         if (/(\d+)-(\d+)/)
173         {
174           $a = $1;
175           $b = $2;
176         }
177         else
178         {
179           $a = $_;
180           $b = $_;
181         }
182         push @nodelist, map {
183           my $n = $nodelist;
184           $n =~ s/\[[-,\d]+\]/$_/;
185           $n;
186         } ($a..$b);
187       }
188     }
189     else
190     {
191       push @nodelist, $nodelist;
192     }
193   }
194   foreach my $nodename (@nodelist)
195   {
196     Log (undef, "node $nodename - $ncpus slots");
197     my $node = { name => $nodename,
198                  ncpus => $ncpus,
199                  losing_streak => 0,
200                  hold_until => 0 };
201     foreach my $cpu (1..$ncpus)
202     {
203       push @slot, { node => $node,
204                     cpu => $cpu };
205     }
206   }
207   push @node, @nodelist;
208 }
209
210
211
212 # Ensure that we get one jobstep running on each allocated node before
213 # we start overloading nodes with concurrent steps
214
215 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
216
217
218
219 my $jobmanager_id;
220 if ($job_has_uuid)
221 {
222   # Claim this job, and make sure nobody else does
223
224   $Job->{'is_locked_by'} = $User->{'uuid'};
225   $Job->{'started_at'} = time;
226   $Job->{'running'} = 1;
227   $Job->{'success'} = undef;
228   $Job->{'tasks_summary'} = { 'failed' => 0,
229                               'todo' => 1,
230                               'running' => 0,
231                               'done' => 0 };
232   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
233     croak("Error while updating / locking job");
234   }
235 }
236
237
238 Log (undef, "start");
239 $SIG{'INT'} = sub { $main::please_freeze = 1; };
240 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
241 $SIG{'TERM'} = \&croak;
242 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
243 $SIG{'ALRM'} = sub { $main::please_info = 1; };
244 $SIG{'CONT'} = sub { $main::please_continue = 1; };
245 $main::please_freeze = 0;
246 $main::please_info = 0;
247 $main::please_continue = 0;
248 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
249
250 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
251 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
252 $ENV{"JOB_UUID"} = $job_id;
253
254
255 my @jobstep;
256 my @jobstep_todo = ();
257 my @jobstep_done = ();
258 my @jobstep_tomerge = ();
259 my $jobstep_tomerge_level = 0;
260 my $squeue_checked;
261 my $squeue_kill_checked;
262 my $output_in_keep = 0;
263
264
265
266 if (defined $Job->{thawedfromkey})
267 {
268   thaw ($Job->{thawedfromkey});
269 }
270 else
271 {
272   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
273     'job_uuid' => $Job->{'uuid'},
274     'sequence' => 0,
275     'qsequence' => 0,
276     'parameters' => {},
277                                                           });
278   push @jobstep, { 'level' => 0,
279                    'attempts' => 0,
280                    'arvados_task' => $first_task,
281                  };
282   push @jobstep_todo, 0;
283 }
284
285
286 my $build_script;
287 do {
288   local $/ = undef;
289   $build_script = <DATA>;
290 };
291
292
293 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
294
295 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
296 if ($skip_install)
297 {
298   $ENV{"CRUNCH_SRC"} = $Job->{revision};
299 }
300 else
301 {
302   Log (undef, "Install revision ".$Job->{revision});
303   my $nodelist = join(",", @node);
304
305   # Clean out crunch_tmp/work and crunch_tmp/opt
306
307   my $cleanpid = fork();
308   if ($cleanpid == 0)
309   {
310     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
311           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
312     exit (1);
313   }
314   while (1)
315   {
316     last if $cleanpid == waitpid (-1, WNOHANG);
317     freeze_if_want_freeze ($cleanpid);
318     select (undef, undef, undef, 0.1);
319   }
320   Log (undef, "Clean-work-dir exited $?");
321
322   # Install requested code version
323
324   my @execargs;
325   my @srunargs = ("srun",
326                   "--nodelist=$nodelist",
327                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
328
329   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
331   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
332
333   my $commit;
334   my $treeish = $Job->{'script_version'};
335   my $repo = $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
336   # Todo: let script_version specify alternate repo
337   $ENV{"CRUNCH_SRC_URL"} = $repo;
338
339   # Create/update our clone of the remote git repo
340
341   if (!-d $ENV{"CRUNCH_SRC"}) {
342     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
343         or croak ("git clone $repo failed: exit ".($?>>8));
344     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
345   }
346   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
347
348   # If this looks like a subversion r#, look for it in git-svn commit messages
349
350   if ($treeish =~ m{^\d{1,4}$}) {
351     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
352     chomp $gitlog;
353     if ($gitlog =~ /^[a-f0-9]{40}$/) {
354       $commit = $gitlog;
355       Log (undef, "Using commit $commit for revision $treeish");
356     }
357   }
358
359   # If that didn't work, try asking git to look it up as a tree-ish.
360
361   if (!defined $commit) {
362
363     my $cooked_treeish = $treeish;
364     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
365       # Looks like a git branch name -- make sure git knows it's
366       # relative to the remote repo
367       $cooked_treeish = "origin/$treeish";
368     }
369
370     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
371     chomp $found;
372     if ($found =~ /^[0-9a-f]{40}$/s) {
373       $commit = $found;
374       if ($commit ne $treeish) {
375         # Make sure we record the real commit id in the database,
376         # frozentokey, logs, etc. -- instead of an abbreviation or a
377         # branch name which can become ambiguous or point to a
378         # different commit in the future.
379         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
380         Log (undef, "Using commit $commit for tree-ish $treeish");
381         if ($commit ne $treeish) {
382           $Job->{'script_version'} = $commit;
383           $Job->save() or croak("Error while updating job");
384         }
385       }
386     }
387   }
388
389   if (defined $commit) {
390     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
391     @execargs = ("sh", "-c",
392                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
393   }
394   else {
395     croak ("could not figure out commit id for $treeish");
396   }
397
398   my $installpid = fork();
399   if ($installpid == 0)
400   {
401     srun (\@srunargs, \@execargs, {}, $build_script);
402     exit (1);
403   }
404   while (1)
405   {
406     last if $installpid == waitpid (-1, WNOHANG);
407     freeze_if_want_freeze ($installpid);
408     select (undef, undef, undef, 0.1);
409   }
410   Log (undef, "Install exited $?");
411 }
412
413
414
415 foreach (qw (script script_version script_parameters resource_limits))
416 {
417   Log (undef, $_ . " " . $Job->{$_});
418 }
419 foreach (split (/\n/, $Job->{knobs}))
420 {
421   Log (undef, "knob " . $_);
422 }
423
424
425
426 my $success;
427
428
429
430 ONELEVEL:
431
432 my $thisround_succeeded = 0;
433 my $thisround_failed = 0;
434 my $thisround_failed_multiple = 0;
435
436 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
437                        or $a <=> $b } @jobstep_todo;
438 my $level = $jobstep[$jobstep_todo[0]]->{level};
439 Log (undef, "start level $level");
440
441
442
443 my %proc;
444 my @freeslot = (0..$#slot);
445 my @holdslot;
446 my %reader;
447 my $progress_is_dirty = 1;
448 my $progress_stats_updated = 0;
449
450 update_progress_stats();
451
452
453
454 THISROUND:
455 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
456 {
457   $main::please_continue = 0;
458
459   my $id = $jobstep_todo[$todo_ptr];
460   my $Jobstep = $jobstep[$id];
461   if ($Jobstep->{level} != $level)
462   {
463     next;
464   }
465   if ($Jobstep->{attempts} > 9)
466   {
467     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
468     $success = 0;
469     last THISROUND;
470   }
471
472   pipe $reader{$id}, "writer" or croak ($!);
473   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
474   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
475
476   my $childslot = $freeslot[0];
477   my $childnode = $slot[$childslot]->{node};
478   my $childslotname = join (".",
479                             $slot[$childslot]->{node}->{name},
480                             $slot[$childslot]->{cpu});
481   my $childpid = fork();
482   if ($childpid == 0)
483   {
484     $SIG{'INT'} = 'DEFAULT';
485     $SIG{'QUIT'} = 'DEFAULT';
486     $SIG{'TERM'} = 'DEFAULT';
487
488     foreach (values (%reader))
489     {
490       close($_);
491     }
492     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
493     open(STDOUT,">&writer");
494     open(STDERR,">&writer");
495
496     undef $dbh;
497     undef $sth;
498
499     delete $ENV{"GNUPGHOME"};
500     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
501     $ENV{"TASK_QSEQUENCE"} = $id;
502     $ENV{"TASK_SEQUENCE"} = $level;
503     $ENV{"JOB_SCRIPT"} = $Job->{script};
504     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
505       $param =~ tr/a-z/A-Z/;
506       $ENV{"JOB_PARAMETER_$param"} = $value;
507     }
508     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
509     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
510     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
511     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
512
513     $ENV{"GZIP"} = "-n";
514
515     my @srunargs = (
516       "srun",
517       "--nodelist=".$childnode->{name},
518       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
519       "--job-name=$job_id.$id.$$",
520         );
521     my @execargs = qw(sh);
522     my $build_script_to_send = "";
523     my $command =
524         "mkdir -p $ENV{CRUNCH_TMP}/revision "
525         ."&& cd $ENV{CRUNCH_TMP} ";
526     if ($build_script)
527     {
528       $build_script_to_send = $build_script;
529       $command .=
530           "&& perl -";
531     }
532     elsif (!$skip_install)
533     {
534       $command .=
535           "&& "
536           ."( "
537           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
538           ."|| "
539           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
540           ."    && ./installrevision "
541           ."  ) "
542           .") ";
543     }
544     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
545     $command .=
546         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
547     my @execargs = ('bash', '-c', $command);
548     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
549     exit (1);
550   }
551   close("writer");
552   if (!defined $childpid)
553   {
554     close $reader{$id};
555     delete $reader{$id};
556     next;
557   }
558   shift @freeslot;
559   $proc{$childpid} = { jobstep => $id,
560                        time => time,
561                        slot => $childslot,
562                        jobstepname => "$job_id.$id.$childpid",
563                      };
564   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
565   $slot[$childslot]->{pid} = $childpid;
566
567   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
568   Log ($id, "child $childpid started on $childslotname");
569   $Jobstep->{attempts} ++;
570   $Jobstep->{starttime} = time;
571   $Jobstep->{node} = $childnode->{name};
572   $Jobstep->{slotindex} = $childslot;
573   delete $Jobstep->{stderr};
574   delete $Jobstep->{finishtime};
575
576   splice @jobstep_todo, $todo_ptr, 1;
577   --$todo_ptr;
578
579   $progress_is_dirty = 1;
580
581   while (!@freeslot
582          ||
583          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
584   {
585     last THISROUND if $main::please_freeze;
586     if ($main::please_info)
587     {
588       $main::please_info = 0;
589       freeze();
590       collate_output();
591       save_meta(1);
592       update_progress_stats();
593     }
594     my $gotsome
595         = readfrompipes ()
596         + reapchildren ();
597     if (!$gotsome)
598     {
599       check_squeue();
600       update_progress_stats();
601       select (undef, undef, undef, 0.1);
602     }
603     elsif (time - $progress_stats_updated >= 30)
604     {
605       update_progress_stats();
606     }
607     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
608         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
609     {
610       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
611           .($thisround_failed+$thisround_succeeded)
612           .") -- giving up on this round";
613       Log (undef, $message);
614       last THISROUND;
615     }
616
617     # move slots from freeslot to holdslot (or back to freeslot) if necessary
618     for (my $i=$#freeslot; $i>=0; $i--) {
619       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
620         push @holdslot, (splice @freeslot, $i, 1);
621       }
622     }
623     for (my $i=$#holdslot; $i>=0; $i--) {
624       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
625         push @freeslot, (splice @holdslot, $i, 1);
626       }
627     }
628
629     # give up if no nodes are succeeding
630     if (!grep { $_->{node}->{losing_streak} == 0 } @slot) {
631       my $message = "Every node has failed -- giving up on this round";
632       Log (undef, $message);
633       last THISROUND;
634     }
635   }
636 }
637
638
639 push @freeslot, splice @holdslot;
640 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
641
642
643 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
644 while (%proc)
645 {
646   goto THISROUND if $main::please_continue;
647   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
648   readfrompipes ();
649   if (!reapchildren())
650   {
651     check_squeue();
652     update_progress_stats();
653     select (undef, undef, undef, 0.1);
654     killem (keys %proc) if $main::please_freeze;
655   }
656 }
657
658 update_progress_stats();
659 freeze_if_want_freeze();
660
661
662 if (!defined $success)
663 {
664   if (@jobstep_todo &&
665       $thisround_succeeded == 0 &&
666       ($thisround_failed == 0 || $thisround_failed > 4))
667   {
668     my $message = "stop because $thisround_failed tasks failed and none succeeded";
669     Log (undef, $message);
670     $success = 0;
671   }
672   if (!@jobstep_todo)
673   {
674     $success = 1;
675   }
676 }
677
678 goto ONELEVEL if !defined $success;
679
680
681 release_allocation();
682 freeze();
683 $Job->{'output'} = &collate_output();
684 $Job->{'success'} = $Job->{'output'} && $success;
685 $Job->save;
686
687 if ($Job->{'output'})
688 {
689   $arv->{'collections'}->{'create'}->execute('collection' => {
690     'uuid' => $Job->{'output'},
691     'manifest_text' => system("whget", $Job->{'output'}),
692   });
693 }
694
695 Log (undef, "finish");
696
697 save_meta();
698 exit 0;
699
700
701
702 sub update_progress_stats
703 {
704   $progress_stats_updated = time;
705   return if !$progress_is_dirty;
706   my ($todo, $done, $running) = (scalar @jobstep_todo,
707                                  scalar @jobstep_done,
708                                  scalar @slot - scalar @freeslot - scalar @holdslot);
709   $Job->{'tasks_summary'} ||= {};
710   $Job->{'tasks_summary'}->{'todo'} = $todo;
711   $Job->{'tasks_summary'}->{'done'} = $done;
712   $Job->{'tasks_summary'}->{'running'} = $running;
713   $Job->save;
714   Log (undef, "status: $done done, $running running, $todo todo");
715   $progress_is_dirty = 0;
716 }
717
718
719
720 sub reapchildren
721 {
722   my $pid = waitpid (-1, WNOHANG);
723   return 0 if $pid <= 0;
724
725   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
726                   . "."
727                   . $slot[$proc{$pid}->{slot}]->{cpu});
728   my $jobstepid = $proc{$pid}->{jobstep};
729   my $elapsed = time - $proc{$pid}->{time};
730   my $Jobstep = $jobstep[$jobstepid];
731
732   my $exitcode = $?;
733   my $exitinfo = "exit $exitcode";
734   $Jobstep->{'arvados_task'}->reload;
735   my $success = $Jobstep->{'arvados_task'}->{success};
736
737   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
738
739   if (!defined $success) {
740     # task did not indicate one way or the other --> fail
741     $Jobstep->{'arvados_task'}->{success} = 0;
742     $Jobstep->{'arvados_task'}->save;
743     $success = 0;
744   }
745
746   if (!$success)
747   {
748     --$Jobstep->{attempts} if $Jobstep->{node_fail};
749     ++$thisround_failed;
750     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
751
752     # Check for signs of a failed or misconfigured node
753     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
754         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
755       # Don't count this against jobstep failure thresholds if this
756       # node is already suspected faulty and srun exited quickly
757       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
758           $elapsed < 5 &&
759           $Jobstep->{attempts} > 1) {
760         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
761         --$Jobstep->{attempts};
762       }
763       ban_node_by_slot($proc{$pid}->{slot});
764     }
765
766     push @jobstep_todo, $jobstepid;
767     Log ($jobstepid, "failure in $elapsed seconds");
768     $Job->{'tasks_summary'}->{'failed'}++;
769   }
770   else
771   {
772     ++$thisround_succeeded;
773     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
774     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
775     push @jobstep_done, $jobstepid;
776     Log ($jobstepid, "success in $elapsed seconds");
777   }
778   $Jobstep->{exitcode} = $exitcode;
779   $Jobstep->{finishtime} = time;
780   process_stderr ($jobstepid, $success);
781   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
782
783   close $reader{$jobstepid};
784   delete $reader{$jobstepid};
785   delete $slot[$proc{$pid}->{slot}]->{pid};
786   push @freeslot, $proc{$pid}->{slot};
787   delete $proc{$pid};
788
789   # Load new tasks
790   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
791     'where' => {
792       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
793     },
794     'order' => 'qsequence'
795   );
796   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
797     my $jobstep = {
798       'level' => $arvados_task->{'sequence'},
799       'attempts' => 0,
800       'arvados_task' => $arvados_task
801     };
802     push @jobstep, $jobstep;
803     push @jobstep_todo, $#jobstep;
804   }
805
806   $progress_is_dirty = 1;
807   1;
808 }
809
810
811 sub check_squeue
812 {
813   # return if the kill list was checked <4 seconds ago
814   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
815   {
816     return;
817   }
818   $squeue_kill_checked = time;
819
820   # use killem() on procs whose killtime is reached
821   for (keys %proc)
822   {
823     if (exists $proc{$_}->{killtime}
824         && $proc{$_}->{killtime} <= time)
825     {
826       killem ($_);
827     }
828   }
829
830   # return if the squeue was checked <60 seconds ago
831   if (defined $squeue_checked && $squeue_checked > time - 60)
832   {
833     return;
834   }
835   $squeue_checked = time;
836
837   if (!$have_slurm)
838   {
839     # here is an opportunity to check for mysterious problems with local procs
840     return;
841   }
842
843   # get a list of steps still running
844   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
845   chop @squeue;
846   if ($squeue[-1] ne "ok")
847   {
848     return;
849   }
850   pop @squeue;
851
852   # which of my jobsteps are running, according to squeue?
853   my %ok;
854   foreach (@squeue)
855   {
856     if (/^(\d+)\.(\d+) (\S+)/)
857     {
858       if ($1 eq $ENV{SLURM_JOBID})
859       {
860         $ok{$3} = 1;
861       }
862     }
863   }
864
865   # which of my active child procs (>60s old) were not mentioned by squeue?
866   foreach (keys %proc)
867   {
868     if ($proc{$_}->{time} < time - 60
869         && !exists $ok{$proc{$_}->{jobstepname}}
870         && !exists $proc{$_}->{killtime})
871     {
872       # kill this proc if it hasn't exited in 30 seconds
873       $proc{$_}->{killtime} = time + 30;
874     }
875   }
876 }
877
878
879 sub release_allocation
880 {
881   if ($have_slurm)
882   {
883     Log (undef, "release job allocation");
884     system "scancel $ENV{SLURM_JOBID}";
885   }
886 }
887
888
889 sub readfrompipes
890 {
891   my $gotsome = 0;
892   foreach my $job (keys %reader)
893   {
894     my $buf;
895     while (0 < sysread ($reader{$job}, $buf, 8192))
896     {
897       print STDERR $buf if $ENV{CRUNCH_DEBUG};
898       $jobstep[$job]->{stderr} .= $buf;
899       preprocess_stderr ($job);
900       if (length ($jobstep[$job]->{stderr}) > 16384)
901       {
902         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
903       }
904       $gotsome = 1;
905     }
906   }
907   return $gotsome;
908 }
909
910
911 sub preprocess_stderr
912 {
913   my $job = shift;
914
915   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
916     my $line = $1;
917     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
918     Log ($job, "stderr $line");
919     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired) /) {
920       # whoa.
921       $main::please_freeze = 1;
922     }
923     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
924       $jobstep[$job]->{node_fail} = 1;
925       ban_node_by_slot($jobstep[$job]->{slotindex});
926     }
927   }
928 }
929
930
931 sub process_stderr
932 {
933   my $job = shift;
934   my $success = shift;
935   preprocess_stderr ($job);
936
937   map {
938     Log ($job, "stderr $_");
939   } split ("\n", $jobstep[$job]->{stderr});
940 }
941
942
943 sub collate_output
944 {
945   my $whc = Warehouse->new;
946   Log (undef, "collate");
947   $whc->write_start (1);
948   my $joboutput;
949   for (@jobstep)
950   {
951     next if (!exists $_->{'arvados_task'}->{output} ||
952              !$_->{'arvados_task'}->{'success'} ||
953              $_->{'exitcode'} != 0);
954     my $output = $_->{'arvados_task'}->{output};
955     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
956     {
957       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
958       $whc->write_data ($output);
959     }
960     elsif (@jobstep == 1)
961     {
962       $joboutput = $output;
963       $whc->write_finish;
964     }
965     elsif (defined (my $outblock = $whc->fetch_block ($output)))
966     {
967       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
968       $whc->write_data ($outblock);
969     }
970     else
971     {
972       my $errstr = $whc->errstr;
973       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
974       $success = 0;
975     }
976   }
977   $joboutput = $whc->write_finish if !defined $joboutput;
978   if ($joboutput)
979   {
980     Log (undef, "output $joboutput");
981     $Job->{'output'} = $joboutput;
982     $Job->save;
983   }
984   else
985   {
986     Log (undef, "output undef");
987   }
988   return $joboutput;
989 }
990
991
992 sub killem
993 {
994   foreach (@_)
995   {
996     my $sig = 2;                # SIGINT first
997     if (exists $proc{$_}->{"sent_$sig"} &&
998         time - $proc{$_}->{"sent_$sig"} > 4)
999     {
1000       $sig = 15;                # SIGTERM if SIGINT doesn't work
1001     }
1002     if (exists $proc{$_}->{"sent_$sig"} &&
1003         time - $proc{$_}->{"sent_$sig"} > 4)
1004     {
1005       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1006     }
1007     if (!exists $proc{$_}->{"sent_$sig"})
1008     {
1009       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1010       kill $sig, $_;
1011       select (undef, undef, undef, 0.1);
1012       if ($sig == 2)
1013       {
1014         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1015       }
1016       $proc{$_}->{"sent_$sig"} = time;
1017       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1018     }
1019   }
1020 }
1021
1022
1023 sub fhbits
1024 {
1025   my($bits);
1026   for (@_) {
1027     vec($bits,fileno($_),1) = 1;
1028   }
1029   $bits;
1030 }
1031
1032
1033 sub Log                         # ($jobstep_id, $logmessage)
1034 {
1035   if ($_[1] =~ /\n/) {
1036     for my $line (split (/\n/, $_[1])) {
1037       Log ($_[0], $line);
1038     }
1039     return;
1040   }
1041   my $fh = select STDERR; $|=1; select $fh;
1042   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1043   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1044   $message .= "\n";
1045   my $datetime;
1046   if ($metastream || -t STDERR) {
1047     my @gmtime = gmtime;
1048     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1049                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1050   }
1051   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1052
1053   return if !$metastream;
1054   $metastream->write_data ($datetime . " " . $message);
1055 }
1056
1057
1058 sub reconnect_database
1059 {
1060   return if !$job_has_uuid;
1061   return if ($dbh && $dbh->do ("select now()"));
1062   for (1..16)
1063   {
1064     $dbh = DBI->connect(@$Warehouse::Server::DatabaseDSN);
1065     if ($dbh) {
1066       $dbh->{InactiveDestroy} = 1;
1067       return;
1068     }
1069     warn ($DBI::errstr);
1070     sleep $_;
1071   }
1072   croak ($DBI::errstr) if !$dbh;
1073 }
1074
1075
1076 sub dbh_do
1077 {
1078   return 1 if !$job_has_uuid;
1079   my $ret = $dbh->do (@_);
1080   return $ret unless (!$ret && $DBI::errstr =~ /server has gone away/);
1081   reconnect_database();
1082   return $dbh->do (@_);
1083 }
1084
1085
1086 sub croak
1087 {
1088   my ($package, $file, $line) = caller;
1089   my $message = "@_ at $file line $line\n";
1090   Log (undef, $message);
1091   freeze() if @jobstep_todo;
1092   collate_output() if @jobstep_todo;
1093   cleanup();
1094   save_meta() if $metastream;
1095   die;
1096 }
1097
1098
1099 sub cleanup
1100 {
1101   return if !$job_has_uuid;
1102   $Job->reload;
1103   $Job->{'running'} = 0;
1104   $Job->{'success'} = 0;
1105   $Job->{'finished_at'} = time;
1106   $Job->save;
1107 }
1108
1109
1110 sub save_meta
1111 {
1112   my $justcheckpoint = shift; # false if this will be the last meta saved
1113   my $m = $metastream;
1114   $m = $m->copy if $justcheckpoint;
1115   $m->write_finish;
1116   my $loglocator = $m->as_key;
1117   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1118   Log (undef, "meta key is $loglocator");
1119   $Job->{'log'} = $loglocator;
1120   $Job->save;
1121 }
1122
1123
1124 sub freeze_if_want_freeze
1125 {
1126   if ($main::please_freeze)
1127   {
1128     release_allocation();
1129     if (@_)
1130     {
1131       # kill some srun procs before freeze+stop
1132       map { $proc{$_} = {} } @_;
1133       while (%proc)
1134       {
1135         killem (keys %proc);
1136         select (undef, undef, undef, 0.1);
1137         my $died;
1138         while (($died = waitpid (-1, WNOHANG)) > 0)
1139         {
1140           delete $proc{$died};
1141         }
1142       }
1143     }
1144     freeze();
1145     collate_output();
1146     cleanup();
1147     save_meta();
1148     exit 0;
1149   }
1150 }
1151
1152
1153 sub freeze
1154 {
1155   Log (undef, "Freeze not implemented");
1156   return;
1157
1158   my $whc;                      # todo
1159   Log (undef, "freeze");
1160
1161   my $freezer = new Warehouse::Stream (whc => $whc);
1162   $freezer->clear;
1163   $freezer->name (".");
1164   $freezer->write_start ("state.txt");
1165
1166   $freezer->write_data (join ("\n",
1167                               "job $Job->{uuid}",
1168                               map
1169                               {
1170                                 $_ . "=" . freezequote($Job->{$_})
1171                               } grep { $_ ne "id" } keys %$Job) . "\n\n");
1172
1173   foreach my $Jobstep (@jobstep)
1174   {
1175     my $str = join ("\n",
1176                     map
1177                     {
1178                       $_ . "=" . freezequote ($Jobstep->{$_})
1179                     } grep {
1180                       $_ !~ /^stderr|slotindex|node_fail/
1181                     } keys %$Jobstep);
1182     $freezer->write_data ($str."\n\n");
1183   }
1184   if (@jobstep_tomerge)
1185   {
1186     $freezer->write_data
1187         ("merge $jobstep_tomerge_level "
1188          . freezequote (join ("\n",
1189                               map { freezequote ($_) } @jobstep_tomerge))
1190          . "\n\n");
1191   }
1192
1193   $freezer->write_finish;
1194   my $frozentokey = $freezer->as_key;
1195   undef $freezer;
1196   Log (undef, "frozento key is $frozentokey");
1197   dbh_do ("update mrjob set frozentokey=? where id=?", undef,
1198           $frozentokey, $job_id);
1199   my $kfrozentokey = $whc->store_in_keep (hash => $frozentokey, nnodes => 3);
1200   Log (undef, "frozento+K key is $kfrozentokey");
1201   return $frozentokey;
1202 }
1203
1204
1205 sub thaw
1206 {
1207   croak ("Thaw not implemented");
1208
1209   my $whc;
1210   my $key = shift;
1211   Log (undef, "thaw from $key");
1212
1213   @jobstep = ();
1214   @jobstep_done = ();
1215   @jobstep_todo = ();
1216   @jobstep_tomerge = ();
1217   $jobstep_tomerge_level = 0;
1218   my $frozenjob = {};
1219
1220   my $stream = new Warehouse::Stream ( whc => $whc,
1221                                        hash => [split (",", $key)] );
1222   $stream->rewind;
1223   while (my $dataref = $stream->read_until (undef, "\n\n"))
1224   {
1225     if ($$dataref =~ /^job /)
1226     {
1227       foreach (split ("\n", $$dataref))
1228       {
1229         my ($k, $v) = split ("=", $_, 2);
1230         $frozenjob->{$k} = freezeunquote ($v);
1231       }
1232       next;
1233     }
1234
1235     if ($$dataref =~ /^merge (\d+) (.*)/)
1236     {
1237       $jobstep_tomerge_level = $1;
1238       @jobstep_tomerge
1239           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1240       next;
1241     }
1242
1243     my $Jobstep = { };
1244     foreach (split ("\n", $$dataref))
1245     {
1246       my ($k, $v) = split ("=", $_, 2);
1247       $Jobstep->{$k} = freezeunquote ($v) if $k;
1248     }
1249     $Jobstep->{attempts} = 0;
1250     push @jobstep, $Jobstep;
1251
1252     if ($Jobstep->{exitcode} eq "0")
1253     {
1254       push @jobstep_done, $#jobstep;
1255     }
1256     else
1257     {
1258       push @jobstep_todo, $#jobstep;
1259     }
1260   }
1261
1262   foreach (qw (script script_version script_parameters))
1263   {
1264     $Job->{$_} = $frozenjob->{$_};
1265   }
1266   $Job->save;
1267 }
1268
1269
1270 sub freezequote
1271 {
1272   my $s = shift;
1273   $s =~ s/\\/\\\\/g;
1274   $s =~ s/\n/\\n/g;
1275   return $s;
1276 }
1277
1278
1279 sub freezeunquote
1280 {
1281   my $s = shift;
1282   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1283   return $s;
1284 }
1285
1286
1287 sub srun
1288 {
1289   my $srunargs = shift;
1290   my $execargs = shift;
1291   my $opts = shift || {};
1292   my $stdin = shift;
1293   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1294   print STDERR (join (" ",
1295                       map { / / ? "'$_'" : $_ }
1296                       (@$args)),
1297                 "\n")
1298       if $ENV{CRUNCH_DEBUG};
1299
1300   if (defined $stdin) {
1301     my $child = open STDIN, "-|";
1302     defined $child or die "no fork: $!";
1303     if ($child == 0) {
1304       print $stdin or die $!;
1305       close STDOUT or die $!;
1306       exit 0;
1307     }
1308   }
1309
1310   return system (@$args) if $opts->{fork};
1311
1312   exec @$args;
1313   warn "ENV size is ".length(join(" ",%ENV));
1314   die "exec failed: $!: @$args";
1315 }
1316
1317
1318 sub ban_node_by_slot {
1319   # Don't start any new jobsteps on this node for 60 seconds
1320   my $slotid = shift;
1321   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1322   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1323 }
1324
1325 __DATA__
1326 #!/usr/bin/perl
1327
1328 # checkout-and-build
1329
1330 use Fcntl ':flock';
1331
1332 my $destdir = $ENV{"CRUNCH_SRC"};
1333 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1334 my $repo = $ENV{"CRUNCH_SRC_URL"};
1335
1336 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1337 flock L, LOCK_EX;
1338 if (readlink ("$destdir.commit") eq $commit) {
1339     exit 0;
1340 }
1341
1342 open STDOUT, ">", "$destdir.log";
1343 open STDERR, ">&STDOUT";
1344
1345 if (-d "$destdir/.git") {
1346     chdir $destdir or die "chdir $destdir: $!";
1347     if (0 != system (qw(git remote set-url origin), $repo)) {
1348         # awful... for old versions of git that don't know "remote set-url"
1349         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1350     }
1351 }
1352 elsif ($repo && $commit)
1353 {
1354     shell_or_die('git', 'clone', $repo, $destdir);
1355     chdir $destdir or die "chdir $destdir: $!";
1356     shell_or_die(qw(git config clean.requireForce false));
1357 }
1358 else {
1359     die "$destdir does not exist, and no repo/commit specified -- giving up";
1360 }
1361
1362 if ($commit) {
1363     unlink "$destdir.commit";
1364     shell_or_die (qw(git stash));
1365     shell_or_die (qw(git clean -d -x));
1366     shell_or_die (qw(git fetch origin));
1367     shell_or_die (qw(git checkout), $commit);
1368 }
1369
1370 my $pwd;
1371 chomp ($pwd = `pwd`);
1372 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1373 mkdir $install_dir;
1374 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1375     # Old version
1376     shell_or_die ("./tests/autotests.sh", $install_dir);
1377 } elsif (-e "./install.sh") {
1378     shell_or_die ("./install.sh", $install_dir);
1379 }
1380
1381 if ($commit) {
1382     unlink "$destdir.commit.new";
1383     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1384     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1385 }
1386
1387 close L;
1388
1389 exit 0;
1390
1391 sub shell_or_die
1392 {
1393   if ($ENV{"DEBUG"}) {
1394     print STDERR "@_\n";
1395   }
1396   system (@_) == 0
1397       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1398 }