port crunch dispatcher from whjobmanager to crunch-job
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100
101
102 $SIG{'HUP'} = sub
103 {
104   1;
105 };
106 $SIG{'USR1'} = sub
107 {
108   $main::ENV{CRUNCH_DEBUG} = 1;
109 };
110 $SIG{'USR2'} = sub
111 {
112   $main::ENV{CRUNCH_DEBUG} = 0;
113 };
114
115
116
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new;
119 $metastream->clear;
120 $metastream->write_start('log.txt');
121
122 my $User = {};
123 my $Job = {};
124 my $job_id;
125 my $dbh;
126 my $sth;
127 if ($job_has_uuid)
128 {
129   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130   $User = $arv->{'users'}->{'current'}->execute;
131   if (!$force_unlock) {
132     if ($Job->{'is_locked_by'}) {
133       croak("Job is locked: " . $Job->{'is_locked_by'});
134     }
135     if ($Job->{'success'} ne undef) {
136       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
137     }
138     if ($Job->{'running'}) {
139       croak("Job 'running' flag is already set");
140     }
141     if ($Job->{'started_at'}) {
142       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
143     }
144   }
145 }
146 else
147 {
148   $Job = JSON::decode_json($jobspec);
149
150   if (!$resume_stash)
151   {
152     map { croak ("No $_ specified") unless $Job->{$_} }
153     qw(script script_version script_parameters);
154   }
155
156   if (!defined $Job->{'uuid'}) {
157     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
158   }
159 }
160 $job_id = $Job->{'uuid'};
161
162
163
164 $Job->{'resource_limits'} ||= {};
165 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
166 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
167
168
169 Log (undef, "check slurm allocation");
170 my @slot;
171 my @node;
172 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
173 my @sinfo;
174 if (!$have_slurm)
175 {
176   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
177   push @sinfo, "$localcpus localhost";
178 }
179 if (exists $ENV{SLURM_NODELIST})
180 {
181   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
182 }
183 foreach (@sinfo)
184 {
185   my ($ncpus, $slurm_nodelist) = split;
186   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
187
188   my @nodelist;
189   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
190   {
191     my $nodelist = $1;
192     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
193     {
194       my $ranges = $1;
195       foreach (split (",", $ranges))
196       {
197         my ($a, $b);
198         if (/(\d+)-(\d+)/)
199         {
200           $a = $1;
201           $b = $2;
202         }
203         else
204         {
205           $a = $_;
206           $b = $_;
207         }
208         push @nodelist, map {
209           my $n = $nodelist;
210           $n =~ s/\[[-,\d]+\]/$_/;
211           $n;
212         } ($a..$b);
213       }
214     }
215     else
216     {
217       push @nodelist, $nodelist;
218     }
219   }
220   foreach my $nodename (@nodelist)
221   {
222     Log (undef, "node $nodename - $ncpus slots");
223     my $node = { name => $nodename,
224                  ncpus => $ncpus,
225                  losing_streak => 0,
226                  hold_until => 0 };
227     foreach my $cpu (1..$ncpus)
228     {
229       push @slot, { node => $node,
230                     cpu => $cpu };
231     }
232   }
233   push @node, @nodelist;
234 }
235
236
237
238 # Ensure that we get one jobstep running on each allocated node before
239 # we start overloading nodes with concurrent steps
240
241 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
242
243
244
245 my $jobmanager_id;
246 if ($job_has_uuid)
247 {
248   # Claim this job, and make sure nobody else does
249
250   $Job->{'is_locked_by'} = $User->{'uuid'};
251   $Job->{'started_at'} = gmtime;
252   $Job->{'running'} = 1;
253   $Job->{'success'} = undef;
254   $Job->{'tasks_summary'} = { 'failed' => 0,
255                               'todo' => 1,
256                               'running' => 0,
257                               'done' => 0 };
258   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261 }
262
263
264 Log (undef, "start");
265 $SIG{'INT'} = sub { $main::please_freeze = 1; };
266 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
267 $SIG{'TERM'} = \&croak;
268 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
269 $SIG{'ALRM'} = sub { $main::please_info = 1; };
270 $SIG{'CONT'} = sub { $main::please_continue = 1; };
271 $main::please_freeze = 0;
272 $main::please_info = 0;
273 $main::please_continue = 0;
274 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
275
276 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
277 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
278 $ENV{"JOB_UUID"} = $job_id;
279
280
281 my @jobstep;
282 my @jobstep_todo = ();
283 my @jobstep_done = ();
284 my @jobstep_tomerge = ();
285 my $jobstep_tomerge_level = 0;
286 my $squeue_checked;
287 my $squeue_kill_checked;
288 my $output_in_keep = 0;
289
290
291
292 if (defined $Job->{thawedfromkey})
293 {
294   thaw ($Job->{thawedfromkey});
295 }
296 else
297 {
298   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
299     'job_uuid' => $Job->{'uuid'},
300     'sequence' => 0,
301     'qsequence' => 0,
302     'parameters' => {},
303                                                           });
304   push @jobstep, { 'level' => 0,
305                    'attempts' => 0,
306                    'arvados_task' => $first_task,
307                  };
308   push @jobstep_todo, 0;
309 }
310
311
312 my $build_script;
313 do {
314   local $/ = undef;
315   $build_script = <DATA>;
316 };
317
318
319 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
320
321 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
322 if ($skip_install)
323 {
324   $ENV{"CRUNCH_SRC"} = $Job->{revision};
325 }
326 else
327 {
328   Log (undef, "Install revision ".$Job->{revision});
329   my $nodelist = join(",", @node);
330
331   # Clean out crunch_tmp/work and crunch_tmp/opt
332
333   my $cleanpid = fork();
334   if ($cleanpid == 0)
335   {
336     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
337           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
338     exit (1);
339   }
340   while (1)
341   {
342     last if $cleanpid == waitpid (-1, WNOHANG);
343     freeze_if_want_freeze ($cleanpid);
344     select (undef, undef, undef, 0.1);
345   }
346   Log (undef, "Clean-work-dir exited $?");
347
348   # Install requested code version
349
350   my @execargs;
351   my @srunargs = ("srun",
352                   "--nodelist=$nodelist",
353                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
354
355   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
356   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
357   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
358
359   my $commit;
360   my $treeish = $Job->{'script_version'};
361   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
362   # Todo: let script_version specify repository instead of expecting
363   # parent process to figure it out.
364   $ENV{"CRUNCH_SRC_URL"} = $repo;
365
366   # Create/update our clone of the remote git repo
367
368   if (!-d $ENV{"CRUNCH_SRC"}) {
369     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
370         or croak ("git clone $repo failed: exit ".($?>>8));
371     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
372   }
373   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
374
375   # If this looks like a subversion r#, look for it in git-svn commit messages
376
377   if ($treeish =~ m{^\d{1,4}$}) {
378     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
379     chomp $gitlog;
380     if ($gitlog =~ /^[a-f0-9]{40}$/) {
381       $commit = $gitlog;
382       Log (undef, "Using commit $commit for revision $treeish");
383     }
384   }
385
386   # If that didn't work, try asking git to look it up as a tree-ish.
387
388   if (!defined $commit) {
389
390     my $cooked_treeish = $treeish;
391     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
392       # Looks like a git branch name -- make sure git knows it's
393       # relative to the remote repo
394       $cooked_treeish = "origin/$treeish";
395     }
396
397     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
398     chomp $found;
399     if ($found =~ /^[0-9a-f]{40}$/s) {
400       $commit = $found;
401       if ($commit ne $treeish) {
402         # Make sure we record the real commit id in the database,
403         # frozentokey, logs, etc. -- instead of an abbreviation or a
404         # branch name which can become ambiguous or point to a
405         # different commit in the future.
406         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
407         Log (undef, "Using commit $commit for tree-ish $treeish");
408         if ($commit ne $treeish) {
409           $Job->{'script_version'} = $commit;
410           $Job->save() or croak("Error while updating job");
411         }
412       }
413     }
414   }
415
416   if (defined $commit) {
417     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
418     @execargs = ("sh", "-c",
419                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
420   }
421   else {
422     croak ("could not figure out commit id for $treeish");
423   }
424
425   my $installpid = fork();
426   if ($installpid == 0)
427   {
428     srun (\@srunargs, \@execargs, {}, $build_script);
429     exit (1);
430   }
431   while (1)
432   {
433     last if $installpid == waitpid (-1, WNOHANG);
434     freeze_if_want_freeze ($installpid);
435     select (undef, undef, undef, 0.1);
436   }
437   Log (undef, "Install exited $?");
438 }
439
440
441
442 foreach (qw (script script_version script_parameters resource_limits))
443 {
444   Log (undef,
445        "$_ " .
446        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
447 }
448 foreach (split (/\n/, $Job->{knobs}))
449 {
450   Log (undef, "knob " . $_);
451 }
452
453
454
455 my $success;
456
457
458
459 ONELEVEL:
460
461 my $thisround_succeeded = 0;
462 my $thisround_failed = 0;
463 my $thisround_failed_multiple = 0;
464
465 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
466                        or $a <=> $b } @jobstep_todo;
467 my $level = $jobstep[$jobstep_todo[0]]->{level};
468 Log (undef, "start level $level");
469
470
471
472 my %proc;
473 my @freeslot = (0..$#slot);
474 my @holdslot;
475 my %reader;
476 my $progress_is_dirty = 1;
477 my $progress_stats_updated = 0;
478
479 update_progress_stats();
480
481
482
483 THISROUND:
484 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
485 {
486   $main::please_continue = 0;
487
488   my $id = $jobstep_todo[$todo_ptr];
489   my $Jobstep = $jobstep[$id];
490   if ($Jobstep->{level} != $level)
491   {
492     next;
493   }
494   if ($Jobstep->{attempts} > 9)
495   {
496     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
497     $success = 0;
498     last THISROUND;
499   }
500
501   pipe $reader{$id}, "writer" or croak ($!);
502   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
503   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
504
505   my $childslot = $freeslot[0];
506   my $childnode = $slot[$childslot]->{node};
507   my $childslotname = join (".",
508                             $slot[$childslot]->{node}->{name},
509                             $slot[$childslot]->{cpu});
510   my $childpid = fork();
511   if ($childpid == 0)
512   {
513     $SIG{'INT'} = 'DEFAULT';
514     $SIG{'QUIT'} = 'DEFAULT';
515     $SIG{'TERM'} = 'DEFAULT';
516
517     foreach (values (%reader))
518     {
519       close($_);
520     }
521     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
522     open(STDOUT,">&writer");
523     open(STDERR,">&writer");
524
525     undef $dbh;
526     undef $sth;
527
528     delete $ENV{"GNUPGHOME"};
529     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
530     $ENV{"TASK_QSEQUENCE"} = $id;
531     $ENV{"TASK_SEQUENCE"} = $level;
532     $ENV{"JOB_SCRIPT"} = $Job->{script};
533     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
534       $param =~ tr/a-z/A-Z/;
535       $ENV{"JOB_PARAMETER_$param"} = $value;
536     }
537     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
538     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
539     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
540     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
541
542     $ENV{"GZIP"} = "-n";
543
544     my @srunargs = (
545       "srun",
546       "--nodelist=".$childnode->{name},
547       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
548       "--job-name=$job_id.$id.$$",
549         );
550     my @execargs = qw(sh);
551     my $build_script_to_send = "";
552     my $command =
553         "mkdir -p $ENV{CRUNCH_TMP}/revision "
554         ."&& cd $ENV{CRUNCH_TMP} ";
555     if ($build_script)
556     {
557       $build_script_to_send = $build_script;
558       $command .=
559           "&& perl -";
560     }
561     elsif (!$skip_install)
562     {
563       $command .=
564           "&& "
565           ."( "
566           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
567           ."|| "
568           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
569           ."    && ./installrevision "
570           ."  ) "
571           .") ";
572     }
573     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
574     $command .=
575         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
576     my @execargs = ('bash', '-c', $command);
577     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
578     exit (1);
579   }
580   close("writer");
581   if (!defined $childpid)
582   {
583     close $reader{$id};
584     delete $reader{$id};
585     next;
586   }
587   shift @freeslot;
588   $proc{$childpid} = { jobstep => $id,
589                        time => time,
590                        slot => $childslot,
591                        jobstepname => "$job_id.$id.$childpid",
592                      };
593   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
594   $slot[$childslot]->{pid} = $childpid;
595
596   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
597   Log ($id, "child $childpid started on $childslotname");
598   $Jobstep->{attempts} ++;
599   $Jobstep->{starttime} = time;
600   $Jobstep->{node} = $childnode->{name};
601   $Jobstep->{slotindex} = $childslot;
602   delete $Jobstep->{stderr};
603   delete $Jobstep->{finishtime};
604
605   splice @jobstep_todo, $todo_ptr, 1;
606   --$todo_ptr;
607
608   $progress_is_dirty = 1;
609
610   while (!@freeslot
611          ||
612          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
613   {
614     last THISROUND if $main::please_freeze;
615     if ($main::please_info)
616     {
617       $main::please_info = 0;
618       freeze();
619       collate_output();
620       save_meta(1);
621       update_progress_stats();
622     }
623     my $gotsome
624         = readfrompipes ()
625         + reapchildren ();
626     if (!$gotsome)
627     {
628       check_squeue();
629       update_progress_stats();
630       select (undef, undef, undef, 0.1);
631     }
632     elsif (time - $progress_stats_updated >= 30)
633     {
634       update_progress_stats();
635     }
636     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
637         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
638     {
639       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
640           .($thisround_failed+$thisround_succeeded)
641           .") -- giving up on this round";
642       Log (undef, $message);
643       last THISROUND;
644     }
645
646     # move slots from freeslot to holdslot (or back to freeslot) if necessary
647     for (my $i=$#freeslot; $i>=0; $i--) {
648       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
649         push @holdslot, (splice @freeslot, $i, 1);
650       }
651     }
652     for (my $i=$#holdslot; $i>=0; $i--) {
653       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
654         push @freeslot, (splice @holdslot, $i, 1);
655       }
656     }
657
658     # give up if no nodes are succeeding
659     if (!grep { $_->{node}->{losing_streak} == 0 &&
660                     $_->{node}->{hold_count} < 4 } @slot) {
661       my $message = "Every node has failed -- giving up on this round";
662       Log (undef, $message);
663       last THISROUND;
664     }
665   }
666 }
667
668
669 push @freeslot, splice @holdslot;
670 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
671
672
673 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
674 while (%proc)
675 {
676   goto THISROUND if $main::please_continue;
677   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
678   readfrompipes ();
679   if (!reapchildren())
680   {
681     check_squeue();
682     update_progress_stats();
683     select (undef, undef, undef, 0.1);
684     killem (keys %proc) if $main::please_freeze;
685   }
686 }
687
688 update_progress_stats();
689 freeze_if_want_freeze();
690
691
692 if (!defined $success)
693 {
694   if (@jobstep_todo &&
695       $thisround_succeeded == 0 &&
696       ($thisround_failed == 0 || $thisround_failed > 4))
697   {
698     my $message = "stop because $thisround_failed tasks failed and none succeeded";
699     Log (undef, $message);
700     $success = 0;
701   }
702   if (!@jobstep_todo)
703   {
704     $success = 1;
705   }
706 }
707
708 goto ONELEVEL if !defined $success;
709
710
711 release_allocation();
712 freeze();
713 $Job->{'output'} = &collate_output();
714 $Job->{'success'} = $Job->{'output'} && $success;
715 $Job->save;
716
717 if ($Job->{'output'})
718 {
719   eval {
720     my $manifest_text = capturex("whget", $Job->{'output'});
721     $arv->{'collections'}->{'create'}->execute('collection' => {
722       'uuid' => $Job->{'output'},
723       'manifest_text' => $manifest_text,
724     });
725   };
726   if ($@) {
727     Log (undef, "Failed to register output manifest: $@");
728   }
729 }
730
731 Log (undef, "finish");
732
733 save_meta();
734 exit 0;
735
736
737
738 sub update_progress_stats
739 {
740   $progress_stats_updated = time;
741   return if !$progress_is_dirty;
742   my ($todo, $done, $running) = (scalar @jobstep_todo,
743                                  scalar @jobstep_done,
744                                  scalar @slot - scalar @freeslot - scalar @holdslot);
745   $Job->{'tasks_summary'} ||= {};
746   $Job->{'tasks_summary'}->{'todo'} = $todo;
747   $Job->{'tasks_summary'}->{'done'} = $done;
748   $Job->{'tasks_summary'}->{'running'} = $running;
749   $Job->save;
750   Log (undef, "status: $done done, $running running, $todo todo");
751   $progress_is_dirty = 0;
752 }
753
754
755
756 sub reapchildren
757 {
758   my $pid = waitpid (-1, WNOHANG);
759   return 0 if $pid <= 0;
760
761   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
762                   . "."
763                   . $slot[$proc{$pid}->{slot}]->{cpu});
764   my $jobstepid = $proc{$pid}->{jobstep};
765   my $elapsed = time - $proc{$pid}->{time};
766   my $Jobstep = $jobstep[$jobstepid];
767
768   my $exitcode = $?;
769   my $exitinfo = "exit $exitcode";
770   $Jobstep->{'arvados_task'}->reload;
771   my $success = $Jobstep->{'arvados_task'}->{success};
772
773   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
774
775   if (!defined $success) {
776     # task did not indicate one way or the other --> fail
777     $Jobstep->{'arvados_task'}->{success} = 0;
778     $Jobstep->{'arvados_task'}->save;
779     $success = 0;
780   }
781
782   if (!$success)
783   {
784     my $no_incr_attempts;
785     $no_incr_attempts = 1 if $Jobstep->{node_fail};
786
787     ++$thisround_failed;
788     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
789
790     # Check for signs of a failed or misconfigured node
791     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
792         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
793       # Don't count this against jobstep failure thresholds if this
794       # node is already suspected faulty and srun exited quickly
795       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
796           $elapsed < 5 &&
797           $Jobstep->{attempts} > 1) {
798         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
799         $no_incr_attempts = 1;
800         --$Jobstep->{attempts};
801       }
802       ban_node_by_slot($proc{$pid}->{slot});
803     }
804
805     push @jobstep_todo, $jobstepid;
806     Log ($jobstepid, "failure in $elapsed seconds");
807
808     --$Jobstep->{attempts} if $no_incr_attempts;
809     $Job->{'tasks_summary'}->{'failed'}++;
810   }
811   else
812   {
813     ++$thisround_succeeded;
814     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
815     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
816     push @jobstep_done, $jobstepid;
817     Log ($jobstepid, "success in $elapsed seconds");
818   }
819   $Jobstep->{exitcode} = $exitcode;
820   $Jobstep->{finishtime} = time;
821   process_stderr ($jobstepid, $success);
822   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
823
824   close $reader{$jobstepid};
825   delete $reader{$jobstepid};
826   delete $slot[$proc{$pid}->{slot}]->{pid};
827   push @freeslot, $proc{$pid}->{slot};
828   delete $proc{$pid};
829
830   # Load new tasks
831   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
832     'where' => {
833       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
834     },
835     'order' => 'qsequence'
836   );
837   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
838     my $jobstep = {
839       'level' => $arvados_task->{'sequence'},
840       'attempts' => 0,
841       'arvados_task' => $arvados_task
842     };
843     push @jobstep, $jobstep;
844     push @jobstep_todo, $#jobstep;
845   }
846
847   $progress_is_dirty = 1;
848   1;
849 }
850
851
852 sub check_squeue
853 {
854   # return if the kill list was checked <4 seconds ago
855   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
856   {
857     return;
858   }
859   $squeue_kill_checked = time;
860
861   # use killem() on procs whose killtime is reached
862   for (keys %proc)
863   {
864     if (exists $proc{$_}->{killtime}
865         && $proc{$_}->{killtime} <= time)
866     {
867       killem ($_);
868     }
869   }
870
871   # return if the squeue was checked <60 seconds ago
872   if (defined $squeue_checked && $squeue_checked > time - 60)
873   {
874     return;
875   }
876   $squeue_checked = time;
877
878   if (!$have_slurm)
879   {
880     # here is an opportunity to check for mysterious problems with local procs
881     return;
882   }
883
884   # get a list of steps still running
885   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
886   chop @squeue;
887   if ($squeue[-1] ne "ok")
888   {
889     return;
890   }
891   pop @squeue;
892
893   # which of my jobsteps are running, according to squeue?
894   my %ok;
895   foreach (@squeue)
896   {
897     if (/^(\d+)\.(\d+) (\S+)/)
898     {
899       if ($1 eq $ENV{SLURM_JOBID})
900       {
901         $ok{$3} = 1;
902       }
903     }
904   }
905
906   # which of my active child procs (>60s old) were not mentioned by squeue?
907   foreach (keys %proc)
908   {
909     if ($proc{$_}->{time} < time - 60
910         && !exists $ok{$proc{$_}->{jobstepname}}
911         && !exists $proc{$_}->{killtime})
912     {
913       # kill this proc if it hasn't exited in 30 seconds
914       $proc{$_}->{killtime} = time + 30;
915     }
916   }
917 }
918
919
920 sub release_allocation
921 {
922   if ($have_slurm)
923   {
924     Log (undef, "release job allocation");
925     system "scancel $ENV{SLURM_JOBID}";
926   }
927 }
928
929
930 sub readfrompipes
931 {
932   my $gotsome = 0;
933   foreach my $job (keys %reader)
934   {
935     my $buf;
936     while (0 < sysread ($reader{$job}, $buf, 8192))
937     {
938       print STDERR $buf if $ENV{CRUNCH_DEBUG};
939       $jobstep[$job]->{stderr} .= $buf;
940       preprocess_stderr ($job);
941       if (length ($jobstep[$job]->{stderr}) > 16384)
942       {
943         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
944       }
945       $gotsome = 1;
946     }
947   }
948   return $gotsome;
949 }
950
951
952 sub preprocess_stderr
953 {
954   my $job = shift;
955
956   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
957     my $line = $1;
958     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
959     Log ($job, "stderr $line");
960     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
961       # whoa.
962       $main::please_freeze = 1;
963     }
964     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
965       $jobstep[$job]->{node_fail} = 1;
966       ban_node_by_slot($jobstep[$job]->{slotindex});
967     }
968   }
969 }
970
971
972 sub process_stderr
973 {
974   my $job = shift;
975   my $success = shift;
976   preprocess_stderr ($job);
977
978   map {
979     Log ($job, "stderr $_");
980   } split ("\n", $jobstep[$job]->{stderr});
981 }
982
983
984 sub collate_output
985 {
986   my $whc = Warehouse->new;
987   Log (undef, "collate");
988   $whc->write_start (1);
989   my $joboutput;
990   for (@jobstep)
991   {
992     next if (!exists $_->{'arvados_task'}->{output} ||
993              !$_->{'arvados_task'}->{'success'} ||
994              $_->{'exitcode'} != 0);
995     my $output = $_->{'arvados_task'}->{output};
996     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
997     {
998       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
999       $whc->write_data ($output);
1000     }
1001     elsif (@jobstep == 1)
1002     {
1003       $joboutput = $output;
1004       $whc->write_finish;
1005     }
1006     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1007     {
1008       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1009       $whc->write_data ($outblock);
1010     }
1011     else
1012     {
1013       my $errstr = $whc->errstr;
1014       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1015       $success = 0;
1016     }
1017   }
1018   $joboutput = $whc->write_finish if !defined $joboutput;
1019   if ($joboutput)
1020   {
1021     Log (undef, "output $joboutput");
1022     $Job->{'output'} = $joboutput;
1023     $Job->save;
1024   }
1025   else
1026   {
1027     Log (undef, "output undef");
1028   }
1029   return $joboutput;
1030 }
1031
1032
1033 sub killem
1034 {
1035   foreach (@_)
1036   {
1037     my $sig = 2;                # SIGINT first
1038     if (exists $proc{$_}->{"sent_$sig"} &&
1039         time - $proc{$_}->{"sent_$sig"} > 4)
1040     {
1041       $sig = 15;                # SIGTERM if SIGINT doesn't work
1042     }
1043     if (exists $proc{$_}->{"sent_$sig"} &&
1044         time - $proc{$_}->{"sent_$sig"} > 4)
1045     {
1046       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1047     }
1048     if (!exists $proc{$_}->{"sent_$sig"})
1049     {
1050       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1051       kill $sig, $_;
1052       select (undef, undef, undef, 0.1);
1053       if ($sig == 2)
1054       {
1055         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1056       }
1057       $proc{$_}->{"sent_$sig"} = time;
1058       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1059     }
1060   }
1061 }
1062
1063
1064 sub fhbits
1065 {
1066   my($bits);
1067   for (@_) {
1068     vec($bits,fileno($_),1) = 1;
1069   }
1070   $bits;
1071 }
1072
1073
1074 sub Log                         # ($jobstep_id, $logmessage)
1075 {
1076   if ($_[1] =~ /\n/) {
1077     for my $line (split (/\n/, $_[1])) {
1078       Log ($_[0], $line);
1079     }
1080     return;
1081   }
1082   my $fh = select STDERR; $|=1; select $fh;
1083   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1084   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1085   $message .= "\n";
1086   my $datetime;
1087   if ($metastream || -t STDERR) {
1088     my @gmtime = gmtime;
1089     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1090                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1091   }
1092   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1093
1094   return if !$metastream;
1095   $metastream->write_data ($datetime . " " . $message);
1096 }
1097
1098
1099 sub croak
1100 {
1101   my ($package, $file, $line) = caller;
1102   my $message = "@_ at $file line $line\n";
1103   Log (undef, $message);
1104   freeze() if @jobstep_todo;
1105   collate_output() if @jobstep_todo;
1106   cleanup();
1107   save_meta() if $metastream;
1108   die;
1109 }
1110
1111
1112 sub cleanup
1113 {
1114   return if !$job_has_uuid;
1115   $Job->reload;
1116   $Job->{'running'} = 0;
1117   $Job->{'success'} = 0;
1118   $Job->{'finished_at'} = gmtime;
1119   $Job->save;
1120 }
1121
1122
1123 sub save_meta
1124 {
1125   my $justcheckpoint = shift; # false if this will be the last meta saved
1126   my $m = $metastream;
1127   $m = $m->copy if $justcheckpoint;
1128   $m->write_finish;
1129   my $loglocator = $m->as_key;
1130   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1131   Log (undef, "meta key is $loglocator");
1132   $Job->{'log'} = $loglocator;
1133   $Job->save;
1134 }
1135
1136
1137 sub freeze_if_want_freeze
1138 {
1139   if ($main::please_freeze)
1140   {
1141     release_allocation();
1142     if (@_)
1143     {
1144       # kill some srun procs before freeze+stop
1145       map { $proc{$_} = {} } @_;
1146       while (%proc)
1147       {
1148         killem (keys %proc);
1149         select (undef, undef, undef, 0.1);
1150         my $died;
1151         while (($died = waitpid (-1, WNOHANG)) > 0)
1152         {
1153           delete $proc{$died};
1154         }
1155       }
1156     }
1157     freeze();
1158     collate_output();
1159     cleanup();
1160     save_meta();
1161     exit 0;
1162   }
1163 }
1164
1165
1166 sub freeze
1167 {
1168   Log (undef, "Freeze not implemented");
1169   return;
1170 }
1171
1172
1173 sub thaw
1174 {
1175   croak ("Thaw not implemented");
1176
1177   my $whc;
1178   my $key = shift;
1179   Log (undef, "thaw from $key");
1180
1181   @jobstep = ();
1182   @jobstep_done = ();
1183   @jobstep_todo = ();
1184   @jobstep_tomerge = ();
1185   $jobstep_tomerge_level = 0;
1186   my $frozenjob = {};
1187
1188   my $stream = new Warehouse::Stream ( whc => $whc,
1189                                        hash => [split (",", $key)] );
1190   $stream->rewind;
1191   while (my $dataref = $stream->read_until (undef, "\n\n"))
1192   {
1193     if ($$dataref =~ /^job /)
1194     {
1195       foreach (split ("\n", $$dataref))
1196       {
1197         my ($k, $v) = split ("=", $_, 2);
1198         $frozenjob->{$k} = freezeunquote ($v);
1199       }
1200       next;
1201     }
1202
1203     if ($$dataref =~ /^merge (\d+) (.*)/)
1204     {
1205       $jobstep_tomerge_level = $1;
1206       @jobstep_tomerge
1207           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1208       next;
1209     }
1210
1211     my $Jobstep = { };
1212     foreach (split ("\n", $$dataref))
1213     {
1214       my ($k, $v) = split ("=", $_, 2);
1215       $Jobstep->{$k} = freezeunquote ($v) if $k;
1216     }
1217     $Jobstep->{attempts} = 0;
1218     push @jobstep, $Jobstep;
1219
1220     if ($Jobstep->{exitcode} eq "0")
1221     {
1222       push @jobstep_done, $#jobstep;
1223     }
1224     else
1225     {
1226       push @jobstep_todo, $#jobstep;
1227     }
1228   }
1229
1230   foreach (qw (script script_version script_parameters))
1231   {
1232     $Job->{$_} = $frozenjob->{$_};
1233   }
1234   $Job->save;
1235 }
1236
1237
1238 sub freezequote
1239 {
1240   my $s = shift;
1241   $s =~ s/\\/\\\\/g;
1242   $s =~ s/\n/\\n/g;
1243   return $s;
1244 }
1245
1246
1247 sub freezeunquote
1248 {
1249   my $s = shift;
1250   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1251   return $s;
1252 }
1253
1254
1255 sub srun
1256 {
1257   my $srunargs = shift;
1258   my $execargs = shift;
1259   my $opts = shift || {};
1260   my $stdin = shift;
1261   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1262   print STDERR (join (" ",
1263                       map { / / ? "'$_'" : $_ }
1264                       (@$args)),
1265                 "\n")
1266       if $ENV{CRUNCH_DEBUG};
1267
1268   if (defined $stdin) {
1269     my $child = open STDIN, "-|";
1270     defined $child or die "no fork: $!";
1271     if ($child == 0) {
1272       print $stdin or die $!;
1273       close STDOUT or die $!;
1274       exit 0;
1275     }
1276   }
1277
1278   return system (@$args) if $opts->{fork};
1279
1280   exec @$args;
1281   warn "ENV size is ".length(join(" ",%ENV));
1282   die "exec failed: $!: @$args";
1283 }
1284
1285
1286 sub ban_node_by_slot {
1287   # Don't start any new jobsteps on this node for 60 seconds
1288   my $slotid = shift;
1289   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1290   $slot[$slotid]->{node}->{hold_count}++;
1291   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1292 }
1293
1294 __DATA__
1295 #!/usr/bin/perl
1296
1297 # checkout-and-build
1298
1299 use Fcntl ':flock';
1300
1301 my $destdir = $ENV{"CRUNCH_SRC"};
1302 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1303 my $repo = $ENV{"CRUNCH_SRC_URL"};
1304
1305 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1306 flock L, LOCK_EX;
1307 if (readlink ("$destdir.commit") eq $commit) {
1308     exit 0;
1309 }
1310
1311 open STDOUT, ">", "$destdir.log";
1312 open STDERR, ">&STDOUT";
1313
1314 if (-d "$destdir/.git") {
1315     chdir $destdir or die "chdir $destdir: $!";
1316     if (0 != system (qw(git remote set-url origin), $repo)) {
1317         # awful... for old versions of git that don't know "remote set-url"
1318         shell_or_die (q(perl -pi~ -e '$_="\turl = ).$repo.q(\n" if /url = /' .git/config));
1319     }
1320 }
1321 elsif ($repo && $commit)
1322 {
1323     shell_or_die('git', 'clone', $repo, $destdir);
1324     chdir $destdir or die "chdir $destdir: $!";
1325     shell_or_die(qw(git config clean.requireForce false));
1326 }
1327 else {
1328     die "$destdir does not exist, and no repo/commit specified -- giving up";
1329 }
1330
1331 if ($commit) {
1332     unlink "$destdir.commit";
1333     shell_or_die (qw(git stash));
1334     shell_or_die (qw(git clean -d -x));
1335     shell_or_die (qw(git fetch origin));
1336     shell_or_die (qw(git checkout), $commit);
1337 }
1338
1339 my $pwd;
1340 chomp ($pwd = `pwd`);
1341 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1342 mkdir $install_dir;
1343 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1344     # Old version
1345     shell_or_die ("./tests/autotests.sh", $install_dir);
1346 } elsif (-e "./install.sh") {
1347     shell_or_die ("./install.sh", $install_dir);
1348 }
1349
1350 if ($commit) {
1351     unlink "$destdir.commit.new";
1352     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1353     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1354 }
1355
1356 close L;
1357
1358 exit 0;
1359
1360 sub shell_or_die
1361 {
1362   if ($ENV{"DEBUG"}) {
1363     print STDERR "@_\n";
1364   }
1365   system (@_) == 0
1366       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1367 }