add util functions, fix up tmp dirs
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 if ($ENV{"USER"} ne "crunch" && $< != 0) {
80   # use a tmp dir unique for my uid
81   $ENV{"CRUNCH_TMP"} .= "-$<";
82 }
83 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
84 $ENV{"CRUNCH_TMP"} = $ENV{"JOB_WORK"}; # deprecated
85 mkdir ($ENV{"JOB_WORK"});
86
87 my $force_unlock;
88 my $git_dir;
89 my $jobspec;
90 my $job_api_token;
91 my $resume_stash;
92 GetOptions('force-unlock' => \$force_unlock,
93            'git-dir=s' => \$git_dir,
94            'job=s' => \$jobspec,
95            'job-api-token=s' => \$job_api_token,
96            'resume-stash=s' => \$resume_stash,
97     );
98
99 if (defined $job_api_token) {
100   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
101 }
102
103 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
104 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
105 my $local_job = !$job_has_uuid;
106
107
108 $SIG{'HUP'} = sub
109 {
110   1;
111 };
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new;
124 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
125 $metastream->clear;
126 $metastream->write_start('log.txt');
127
128 my $User = $arv->{'users'}->{'current'}->execute;
129
130 my $Job = {};
131 my $job_id;
132 my $dbh;
133 my $sth;
134 if ($job_has_uuid)
135 {
136   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
137   if (!$force_unlock) {
138     if ($Job->{'is_locked_by_uuid'}) {
139       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
140     }
141     if ($Job->{'success'} ne undef) {
142       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
143     }
144     if ($Job->{'running'}) {
145       croak("Job 'running' flag is already set");
146     }
147     if ($Job->{'started_at'}) {
148       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
149     }
150   }
151 }
152 else
153 {
154   $Job = JSON::decode_json($jobspec);
155
156   if (!$resume_stash)
157   {
158     map { croak ("No $_ specified") unless $Job->{$_} }
159     qw(script script_version script_parameters);
160   }
161
162   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
163   $Job->{'started_at'} = gmtime;
164
165   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
166
167   $job_has_uuid = 1;
168 }
169 $job_id = $Job->{'uuid'};
170
171
172
173 $Job->{'resource_limits'} ||= {};
174 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
175 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
176
177
178 Log (undef, "check slurm allocation");
179 my @slot;
180 my @node;
181 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
182 my @sinfo;
183 if (!$have_slurm)
184 {
185   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
186   push @sinfo, "$localcpus localhost";
187 }
188 if (exists $ENV{SLURM_NODELIST})
189 {
190   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
191 }
192 foreach (@sinfo)
193 {
194   my ($ncpus, $slurm_nodelist) = split;
195   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
196
197   my @nodelist;
198   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
199   {
200     my $nodelist = $1;
201     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
202     {
203       my $ranges = $1;
204       foreach (split (",", $ranges))
205       {
206         my ($a, $b);
207         if (/(\d+)-(\d+)/)
208         {
209           $a = $1;
210           $b = $2;
211         }
212         else
213         {
214           $a = $_;
215           $b = $_;
216         }
217         push @nodelist, map {
218           my $n = $nodelist;
219           $n =~ s/\[[-,\d]+\]/$_/;
220           $n;
221         } ($a..$b);
222       }
223     }
224     else
225     {
226       push @nodelist, $nodelist;
227     }
228   }
229   foreach my $nodename (@nodelist)
230   {
231     Log (undef, "node $nodename - $ncpus slots");
232     my $node = { name => $nodename,
233                  ncpus => $ncpus,
234                  losing_streak => 0,
235                  hold_until => 0 };
236     foreach my $cpu (1..$ncpus)
237     {
238       push @slot, { node => $node,
239                     cpu => $cpu };
240     }
241   }
242   push @node, @nodelist;
243 }
244
245
246
247 # Ensure that we get one jobstep running on each allocated node before
248 # we start overloading nodes with concurrent steps
249
250 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
251
252
253
254 my $jobmanager_id;
255 if ($job_has_uuid)
256 {
257   # Claim this job, and make sure nobody else does
258
259   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
260   $Job->{'started_at'} = gmtime;
261   $Job->{'running'} = 1;
262   $Job->{'success'} = undef;
263   $Job->{'tasks_summary'} = { 'failed' => 0,
264                               'todo' => 1,
265                               'running' => 0,
266                               'done' => 0 };
267   if ($job_has_uuid) {
268     unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269       croak("Error while updating / locking job");
270     }
271   }
272 }
273
274
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
286
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
290
291
292 my @jobstep;
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
297 my $squeue_checked;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'attempts' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 }
333 else
334 {
335   do {
336     local $/ = undef;
337     $build_script = <DATA>;
338   };
339   Log (undef, "Install revision ".$Job->{script_version});
340   my $nodelist = join(",", @node);
341
342   # Clean out crunch_tmp/work and crunch_tmp/opt
343
344   my $cleanpid = fork();
345   if ($cleanpid == 0)
346   {
347     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
349     exit (1);
350   }
351   while (1)
352   {
353     last if $cleanpid == waitpid (-1, WNOHANG);
354     freeze_if_want_freeze ($cleanpid);
355     select (undef, undef, undef, 0.1);
356   }
357   Log (undef, "Clean-work-dir exited $?");
358
359   # Install requested code version
360
361   my @execargs;
362   my @srunargs = ("srun",
363                   "--nodelist=$nodelist",
364                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
365
366   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
369
370   my $commit;
371   my $git_archive;
372   my $treeish = $Job->{'script_version'};
373   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374   # Todo: let script_version specify repository instead of expecting
375   # parent process to figure it out.
376   $ENV{"CRUNCH_SRC_URL"} = $repo;
377
378   # Create/update our clone of the remote git repo
379
380   if (!-d $ENV{"CRUNCH_SRC"}) {
381     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382         or croak ("git clone $repo failed: exit ".($?>>8));
383     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
384   }
385   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
386
387   # If this looks like a subversion r#, look for it in git-svn commit messages
388
389   if ($treeish =~ m{^\d{1,4}$}) {
390     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
391     chomp $gitlog;
392     if ($gitlog =~ /^[a-f0-9]{40}$/) {
393       $commit = $gitlog;
394       Log (undef, "Using commit $commit for script_version $treeish");
395     }
396   }
397
398   # If that didn't work, try asking git to look it up as a tree-ish.
399
400   if (!defined $commit) {
401
402     my $cooked_treeish = $treeish;
403     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404       # Looks like a git branch name -- make sure git knows it's
405       # relative to the remote repo
406       $cooked_treeish = "origin/$treeish";
407     }
408
409     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
410     chomp $found;
411     if ($found =~ /^[0-9a-f]{40}$/s) {
412       $commit = $found;
413       if ($commit ne $treeish) {
414         # Make sure we record the real commit id in the database,
415         # frozentokey, logs, etc. -- instead of an abbreviation or a
416         # branch name which can become ambiguous or point to a
417         # different commit in the future.
418         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419         Log (undef, "Using commit $commit for tree-ish $treeish");
420         if ($commit ne $treeish) {
421           $Job->{'script_version'} = $commit;
422           !$job_has_uuid or $Job->save() or croak("Error while updating job");
423         }
424       }
425     }
426   }
427
428   if (defined $commit) {
429     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430     @execargs = ("sh", "-c",
431                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
432     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
433   }
434   else {
435     croak ("could not figure out commit id for $treeish");
436   }
437
438   my $installpid = fork();
439   if ($installpid == 0)
440   {
441     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
442     exit (1);
443   }
444   while (1)
445   {
446     last if $installpid == waitpid (-1, WNOHANG);
447     freeze_if_want_freeze ($installpid);
448     select (undef, undef, undef, 0.1);
449   }
450   Log (undef, "Install exited $?");
451 }
452
453
454
455 foreach (qw (script script_version script_parameters resource_limits))
456 {
457   Log (undef,
458        "$_ " .
459        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
460 }
461 foreach (split (/\n/, $Job->{knobs}))
462 {
463   Log (undef, "knob " . $_);
464 }
465
466
467
468 my $success;
469
470
471
472 ONELEVEL:
473
474 my $thisround_succeeded = 0;
475 my $thisround_failed = 0;
476 my $thisround_failed_multiple = 0;
477
478 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
479                        or $a <=> $b } @jobstep_todo;
480 my $level = $jobstep[$jobstep_todo[0]]->{level};
481 Log (undef, "start level $level");
482
483
484
485 my %proc;
486 my @freeslot = (0..$#slot);
487 my @holdslot;
488 my %reader;
489 my $progress_is_dirty = 1;
490 my $progress_stats_updated = 0;
491
492 update_progress_stats();
493
494
495
496 THISROUND:
497 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
498 {
499   $main::please_continue = 0;
500
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507   if ($Jobstep->{attempts} > 9)
508   {
509     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
510     $success = 0;
511     last THISROUND;
512   }
513
514   pipe $reader{$id}, "writer" or croak ($!);
515   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
517
518   my $childslot = $freeslot[0];
519   my $childnode = $slot[$childslot]->{node};
520   my $childslotname = join (".",
521                             $slot[$childslot]->{node}->{name},
522                             $slot[$childslot]->{cpu});
523   my $childpid = fork();
524   if ($childpid == 0)
525   {
526     $SIG{'INT'} = 'DEFAULT';
527     $SIG{'QUIT'} = 'DEFAULT';
528     $SIG{'TERM'} = 'DEFAULT';
529
530     foreach (values (%reader))
531     {
532       close($_);
533     }
534     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535     open(STDOUT,">&writer");
536     open(STDERR,">&writer");
537
538     undef $dbh;
539     undef $sth;
540
541     delete $ENV{"GNUPGHOME"};
542     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543     $ENV{"TASK_QSEQUENCE"} = $id;
544     $ENV{"TASK_SEQUENCE"} = $level;
545     $ENV{"JOB_SCRIPT"} = $Job->{script};
546     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547       $param =~ tr/a-z/A-Z/;
548       $ENV{"JOB_PARAMETER_$param"} = $value;
549     }
550     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
552     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
555
556     $ENV{"GZIP"} = "-n";
557
558     my @srunargs = (
559       "srun",
560       "--nodelist=".$childnode->{name},
561       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562       "--job-name=$job_id.$id.$$",
563         );
564     my @execargs = qw(sh);
565     my $build_script_to_send = "";
566     my $command =
567         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568         ."&& cd $ENV{CRUNCH_TMP} ";
569     if ($build_script)
570     {
571       $build_script_to_send = $build_script;
572       $command .=
573           "&& perl -";
574     }
575     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
576     $command .=
577         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
578     my @execargs = ('bash', '-c', $command);
579     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
580     exit (1);
581   }
582   close("writer");
583   if (!defined $childpid)
584   {
585     close $reader{$id};
586     delete $reader{$id};
587     next;
588   }
589   shift @freeslot;
590   $proc{$childpid} = { jobstep => $id,
591                        time => time,
592                        slot => $childslot,
593                        jobstepname => "$job_id.$id.$childpid",
594                      };
595   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
596   $slot[$childslot]->{pid} = $childpid;
597
598   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
599   Log ($id, "child $childpid started on $childslotname");
600   $Jobstep->{attempts} ++;
601   $Jobstep->{starttime} = time;
602   $Jobstep->{node} = $childnode->{name};
603   $Jobstep->{slotindex} = $childslot;
604   delete $Jobstep->{stderr};
605   delete $Jobstep->{finishtime};
606
607   splice @jobstep_todo, $todo_ptr, 1;
608   --$todo_ptr;
609
610   $progress_is_dirty = 1;
611
612   while (!@freeslot
613          ||
614          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
615   {
616     last THISROUND if $main::please_freeze;
617     if ($main::please_info)
618     {
619       $main::please_info = 0;
620       freeze();
621       collate_output();
622       save_meta(1);
623       update_progress_stats();
624     }
625     my $gotsome
626         = readfrompipes ()
627         + reapchildren ();
628     if (!$gotsome)
629     {
630       check_squeue();
631       update_progress_stats();
632       select (undef, undef, undef, 0.1);
633     }
634     elsif (time - $progress_stats_updated >= 30)
635     {
636       update_progress_stats();
637     }
638     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
639         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
640     {
641       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
642           .($thisround_failed+$thisround_succeeded)
643           .") -- giving up on this round";
644       Log (undef, $message);
645       last THISROUND;
646     }
647
648     # move slots from freeslot to holdslot (or back to freeslot) if necessary
649     for (my $i=$#freeslot; $i>=0; $i--) {
650       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
651         push @holdslot, (splice @freeslot, $i, 1);
652       }
653     }
654     for (my $i=$#holdslot; $i>=0; $i--) {
655       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
656         push @freeslot, (splice @holdslot, $i, 1);
657       }
658     }
659
660     # give up if no nodes are succeeding
661     if (!grep { $_->{node}->{losing_streak} == 0 &&
662                     $_->{node}->{hold_count} < 4 } @slot) {
663       my $message = "Every node has failed -- giving up on this round";
664       Log (undef, $message);
665       last THISROUND;
666     }
667   }
668 }
669
670
671 push @freeslot, splice @holdslot;
672 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
673
674
675 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
676 while (%proc)
677 {
678   goto THISROUND if $main::please_continue;
679   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
680   readfrompipes ();
681   if (!reapchildren())
682   {
683     check_squeue();
684     update_progress_stats();
685     select (undef, undef, undef, 0.1);
686     killem (keys %proc) if $main::please_freeze;
687   }
688 }
689
690 update_progress_stats();
691 freeze_if_want_freeze();
692
693
694 if (!defined $success)
695 {
696   if (@jobstep_todo &&
697       $thisround_succeeded == 0 &&
698       ($thisround_failed == 0 || $thisround_failed > 4))
699   {
700     my $message = "stop because $thisround_failed tasks failed and none succeeded";
701     Log (undef, $message);
702     $success = 0;
703   }
704   if (!@jobstep_todo)
705   {
706     $success = 1;
707   }
708 }
709
710 goto ONELEVEL if !defined $success;
711
712
713 release_allocation();
714 freeze();
715 $Job->reload;
716 $Job->{'output'} = &collate_output();
717 $Job->{'running'} = 0;
718 $Job->{'success'} = $Job->{'output'} && $success;
719 $Job->{'finished_at'} = gmtime;
720 $Job->save if $job_has_uuid;
721
722 if ($Job->{'output'})
723 {
724   eval {
725     my $manifest_text = capturex("whget", $Job->{'output'});
726     $arv->{'collections'}->{'create'}->execute('collection' => {
727       'uuid' => $Job->{'output'},
728       'manifest_text' => $manifest_text,
729     });
730   };
731   if ($@) {
732     Log (undef, "Failed to register output manifest: $@");
733   }
734 }
735
736 Log (undef, "finish");
737
738 save_meta();
739 exit 0;
740
741
742
743 sub update_progress_stats
744 {
745   $progress_stats_updated = time;
746   return if !$progress_is_dirty;
747   my ($todo, $done, $running) = (scalar @jobstep_todo,
748                                  scalar @jobstep_done,
749                                  scalar @slot - scalar @freeslot - scalar @holdslot);
750   $Job->{'tasks_summary'} ||= {};
751   $Job->{'tasks_summary'}->{'todo'} = $todo;
752   $Job->{'tasks_summary'}->{'done'} = $done;
753   $Job->{'tasks_summary'}->{'running'} = $running;
754   $Job->save if $job_has_uuid;
755   Log (undef, "status: $done done, $running running, $todo todo");
756   $progress_is_dirty = 0;
757 }
758
759
760
761 sub reapchildren
762 {
763   my $pid = waitpid (-1, WNOHANG);
764   return 0 if $pid <= 0;
765
766   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
767                   . "."
768                   . $slot[$proc{$pid}->{slot}]->{cpu});
769   my $jobstepid = $proc{$pid}->{jobstep};
770   my $elapsed = time - $proc{$pid}->{time};
771   my $Jobstep = $jobstep[$jobstepid];
772
773   my $exitcode = $?;
774   my $exitinfo = "exit $exitcode";
775   $Jobstep->{'arvados_task'}->reload;
776   my $success = $Jobstep->{'arvados_task'}->{success};
777
778   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
779
780   if (!defined $success) {
781     # task did not indicate one way or the other --> fail
782     $Jobstep->{'arvados_task'}->{success} = 0;
783     $Jobstep->{'arvados_task'}->save;
784     $success = 0;
785   }
786
787   if (!$success)
788   {
789     my $no_incr_attempts;
790     $no_incr_attempts = 1 if $Jobstep->{node_fail};
791
792     ++$thisround_failed;
793     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
794
795     # Check for signs of a failed or misconfigured node
796     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
797         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
798       # Don't count this against jobstep failure thresholds if this
799       # node is already suspected faulty and srun exited quickly
800       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
801           $elapsed < 5 &&
802           $Jobstep->{attempts} > 1) {
803         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
804         $no_incr_attempts = 1;
805         --$Jobstep->{attempts};
806       }
807       ban_node_by_slot($proc{$pid}->{slot});
808     }
809
810     push @jobstep_todo, $jobstepid;
811     Log ($jobstepid, "failure in $elapsed seconds");
812
813     --$Jobstep->{attempts} if $no_incr_attempts;
814     $Job->{'tasks_summary'}->{'failed'}++;
815   }
816   else
817   {
818     ++$thisround_succeeded;
819     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
820     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
821     push @jobstep_done, $jobstepid;
822     Log ($jobstepid, "success in $elapsed seconds");
823   }
824   $Jobstep->{exitcode} = $exitcode;
825   $Jobstep->{finishtime} = time;
826   process_stderr ($jobstepid, $success);
827   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
828
829   close $reader{$jobstepid};
830   delete $reader{$jobstepid};
831   delete $slot[$proc{$pid}->{slot}]->{pid};
832   push @freeslot, $proc{$pid}->{slot};
833   delete $proc{$pid};
834
835   # Load new tasks
836   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
837     'where' => {
838       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
839     },
840     'order' => 'qsequence'
841   );
842   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
843     my $jobstep = {
844       'level' => $arvados_task->{'sequence'},
845       'attempts' => 0,
846       'arvados_task' => $arvados_task
847     };
848     push @jobstep, $jobstep;
849     push @jobstep_todo, $#jobstep;
850   }
851
852   $progress_is_dirty = 1;
853   1;
854 }
855
856
857 sub check_squeue
858 {
859   # return if the kill list was checked <4 seconds ago
860   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
861   {
862     return;
863   }
864   $squeue_kill_checked = time;
865
866   # use killem() on procs whose killtime is reached
867   for (keys %proc)
868   {
869     if (exists $proc{$_}->{killtime}
870         && $proc{$_}->{killtime} <= time)
871     {
872       killem ($_);
873     }
874   }
875
876   # return if the squeue was checked <60 seconds ago
877   if (defined $squeue_checked && $squeue_checked > time - 60)
878   {
879     return;
880   }
881   $squeue_checked = time;
882
883   if (!$have_slurm)
884   {
885     # here is an opportunity to check for mysterious problems with local procs
886     return;
887   }
888
889   # get a list of steps still running
890   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
891   chop @squeue;
892   if ($squeue[-1] ne "ok")
893   {
894     return;
895   }
896   pop @squeue;
897
898   # which of my jobsteps are running, according to squeue?
899   my %ok;
900   foreach (@squeue)
901   {
902     if (/^(\d+)\.(\d+) (\S+)/)
903     {
904       if ($1 eq $ENV{SLURM_JOBID})
905       {
906         $ok{$3} = 1;
907       }
908     }
909   }
910
911   # which of my active child procs (>60s old) were not mentioned by squeue?
912   foreach (keys %proc)
913   {
914     if ($proc{$_}->{time} < time - 60
915         && !exists $ok{$proc{$_}->{jobstepname}}
916         && !exists $proc{$_}->{killtime})
917     {
918       # kill this proc if it hasn't exited in 30 seconds
919       $proc{$_}->{killtime} = time + 30;
920     }
921   }
922 }
923
924
925 sub release_allocation
926 {
927   if ($have_slurm)
928   {
929     Log (undef, "release job allocation");
930     system "scancel $ENV{SLURM_JOBID}";
931   }
932 }
933
934
935 sub readfrompipes
936 {
937   my $gotsome = 0;
938   foreach my $job (keys %reader)
939   {
940     my $buf;
941     while (0 < sysread ($reader{$job}, $buf, 8192))
942     {
943       print STDERR $buf if $ENV{CRUNCH_DEBUG};
944       $jobstep[$job]->{stderr} .= $buf;
945       preprocess_stderr ($job);
946       if (length ($jobstep[$job]->{stderr}) > 16384)
947       {
948         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
949       }
950       $gotsome = 1;
951     }
952   }
953   return $gotsome;
954 }
955
956
957 sub preprocess_stderr
958 {
959   my $job = shift;
960
961   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
962     my $line = $1;
963     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
964     Log ($job, "stderr $line");
965     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
966       # whoa.
967       $main::please_freeze = 1;
968     }
969     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
970       $jobstep[$job]->{node_fail} = 1;
971       ban_node_by_slot($jobstep[$job]->{slotindex});
972     }
973   }
974 }
975
976
977 sub process_stderr
978 {
979   my $job = shift;
980   my $success = shift;
981   preprocess_stderr ($job);
982
983   map {
984     Log ($job, "stderr $_");
985   } split ("\n", $jobstep[$job]->{stderr});
986 }
987
988
989 sub collate_output
990 {
991   my $whc = Warehouse->new;
992   Log (undef, "collate");
993   $whc->write_start (1);
994   my $joboutput;
995   for (@jobstep)
996   {
997     next if (!exists $_->{'arvados_task'}->{output} ||
998              !$_->{'arvados_task'}->{'success'} ||
999              $_->{'exitcode'} != 0);
1000     my $output = $_->{'arvados_task'}->{output};
1001     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1002     {
1003       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1004       $whc->write_data ($output);
1005     }
1006     elsif (@jobstep == 1)
1007     {
1008       $joboutput = $output;
1009       $whc->write_finish;
1010     }
1011     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1012     {
1013       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1014       $whc->write_data ($outblock);
1015     }
1016     else
1017     {
1018       my $errstr = $whc->errstr;
1019       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1020       $success = 0;
1021     }
1022   }
1023   $joboutput = $whc->write_finish if !defined $joboutput;
1024   if ($joboutput)
1025   {
1026     Log (undef, "output $joboutput");
1027     $Job->{'output'} = $joboutput;
1028     $Job->save if $job_has_uuid;
1029   }
1030   else
1031   {
1032     Log (undef, "output undef");
1033   }
1034   return $joboutput;
1035 }
1036
1037
1038 sub killem
1039 {
1040   foreach (@_)
1041   {
1042     my $sig = 2;                # SIGINT first
1043     if (exists $proc{$_}->{"sent_$sig"} &&
1044         time - $proc{$_}->{"sent_$sig"} > 4)
1045     {
1046       $sig = 15;                # SIGTERM if SIGINT doesn't work
1047     }
1048     if (exists $proc{$_}->{"sent_$sig"} &&
1049         time - $proc{$_}->{"sent_$sig"} > 4)
1050     {
1051       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1052     }
1053     if (!exists $proc{$_}->{"sent_$sig"})
1054     {
1055       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1056       kill $sig, $_;
1057       select (undef, undef, undef, 0.1);
1058       if ($sig == 2)
1059       {
1060         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1061       }
1062       $proc{$_}->{"sent_$sig"} = time;
1063       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1064     }
1065   }
1066 }
1067
1068
1069 sub fhbits
1070 {
1071   my($bits);
1072   for (@_) {
1073     vec($bits,fileno($_),1) = 1;
1074   }
1075   $bits;
1076 }
1077
1078
1079 sub Log                         # ($jobstep_id, $logmessage)
1080 {
1081   if ($_[1] =~ /\n/) {
1082     for my $line (split (/\n/, $_[1])) {
1083       Log ($_[0], $line);
1084     }
1085     return;
1086   }
1087   my $fh = select STDERR; $|=1; select $fh;
1088   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1089   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1090   $message .= "\n";
1091   my $datetime;
1092   if ($metastream || -t STDERR) {
1093     my @gmtime = gmtime;
1094     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1095                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1096   }
1097   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1098
1099   return if !$metastream;
1100   $metastream->write_data ($datetime . " " . $message);
1101 }
1102
1103
1104 sub croak
1105 {
1106   my ($package, $file, $line) = caller;
1107   my $message = "@_ at $file line $line\n";
1108   Log (undef, $message);
1109   freeze() if @jobstep_todo;
1110   collate_output() if @jobstep_todo;
1111   cleanup();
1112   save_meta() if $metastream;
1113   die;
1114 }
1115
1116
1117 sub cleanup
1118 {
1119   return if !$job_has_uuid;
1120   $Job->reload;
1121   $Job->{'running'} = 0;
1122   $Job->{'success'} = 0;
1123   $Job->{'finished_at'} = gmtime;
1124   $Job->save;
1125 }
1126
1127
1128 sub save_meta
1129 {
1130   my $justcheckpoint = shift; # false if this will be the last meta saved
1131   my $m = $metastream;
1132   $m = $m->copy if $justcheckpoint;
1133   $m->write_finish;
1134   my $loglocator = $m->as_key;
1135   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1136   Log (undef, "meta key is $loglocator");
1137   $Job->{'log'} = $loglocator;
1138   $Job->save if $job_has_uuid;
1139 }
1140
1141
1142 sub freeze_if_want_freeze
1143 {
1144   if ($main::please_freeze)
1145   {
1146     release_allocation();
1147     if (@_)
1148     {
1149       # kill some srun procs before freeze+stop
1150       map { $proc{$_} = {} } @_;
1151       while (%proc)
1152       {
1153         killem (keys %proc);
1154         select (undef, undef, undef, 0.1);
1155         my $died;
1156         while (($died = waitpid (-1, WNOHANG)) > 0)
1157         {
1158           delete $proc{$died};
1159         }
1160       }
1161     }
1162     freeze();
1163     collate_output();
1164     cleanup();
1165     save_meta();
1166     exit 0;
1167   }
1168 }
1169
1170
1171 sub freeze
1172 {
1173   Log (undef, "Freeze not implemented");
1174   return;
1175 }
1176
1177
1178 sub thaw
1179 {
1180   croak ("Thaw not implemented");
1181
1182   my $whc;
1183   my $key = shift;
1184   Log (undef, "thaw from $key");
1185
1186   @jobstep = ();
1187   @jobstep_done = ();
1188   @jobstep_todo = ();
1189   @jobstep_tomerge = ();
1190   $jobstep_tomerge_level = 0;
1191   my $frozenjob = {};
1192
1193   my $stream = new Warehouse::Stream ( whc => $whc,
1194                                        hash => [split (",", $key)] );
1195   $stream->rewind;
1196   while (my $dataref = $stream->read_until (undef, "\n\n"))
1197   {
1198     if ($$dataref =~ /^job /)
1199     {
1200       foreach (split ("\n", $$dataref))
1201       {
1202         my ($k, $v) = split ("=", $_, 2);
1203         $frozenjob->{$k} = freezeunquote ($v);
1204       }
1205       next;
1206     }
1207
1208     if ($$dataref =~ /^merge (\d+) (.*)/)
1209     {
1210       $jobstep_tomerge_level = $1;
1211       @jobstep_tomerge
1212           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1213       next;
1214     }
1215
1216     my $Jobstep = { };
1217     foreach (split ("\n", $$dataref))
1218     {
1219       my ($k, $v) = split ("=", $_, 2);
1220       $Jobstep->{$k} = freezeunquote ($v) if $k;
1221     }
1222     $Jobstep->{attempts} = 0;
1223     push @jobstep, $Jobstep;
1224
1225     if ($Jobstep->{exitcode} eq "0")
1226     {
1227       push @jobstep_done, $#jobstep;
1228     }
1229     else
1230     {
1231       push @jobstep_todo, $#jobstep;
1232     }
1233   }
1234
1235   foreach (qw (script script_version script_parameters))
1236   {
1237     $Job->{$_} = $frozenjob->{$_};
1238   }
1239   $Job->save if $job_has_uuid;
1240 }
1241
1242
1243 sub freezequote
1244 {
1245   my $s = shift;
1246   $s =~ s/\\/\\\\/g;
1247   $s =~ s/\n/\\n/g;
1248   return $s;
1249 }
1250
1251
1252 sub freezeunquote
1253 {
1254   my $s = shift;
1255   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1256   return $s;
1257 }
1258
1259
1260 sub srun
1261 {
1262   my $srunargs = shift;
1263   my $execargs = shift;
1264   my $opts = shift || {};
1265   my $stdin = shift;
1266   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1267   print STDERR (join (" ",
1268                       map { / / ? "'$_'" : $_ }
1269                       (@$args)),
1270                 "\n")
1271       if $ENV{CRUNCH_DEBUG};
1272
1273   if (defined $stdin) {
1274     my $child = open STDIN, "-|";
1275     defined $child or die "no fork: $!";
1276     if ($child == 0) {
1277       print $stdin or die $!;
1278       close STDOUT or die $!;
1279       exit 0;
1280     }
1281   }
1282
1283   return system (@$args) if $opts->{fork};
1284
1285   exec @$args;
1286   warn "ENV size is ".length(join(" ",%ENV));
1287   die "exec failed: $!: @$args";
1288 }
1289
1290
1291 sub ban_node_by_slot {
1292   # Don't start any new jobsteps on this node for 60 seconds
1293   my $slotid = shift;
1294   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1295   $slot[$slotid]->{node}->{hold_count}++;
1296   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1297 }
1298
1299 __DATA__
1300 #!/usr/bin/perl
1301
1302 # checkout-and-build
1303
1304 use Fcntl ':flock';
1305
1306 my $destdir = $ENV{"CRUNCH_SRC"};
1307 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1308 my $repo = $ENV{"CRUNCH_SRC_URL"};
1309
1310 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1311 flock L, LOCK_EX;
1312 if (readlink ("$destdir.commit") eq $commit) {
1313     exit 0;
1314 }
1315
1316 unlink "$destdir.commit";
1317 open STDOUT, ">", "$destdir.log";
1318 open STDERR, ">&STDOUT";
1319
1320 mkdir $destdir;
1321 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1322 print TARX <DATA>;
1323 if(!close(TARX)) {
1324   die "'tar -C $destdir -xf -' exited $?: $!";
1325 }
1326
1327 my $pwd;
1328 chomp ($pwd = `pwd`);
1329 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1330 mkdir $install_dir;
1331 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1332     # Old version
1333     shell_or_die ("./tests/autotests.sh", $install_dir);
1334 } elsif (-e "./install.sh") {
1335     shell_or_die ("./install.sh", $install_dir);
1336 }
1337
1338 if ($commit) {
1339     unlink "$destdir.commit.new";
1340     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1341     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1342 }
1343
1344 close L;
1345
1346 exit 0;
1347
1348 sub shell_or_die
1349 {
1350   if ($ENV{"DEBUG"}) {
1351     print STDERR "@_\n";
1352   }
1353   system (@_) == 0
1354       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1355 }
1356
1357 __DATA__