4601c5a477b00a544af3f0ef604b964b3bddc9f1
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 if ($ENV{"USER"} ne "crunch" && $< != 0) {
80   # use a tmp dir unique for my uid
81   $ENV{"CRUNCH_TMP"} .= "-$<";
82 }
83 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
84 $ENV{"CRUNCH_TMP"} = $ENV{"JOB_WORK"}; # deprecated
85 mkdir ($ENV{"JOB_WORK"});
86
87 my $force_unlock;
88 my $git_dir;
89 my $jobspec;
90 my $job_api_token;
91 my $resume_stash;
92 GetOptions('force-unlock' => \$force_unlock,
93            'git-dir=s' => \$git_dir,
94            'job=s' => \$jobspec,
95            'job-api-token=s' => \$job_api_token,
96            'resume-stash=s' => \$resume_stash,
97     );
98
99 if (defined $job_api_token) {
100   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
101 }
102
103 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
104 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
105 my $local_job = !$job_has_uuid;
106
107
108 $SIG{'HUP'} = sub
109 {
110   1;
111 };
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new;
124 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
125 $metastream->clear;
126 $metastream->write_start('log.txt');
127
128 my $User = $arv->{'users'}->{'current'}->execute;
129
130 my $Job = {};
131 my $job_id;
132 my $dbh;
133 my $sth;
134 if ($job_has_uuid)
135 {
136   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
137   if (!$force_unlock) {
138     if ($Job->{'is_locked_by_uuid'}) {
139       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
140     }
141     if ($Job->{'success'} ne undef) {
142       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
143     }
144     if ($Job->{'running'}) {
145       croak("Job 'running' flag is already set");
146     }
147     if ($Job->{'started_at'}) {
148       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
149     }
150   }
151 }
152 else
153 {
154   $Job = JSON::decode_json($jobspec);
155
156   if (!$resume_stash)
157   {
158     map { croak ("No $_ specified") unless $Job->{$_} }
159     qw(script script_version script_parameters);
160   }
161
162   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
163   $Job->{'started_at'} = gmtime;
164
165   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
166
167   $job_has_uuid = 1;
168 }
169 $job_id = $Job->{'uuid'};
170
171
172
173 $Job->{'resource_limits'} ||= {};
174 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
175 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
176
177
178 Log (undef, "check slurm allocation");
179 my @slot;
180 my @node;
181 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
182 my @sinfo;
183 if (!$have_slurm)
184 {
185   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
186   push @sinfo, "$localcpus localhost";
187 }
188 if (exists $ENV{SLURM_NODELIST})
189 {
190   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
191 }
192 foreach (@sinfo)
193 {
194   my ($ncpus, $slurm_nodelist) = split;
195   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
196
197   my @nodelist;
198   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
199   {
200     my $nodelist = $1;
201     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
202     {
203       my $ranges = $1;
204       foreach (split (",", $ranges))
205       {
206         my ($a, $b);
207         if (/(\d+)-(\d+)/)
208         {
209           $a = $1;
210           $b = $2;
211         }
212         else
213         {
214           $a = $_;
215           $b = $_;
216         }
217         push @nodelist, map {
218           my $n = $nodelist;
219           $n =~ s/\[[-,\d]+\]/$_/;
220           $n;
221         } ($a..$b);
222       }
223     }
224     else
225     {
226       push @nodelist, $nodelist;
227     }
228   }
229   foreach my $nodename (@nodelist)
230   {
231     Log (undef, "node $nodename - $ncpus slots");
232     my $node = { name => $nodename,
233                  ncpus => $ncpus,
234                  losing_streak => 0,
235                  hold_until => 0 };
236     foreach my $cpu (1..$ncpus)
237     {
238       push @slot, { node => $node,
239                     cpu => $cpu };
240     }
241   }
242   push @node, @nodelist;
243 }
244
245
246
247 # Ensure that we get one jobstep running on each allocated node before
248 # we start overloading nodes with concurrent steps
249
250 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
251
252
253
254 my $jobmanager_id;
255 if ($job_has_uuid)
256 {
257   # Claim this job, and make sure nobody else does
258
259   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
260   $Job->{'started_at'} = gmtime;
261   $Job->{'running'} = 1;
262   $Job->{'success'} = undef;
263   $Job->{'tasks_summary'} = { 'failed' => 0,
264                               'todo' => 1,
265                               'running' => 0,
266                               'done' => 0 };
267   if ($job_has_uuid) {
268     unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269       croak("Error while updating / locking job");
270     }
271   }
272 }
273
274
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
286
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
290
291
292 my @jobstep;
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
297 my $squeue_checked;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'attempts' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 }
333 else
334 {
335   do {
336     local $/ = undef;
337     $build_script = <DATA>;
338   };
339   Log (undef, "Install revision ".$Job->{script_version});
340   my $nodelist = join(",", @node);
341
342   # Clean out crunch_tmp/work and crunch_tmp/opt
343
344   my $cleanpid = fork();
345   if ($cleanpid == 0)
346   {
347     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
349     exit (1);
350   }
351   while (1)
352   {
353     last if $cleanpid == waitpid (-1, WNOHANG);
354     freeze_if_want_freeze ($cleanpid);
355     select (undef, undef, undef, 0.1);
356   }
357   Log (undef, "Clean-work-dir exited $?");
358
359   # Install requested code version
360
361   my @execargs;
362   my @srunargs = ("srun",
363                   "--nodelist=$nodelist",
364                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
365
366   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
369
370   my $commit;
371   my $git_archive;
372   my $treeish = $Job->{'script_version'};
373   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374   # Todo: let script_version specify repository instead of expecting
375   # parent process to figure it out.
376   $ENV{"CRUNCH_SRC_URL"} = $repo;
377
378   # Create/update our clone of the remote git repo
379
380   if (!-d $ENV{"CRUNCH_SRC"}) {
381     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382         or croak ("git clone $repo failed: exit ".($?>>8));
383     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
384   }
385   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
386
387   # If this looks like a subversion r#, look for it in git-svn commit messages
388
389   if ($treeish =~ m{^\d{1,4}$}) {
390     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
391     chomp $gitlog;
392     if ($gitlog =~ /^[a-f0-9]{40}$/) {
393       $commit = $gitlog;
394       Log (undef, "Using commit $commit for script_version $treeish");
395     }
396   }
397
398   # If that didn't work, try asking git to look it up as a tree-ish.
399
400   if (!defined $commit) {
401
402     my $cooked_treeish = $treeish;
403     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404       # Looks like a git branch name -- make sure git knows it's
405       # relative to the remote repo
406       $cooked_treeish = "origin/$treeish";
407     }
408
409     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
410     chomp $found;
411     if ($found =~ /^[0-9a-f]{40}$/s) {
412       $commit = $found;
413       if ($commit ne $treeish) {
414         # Make sure we record the real commit id in the database,
415         # frozentokey, logs, etc. -- instead of an abbreviation or a
416         # branch name which can become ambiguous or point to a
417         # different commit in the future.
418         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419         Log (undef, "Using commit $commit for tree-ish $treeish");
420         if ($commit ne $treeish) {
421           $Job->{'script_version'} = $commit;
422           !$job_has_uuid or $Job->save() or croak("Error while updating job");
423         }
424       }
425     }
426   }
427
428   if (defined $commit) {
429     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430     @execargs = ("sh", "-c",
431                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
432     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
433   }
434   else {
435     croak ("could not figure out commit id for $treeish");
436   }
437
438   my $installpid = fork();
439   if ($installpid == 0)
440   {
441     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
442     exit (1);
443   }
444   while (1)
445   {
446     last if $installpid == waitpid (-1, WNOHANG);
447     freeze_if_want_freeze ($installpid);
448     select (undef, undef, undef, 0.1);
449   }
450   Log (undef, "Install exited $?");
451 }
452
453
454
455 foreach (qw (script script_version script_parameters resource_limits))
456 {
457   Log (undef,
458        "$_ " .
459        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
460 }
461 foreach (split (/\n/, $Job->{knobs}))
462 {
463   Log (undef, "knob " . $_);
464 }
465
466
467
468 my $success;
469
470
471
472 ONELEVEL:
473
474 my $thisround_succeeded = 0;
475 my $thisround_failed = 0;
476 my $thisround_failed_multiple = 0;
477
478 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
479                        or $a <=> $b } @jobstep_todo;
480 my $level = $jobstep[$jobstep_todo[0]]->{level};
481 Log (undef, "start level $level");
482
483
484
485 my %proc;
486 my @freeslot = (0..$#slot);
487 my @holdslot;
488 my %reader;
489 my $progress_is_dirty = 1;
490 my $progress_stats_updated = 0;
491
492 update_progress_stats();
493
494
495
496 THISROUND:
497 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
498 {
499   $main::please_continue = 0;
500
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507   if ($Jobstep->{attempts} > 9)
508   {
509     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
510     $success = 0;
511     last THISROUND;
512   }
513
514   pipe $reader{$id}, "writer" or croak ($!);
515   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
517
518   my $childslot = $freeslot[0];
519   my $childnode = $slot[$childslot]->{node};
520   my $childslotname = join (".",
521                             $slot[$childslot]->{node}->{name},
522                             $slot[$childslot]->{cpu});
523   my $childpid = fork();
524   if ($childpid == 0)
525   {
526     $SIG{'INT'} = 'DEFAULT';
527     $SIG{'QUIT'} = 'DEFAULT';
528     $SIG{'TERM'} = 'DEFAULT';
529
530     foreach (values (%reader))
531     {
532       close($_);
533     }
534     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535     open(STDOUT,">&writer");
536     open(STDERR,">&writer");
537
538     undef $dbh;
539     undef $sth;
540
541     delete $ENV{"GNUPGHOME"};
542     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543     $ENV{"TASK_QSEQUENCE"} = $id;
544     $ENV{"TASK_SEQUENCE"} = $level;
545     $ENV{"JOB_SCRIPT"} = $Job->{script};
546     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547       $param =~ tr/a-z/A-Z/;
548       $ENV{"JOB_PARAMETER_$param"} = $value;
549     }
550     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
552     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
555
556     $ENV{"GZIP"} = "-n";
557
558     my @srunargs = (
559       "srun",
560       "--nodelist=".$childnode->{name},
561       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562       "--job-name=$job_id.$id.$$",
563         );
564     my @execargs = qw(sh);
565     my $build_script_to_send = "";
566     my $command =
567         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568         ."&& cd $ENV{CRUNCH_TMP} ";
569     if ($build_script)
570     {
571       $build_script_to_send = $build_script;
572       $command .=
573           "&& perl -";
574     }
575     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
576     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
577     $command .=
578         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
579     my @execargs = ('bash', '-c', $command);
580     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
581     exit (1);
582   }
583   close("writer");
584   if (!defined $childpid)
585   {
586     close $reader{$id};
587     delete $reader{$id};
588     next;
589   }
590   shift @freeslot;
591   $proc{$childpid} = { jobstep => $id,
592                        time => time,
593                        slot => $childslot,
594                        jobstepname => "$job_id.$id.$childpid",
595                      };
596   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
597   $slot[$childslot]->{pid} = $childpid;
598
599   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
600   Log ($id, "child $childpid started on $childslotname");
601   $Jobstep->{attempts} ++;
602   $Jobstep->{starttime} = time;
603   $Jobstep->{node} = $childnode->{name};
604   $Jobstep->{slotindex} = $childslot;
605   delete $Jobstep->{stderr};
606   delete $Jobstep->{finishtime};
607
608   splice @jobstep_todo, $todo_ptr, 1;
609   --$todo_ptr;
610
611   $progress_is_dirty = 1;
612
613   while (!@freeslot
614          ||
615          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
616   {
617     last THISROUND if $main::please_freeze;
618     if ($main::please_info)
619     {
620       $main::please_info = 0;
621       freeze();
622       collate_output();
623       save_meta(1);
624       update_progress_stats();
625     }
626     my $gotsome
627         = readfrompipes ()
628         + reapchildren ();
629     if (!$gotsome)
630     {
631       check_squeue();
632       update_progress_stats();
633       select (undef, undef, undef, 0.1);
634     }
635     elsif (time - $progress_stats_updated >= 30)
636     {
637       update_progress_stats();
638     }
639     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
640         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
641     {
642       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
643           .($thisround_failed+$thisround_succeeded)
644           .") -- giving up on this round";
645       Log (undef, $message);
646       last THISROUND;
647     }
648
649     # move slots from freeslot to holdslot (or back to freeslot) if necessary
650     for (my $i=$#freeslot; $i>=0; $i--) {
651       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
652         push @holdslot, (splice @freeslot, $i, 1);
653       }
654     }
655     for (my $i=$#holdslot; $i>=0; $i--) {
656       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
657         push @freeslot, (splice @holdslot, $i, 1);
658       }
659     }
660
661     # give up if no nodes are succeeding
662     if (!grep { $_->{node}->{losing_streak} == 0 &&
663                     $_->{node}->{hold_count} < 4 } @slot) {
664       my $message = "Every node has failed -- giving up on this round";
665       Log (undef, $message);
666       last THISROUND;
667     }
668   }
669 }
670
671
672 push @freeslot, splice @holdslot;
673 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
674
675
676 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
677 while (%proc)
678 {
679   goto THISROUND if $main::please_continue;
680   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
681   readfrompipes ();
682   if (!reapchildren())
683   {
684     check_squeue();
685     update_progress_stats();
686     select (undef, undef, undef, 0.1);
687     killem (keys %proc) if $main::please_freeze;
688   }
689 }
690
691 update_progress_stats();
692 freeze_if_want_freeze();
693
694
695 if (!defined $success)
696 {
697   if (@jobstep_todo &&
698       $thisround_succeeded == 0 &&
699       ($thisround_failed == 0 || $thisround_failed > 4))
700   {
701     my $message = "stop because $thisround_failed tasks failed and none succeeded";
702     Log (undef, $message);
703     $success = 0;
704   }
705   if (!@jobstep_todo)
706   {
707     $success = 1;
708   }
709 }
710
711 goto ONELEVEL if !defined $success;
712
713
714 release_allocation();
715 freeze();
716 $Job->reload;
717 $Job->{'output'} = &collate_output();
718 $Job->{'running'} = 0;
719 $Job->{'success'} = $Job->{'output'} && $success;
720 $Job->{'finished_at'} = gmtime;
721 $Job->save if $job_has_uuid;
722
723 if ($Job->{'output'})
724 {
725   eval {
726     my $manifest_text = capturex("whget", $Job->{'output'});
727     $arv->{'collections'}->{'create'}->execute('collection' => {
728       'uuid' => $Job->{'output'},
729       'manifest_text' => $manifest_text,
730     });
731   };
732   if ($@) {
733     Log (undef, "Failed to register output manifest: $@");
734   }
735 }
736
737 Log (undef, "finish");
738
739 save_meta();
740 exit 0;
741
742
743
744 sub update_progress_stats
745 {
746   $progress_stats_updated = time;
747   return if !$progress_is_dirty;
748   my ($todo, $done, $running) = (scalar @jobstep_todo,
749                                  scalar @jobstep_done,
750                                  scalar @slot - scalar @freeslot - scalar @holdslot);
751   $Job->{'tasks_summary'} ||= {};
752   $Job->{'tasks_summary'}->{'todo'} = $todo;
753   $Job->{'tasks_summary'}->{'done'} = $done;
754   $Job->{'tasks_summary'}->{'running'} = $running;
755   $Job->save if $job_has_uuid;
756   Log (undef, "status: $done done, $running running, $todo todo");
757   $progress_is_dirty = 0;
758 }
759
760
761
762 sub reapchildren
763 {
764   my $pid = waitpid (-1, WNOHANG);
765   return 0 if $pid <= 0;
766
767   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
768                   . "."
769                   . $slot[$proc{$pid}->{slot}]->{cpu});
770   my $jobstepid = $proc{$pid}->{jobstep};
771   my $elapsed = time - $proc{$pid}->{time};
772   my $Jobstep = $jobstep[$jobstepid];
773
774   my $exitcode = $?;
775   my $exitinfo = "exit $exitcode";
776   $Jobstep->{'arvados_task'}->reload;
777   my $success = $Jobstep->{'arvados_task'}->{success};
778
779   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
780
781   if (!defined $success) {
782     # task did not indicate one way or the other --> fail
783     $Jobstep->{'arvados_task'}->{success} = 0;
784     $Jobstep->{'arvados_task'}->save;
785     $success = 0;
786   }
787
788   if (!$success)
789   {
790     my $no_incr_attempts;
791     $no_incr_attempts = 1 if $Jobstep->{node_fail};
792
793     ++$thisround_failed;
794     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
795
796     # Check for signs of a failed or misconfigured node
797     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
798         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
799       # Don't count this against jobstep failure thresholds if this
800       # node is already suspected faulty and srun exited quickly
801       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
802           $elapsed < 5 &&
803           $Jobstep->{attempts} > 1) {
804         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
805         $no_incr_attempts = 1;
806         --$Jobstep->{attempts};
807       }
808       ban_node_by_slot($proc{$pid}->{slot});
809     }
810
811     push @jobstep_todo, $jobstepid;
812     Log ($jobstepid, "failure in $elapsed seconds");
813
814     --$Jobstep->{attempts} if $no_incr_attempts;
815     $Job->{'tasks_summary'}->{'failed'}++;
816   }
817   else
818   {
819     ++$thisround_succeeded;
820     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
821     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
822     push @jobstep_done, $jobstepid;
823     Log ($jobstepid, "success in $elapsed seconds");
824   }
825   $Jobstep->{exitcode} = $exitcode;
826   $Jobstep->{finishtime} = time;
827   process_stderr ($jobstepid, $success);
828   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
829
830   close $reader{$jobstepid};
831   delete $reader{$jobstepid};
832   delete $slot[$proc{$pid}->{slot}]->{pid};
833   push @freeslot, $proc{$pid}->{slot};
834   delete $proc{$pid};
835
836   # Load new tasks
837   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
838     'where' => {
839       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
840     },
841     'order' => 'qsequence'
842   );
843   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
844     my $jobstep = {
845       'level' => $arvados_task->{'sequence'},
846       'attempts' => 0,
847       'arvados_task' => $arvados_task
848     };
849     push @jobstep, $jobstep;
850     push @jobstep_todo, $#jobstep;
851   }
852
853   $progress_is_dirty = 1;
854   1;
855 }
856
857
858 sub check_squeue
859 {
860   # return if the kill list was checked <4 seconds ago
861   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
862   {
863     return;
864   }
865   $squeue_kill_checked = time;
866
867   # use killem() on procs whose killtime is reached
868   for (keys %proc)
869   {
870     if (exists $proc{$_}->{killtime}
871         && $proc{$_}->{killtime} <= time)
872     {
873       killem ($_);
874     }
875   }
876
877   # return if the squeue was checked <60 seconds ago
878   if (defined $squeue_checked && $squeue_checked > time - 60)
879   {
880     return;
881   }
882   $squeue_checked = time;
883
884   if (!$have_slurm)
885   {
886     # here is an opportunity to check for mysterious problems with local procs
887     return;
888   }
889
890   # get a list of steps still running
891   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
892   chop @squeue;
893   if ($squeue[-1] ne "ok")
894   {
895     return;
896   }
897   pop @squeue;
898
899   # which of my jobsteps are running, according to squeue?
900   my %ok;
901   foreach (@squeue)
902   {
903     if (/^(\d+)\.(\d+) (\S+)/)
904     {
905       if ($1 eq $ENV{SLURM_JOBID})
906       {
907         $ok{$3} = 1;
908       }
909     }
910   }
911
912   # which of my active child procs (>60s old) were not mentioned by squeue?
913   foreach (keys %proc)
914   {
915     if ($proc{$_}->{time} < time - 60
916         && !exists $ok{$proc{$_}->{jobstepname}}
917         && !exists $proc{$_}->{killtime})
918     {
919       # kill this proc if it hasn't exited in 30 seconds
920       $proc{$_}->{killtime} = time + 30;
921     }
922   }
923 }
924
925
926 sub release_allocation
927 {
928   if ($have_slurm)
929   {
930     Log (undef, "release job allocation");
931     system "scancel $ENV{SLURM_JOBID}";
932   }
933 }
934
935
936 sub readfrompipes
937 {
938   my $gotsome = 0;
939   foreach my $job (keys %reader)
940   {
941     my $buf;
942     while (0 < sysread ($reader{$job}, $buf, 8192))
943     {
944       print STDERR $buf if $ENV{CRUNCH_DEBUG};
945       $jobstep[$job]->{stderr} .= $buf;
946       preprocess_stderr ($job);
947       if (length ($jobstep[$job]->{stderr}) > 16384)
948       {
949         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
950       }
951       $gotsome = 1;
952     }
953   }
954   return $gotsome;
955 }
956
957
958 sub preprocess_stderr
959 {
960   my $job = shift;
961
962   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
963     my $line = $1;
964     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
965     Log ($job, "stderr $line");
966     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
967       # whoa.
968       $main::please_freeze = 1;
969     }
970     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
971       $jobstep[$job]->{node_fail} = 1;
972       ban_node_by_slot($jobstep[$job]->{slotindex});
973     }
974   }
975 }
976
977
978 sub process_stderr
979 {
980   my $job = shift;
981   my $success = shift;
982   preprocess_stderr ($job);
983
984   map {
985     Log ($job, "stderr $_");
986   } split ("\n", $jobstep[$job]->{stderr});
987 }
988
989
990 sub collate_output
991 {
992   my $whc = Warehouse->new;
993   Log (undef, "collate");
994   $whc->write_start (1);
995   my $joboutput;
996   for (@jobstep)
997   {
998     next if (!exists $_->{'arvados_task'}->{output} ||
999              !$_->{'arvados_task'}->{'success'} ||
1000              $_->{'exitcode'} != 0);
1001     my $output = $_->{'arvados_task'}->{output};
1002     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1003     {
1004       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1005       $whc->write_data ($output);
1006     }
1007     elsif (@jobstep == 1)
1008     {
1009       $joboutput = $output;
1010       $whc->write_finish;
1011     }
1012     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1013     {
1014       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1015       $whc->write_data ($outblock);
1016     }
1017     else
1018     {
1019       my $errstr = $whc->errstr;
1020       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1021       $success = 0;
1022     }
1023   }
1024   $joboutput = $whc->write_finish if !defined $joboutput;
1025   if ($joboutput)
1026   {
1027     Log (undef, "output $joboutput");
1028     $Job->{'output'} = $joboutput;
1029     $Job->save if $job_has_uuid;
1030   }
1031   else
1032   {
1033     Log (undef, "output undef");
1034   }
1035   return $joboutput;
1036 }
1037
1038
1039 sub killem
1040 {
1041   foreach (@_)
1042   {
1043     my $sig = 2;                # SIGINT first
1044     if (exists $proc{$_}->{"sent_$sig"} &&
1045         time - $proc{$_}->{"sent_$sig"} > 4)
1046     {
1047       $sig = 15;                # SIGTERM if SIGINT doesn't work
1048     }
1049     if (exists $proc{$_}->{"sent_$sig"} &&
1050         time - $proc{$_}->{"sent_$sig"} > 4)
1051     {
1052       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1053     }
1054     if (!exists $proc{$_}->{"sent_$sig"})
1055     {
1056       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1057       kill $sig, $_;
1058       select (undef, undef, undef, 0.1);
1059       if ($sig == 2)
1060       {
1061         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1062       }
1063       $proc{$_}->{"sent_$sig"} = time;
1064       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1065     }
1066   }
1067 }
1068
1069
1070 sub fhbits
1071 {
1072   my($bits);
1073   for (@_) {
1074     vec($bits,fileno($_),1) = 1;
1075   }
1076   $bits;
1077 }
1078
1079
1080 sub Log                         # ($jobstep_id, $logmessage)
1081 {
1082   if ($_[1] =~ /\n/) {
1083     for my $line (split (/\n/, $_[1])) {
1084       Log ($_[0], $line);
1085     }
1086     return;
1087   }
1088   my $fh = select STDERR; $|=1; select $fh;
1089   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1090   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1091   $message .= "\n";
1092   my $datetime;
1093   if ($metastream || -t STDERR) {
1094     my @gmtime = gmtime;
1095     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1096                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1097   }
1098   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1099
1100   return if !$metastream;
1101   $metastream->write_data ($datetime . " " . $message);
1102 }
1103
1104
1105 sub croak
1106 {
1107   my ($package, $file, $line) = caller;
1108   my $message = "@_ at $file line $line\n";
1109   Log (undef, $message);
1110   freeze() if @jobstep_todo;
1111   collate_output() if @jobstep_todo;
1112   cleanup();
1113   save_meta() if $metastream;
1114   die;
1115 }
1116
1117
1118 sub cleanup
1119 {
1120   return if !$job_has_uuid;
1121   $Job->reload;
1122   $Job->{'running'} = 0;
1123   $Job->{'success'} = 0;
1124   $Job->{'finished_at'} = gmtime;
1125   $Job->save;
1126 }
1127
1128
1129 sub save_meta
1130 {
1131   my $justcheckpoint = shift; # false if this will be the last meta saved
1132   my $m = $metastream;
1133   $m = $m->copy if $justcheckpoint;
1134   $m->write_finish;
1135   my $loglocator = $m->as_key;
1136   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1137   Log (undef, "meta key is $loglocator");
1138   $Job->{'log'} = $loglocator;
1139   $Job->save if $job_has_uuid;
1140 }
1141
1142
1143 sub freeze_if_want_freeze
1144 {
1145   if ($main::please_freeze)
1146   {
1147     release_allocation();
1148     if (@_)
1149     {
1150       # kill some srun procs before freeze+stop
1151       map { $proc{$_} = {} } @_;
1152       while (%proc)
1153       {
1154         killem (keys %proc);
1155         select (undef, undef, undef, 0.1);
1156         my $died;
1157         while (($died = waitpid (-1, WNOHANG)) > 0)
1158         {
1159           delete $proc{$died};
1160         }
1161       }
1162     }
1163     freeze();
1164     collate_output();
1165     cleanup();
1166     save_meta();
1167     exit 0;
1168   }
1169 }
1170
1171
1172 sub freeze
1173 {
1174   Log (undef, "Freeze not implemented");
1175   return;
1176 }
1177
1178
1179 sub thaw
1180 {
1181   croak ("Thaw not implemented");
1182
1183   my $whc;
1184   my $key = shift;
1185   Log (undef, "thaw from $key");
1186
1187   @jobstep = ();
1188   @jobstep_done = ();
1189   @jobstep_todo = ();
1190   @jobstep_tomerge = ();
1191   $jobstep_tomerge_level = 0;
1192   my $frozenjob = {};
1193
1194   my $stream = new Warehouse::Stream ( whc => $whc,
1195                                        hash => [split (",", $key)] );
1196   $stream->rewind;
1197   while (my $dataref = $stream->read_until (undef, "\n\n"))
1198   {
1199     if ($$dataref =~ /^job /)
1200     {
1201       foreach (split ("\n", $$dataref))
1202       {
1203         my ($k, $v) = split ("=", $_, 2);
1204         $frozenjob->{$k} = freezeunquote ($v);
1205       }
1206       next;
1207     }
1208
1209     if ($$dataref =~ /^merge (\d+) (.*)/)
1210     {
1211       $jobstep_tomerge_level = $1;
1212       @jobstep_tomerge
1213           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1214       next;
1215     }
1216
1217     my $Jobstep = { };
1218     foreach (split ("\n", $$dataref))
1219     {
1220       my ($k, $v) = split ("=", $_, 2);
1221       $Jobstep->{$k} = freezeunquote ($v) if $k;
1222     }
1223     $Jobstep->{attempts} = 0;
1224     push @jobstep, $Jobstep;
1225
1226     if ($Jobstep->{exitcode} eq "0")
1227     {
1228       push @jobstep_done, $#jobstep;
1229     }
1230     else
1231     {
1232       push @jobstep_todo, $#jobstep;
1233     }
1234   }
1235
1236   foreach (qw (script script_version script_parameters))
1237   {
1238     $Job->{$_} = $frozenjob->{$_};
1239   }
1240   $Job->save if $job_has_uuid;
1241 }
1242
1243
1244 sub freezequote
1245 {
1246   my $s = shift;
1247   $s =~ s/\\/\\\\/g;
1248   $s =~ s/\n/\\n/g;
1249   return $s;
1250 }
1251
1252
1253 sub freezeunquote
1254 {
1255   my $s = shift;
1256   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1257   return $s;
1258 }
1259
1260
1261 sub srun
1262 {
1263   my $srunargs = shift;
1264   my $execargs = shift;
1265   my $opts = shift || {};
1266   my $stdin = shift;
1267   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1268   print STDERR (join (" ",
1269                       map { / / ? "'$_'" : $_ }
1270                       (@$args)),
1271                 "\n")
1272       if $ENV{CRUNCH_DEBUG};
1273
1274   if (defined $stdin) {
1275     my $child = open STDIN, "-|";
1276     defined $child or die "no fork: $!";
1277     if ($child == 0) {
1278       print $stdin or die $!;
1279       close STDOUT or die $!;
1280       exit 0;
1281     }
1282   }
1283
1284   return system (@$args) if $opts->{fork};
1285
1286   exec @$args;
1287   warn "ENV size is ".length(join(" ",%ENV));
1288   die "exec failed: $!: @$args";
1289 }
1290
1291
1292 sub ban_node_by_slot {
1293   # Don't start any new jobsteps on this node for 60 seconds
1294   my $slotid = shift;
1295   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1296   $slot[$slotid]->{node}->{hold_count}++;
1297   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1298 }
1299
1300 __DATA__
1301 #!/usr/bin/perl
1302
1303 # checkout-and-build
1304
1305 use Fcntl ':flock';
1306
1307 my $destdir = $ENV{"CRUNCH_SRC"};
1308 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1309 my $repo = $ENV{"CRUNCH_SRC_URL"};
1310
1311 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1312 flock L, LOCK_EX;
1313 if (readlink ("$destdir.commit") eq $commit) {
1314     exit 0;
1315 }
1316
1317 unlink "$destdir.commit";
1318 open STDOUT, ">", "$destdir.log";
1319 open STDERR, ">&STDOUT";
1320
1321 mkdir $destdir;
1322 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1323 print TARX <DATA>;
1324 if(!close(TARX)) {
1325   die "'tar -C $destdir -xf -' exited $?: $!";
1326 }
1327
1328 my $pwd;
1329 chomp ($pwd = `pwd`);
1330 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1331 mkdir $install_dir;
1332 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1333     # Old version
1334     shell_or_die ("./tests/autotests.sh", $install_dir);
1335 } elsif (-e "./install.sh") {
1336     shell_or_die ("./install.sh", $install_dir);
1337 }
1338
1339 if ($commit) {
1340     unlink "$destdir.commit.new";
1341     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1342     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1343 }
1344
1345 close L;
1346
1347 exit 0;
1348
1349 sub shell_or_die
1350 {
1351   if ($ENV{"DEBUG"}) {
1352     print STDERR "@_\n";
1353   }
1354   system (@_) == 0
1355       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1356 }
1357
1358 __DATA__