Allow users to cancel a running crunch job by updating cancelled_at
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
89
90 my $force_unlock;
91 my $git_dir;
92 my $jobspec;
93 my $job_api_token;
94 my $resume_stash;
95 GetOptions('force-unlock' => \$force_unlock,
96            'git-dir=s' => \$git_dir,
97            'job=s' => \$jobspec,
98            'job-api-token=s' => \$job_api_token,
99            'resume-stash=s' => \$resume_stash,
100     );
101
102 if (defined $job_api_token) {
103   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
104 }
105
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
109
110
111 $SIG{'USR1'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 1;
114 };
115 $SIG{'USR2'} = sub
116 {
117   $main::ENV{CRUNCH_DEBUG} = 0;
118 };
119
120
121
122 my $arv = Arvados->new;
123 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
124 $metastream->clear;
125 $metastream->write_start('log.txt');
126
127 my $User = $arv->{'users'}->{'current'}->execute;
128
129 my $Job = {};
130 my $job_id;
131 my $dbh;
132 my $sth;
133 if ($job_has_uuid)
134 {
135   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
136   if (!$force_unlock) {
137     if ($Job->{'is_locked_by_uuid'}) {
138       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
139     }
140     if ($Job->{'success'} ne undef) {
141       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142     }
143     if ($Job->{'running'}) {
144       croak("Job 'running' flag is already set");
145     }
146     if ($Job->{'started_at'}) {
147       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
148     }
149   }
150 }
151 else
152 {
153   $Job = JSON::decode_json($jobspec);
154
155   if (!$resume_stash)
156   {
157     map { croak ("No $_ specified") unless $Job->{$_} }
158     qw(script script_version script_parameters);
159   }
160
161   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
162   $Job->{'started_at'} = gmtime;
163
164   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
165
166   $job_has_uuid = 1;
167 }
168 $job_id = $Job->{'uuid'};
169
170
171
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
175
176
177 Log (undef, "check slurm allocation");
178 my @slot;
179 my @node;
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
181 my @sinfo;
182 if (!$have_slurm)
183 {
184   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185   push @sinfo, "$localcpus localhost";
186 }
187 if (exists $ENV{SLURM_NODELIST})
188 {
189   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
190 }
191 foreach (@sinfo)
192 {
193   my ($ncpus, $slurm_nodelist) = split;
194   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
195
196   my @nodelist;
197   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
198   {
199     my $nodelist = $1;
200     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
201     {
202       my $ranges = $1;
203       foreach (split (",", $ranges))
204       {
205         my ($a, $b);
206         if (/(\d+)-(\d+)/)
207         {
208           $a = $1;
209           $b = $2;
210         }
211         else
212         {
213           $a = $_;
214           $b = $_;
215         }
216         push @nodelist, map {
217           my $n = $nodelist;
218           $n =~ s/\[[-,\d]+\]/$_/;
219           $n;
220         } ($a..$b);
221       }
222     }
223     else
224     {
225       push @nodelist, $nodelist;
226     }
227   }
228   foreach my $nodename (@nodelist)
229   {
230     Log (undef, "node $nodename - $ncpus slots");
231     my $node = { name => $nodename,
232                  ncpus => $ncpus,
233                  losing_streak => 0,
234                  hold_until => 0 };
235     foreach my $cpu (1..$ncpus)
236     {
237       push @slot, { node => $node,
238                     cpu => $cpu };
239     }
240   }
241   push @node, @nodelist;
242 }
243
244
245
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
248
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250
251
252
253 my $jobmanager_id;
254 if ($job_has_uuid)
255 {
256   # Claim this job, and make sure nobody else does
257   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261   $Job->update_attributes('started_at' => gmtime,
262                           'running' => 1,
263                           'success' => undef,
264                           'tasks_summary' => { 'failed' => 0,
265                                                'todo' => 1,
266                                                'running' => 0,
267                                                'done' => 0 });
268 }
269
270
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
279
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
285
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
289
290
291 my @jobstep;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
296 my $squeue_checked;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299
300
301
302 if (defined $Job->{thawedfromkey})
303 {
304   thaw ($Job->{thawedfromkey});
305 }
306 else
307 {
308   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
309     'job_uuid' => $Job->{'uuid'},
310     'sequence' => 0,
311     'qsequence' => 0,
312     'parameters' => {},
313                                                           });
314   push @jobstep, { 'level' => 0,
315                    'failures' => 0,
316                    'arvados_task' => $first_task,
317                  };
318   push @jobstep_todo, 0;
319 }
320
321
322 my $build_script;
323
324
325 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
326
327 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
328 if ($skip_install)
329 {
330   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
331 }
332 else
333 {
334   do {
335     local $/ = undef;
336     $build_script = <DATA>;
337   };
338   Log (undef, "Install revision ".$Job->{script_version});
339   my $nodelist = join(",", @node);
340
341   # Clean out crunch_tmp/work and crunch_tmp/opt
342
343   my $cleanpid = fork();
344   if ($cleanpid == 0)
345   {
346     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
347           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
348     exit (1);
349   }
350   while (1)
351   {
352     last if $cleanpid == waitpid (-1, WNOHANG);
353     freeze_if_want_freeze ($cleanpid);
354     select (undef, undef, undef, 0.1);
355   }
356   Log (undef, "Clean-work-dir exited $?");
357
358   # Install requested code version
359
360   my @execargs;
361   my @srunargs = ("srun",
362                   "--nodelist=$nodelist",
363                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
364
365   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
366   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
367   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
368
369   my $commit;
370   my $git_archive;
371   my $treeish = $Job->{'script_version'};
372   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
373   # Todo: let script_version specify repository instead of expecting
374   # parent process to figure it out.
375   $ENV{"CRUNCH_SRC_URL"} = $repo;
376
377   # Create/update our clone of the remote git repo
378
379   if (!-d $ENV{"CRUNCH_SRC"}) {
380     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
381         or croak ("git clone $repo failed: exit ".($?>>8));
382     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
383   }
384   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
385
386   # If this looks like a subversion r#, look for it in git-svn commit messages
387
388   if ($treeish =~ m{^\d{1,4}$}) {
389     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
390     chomp $gitlog;
391     if ($gitlog =~ /^[a-f0-9]{40}$/) {
392       $commit = $gitlog;
393       Log (undef, "Using commit $commit for script_version $treeish");
394     }
395   }
396
397   # If that didn't work, try asking git to look it up as a tree-ish.
398
399   if (!defined $commit) {
400
401     my $cooked_treeish = $treeish;
402     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
403       # Looks like a git branch name -- make sure git knows it's
404       # relative to the remote repo
405       $cooked_treeish = "origin/$treeish";
406     }
407
408     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
409     chomp $found;
410     if ($found =~ /^[0-9a-f]{40}$/s) {
411       $commit = $found;
412       if ($commit ne $treeish) {
413         # Make sure we record the real commit id in the database,
414         # frozentokey, logs, etc. -- instead of an abbreviation or a
415         # branch name which can become ambiguous or point to a
416         # different commit in the future.
417         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
418         Log (undef, "Using commit $commit for tree-ish $treeish");
419         if ($commit ne $treeish) {
420           $Job->{'script_version'} = $commit;
421           !$job_has_uuid or
422               $Job->update_attributes('script_version' => $commit) or
423               croak("Error while updating job");
424         }
425       }
426     }
427   }
428
429   if (defined $commit) {
430     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
431     @execargs = ("sh", "-c",
432                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
433     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
434   }
435   else {
436     croak ("could not figure out commit id for $treeish");
437   }
438
439   my $installpid = fork();
440   if ($installpid == 0)
441   {
442     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
443     exit (1);
444   }
445   while (1)
446   {
447     last if $installpid == waitpid (-1, WNOHANG);
448     freeze_if_want_freeze ($installpid);
449     select (undef, undef, undef, 0.1);
450   }
451   Log (undef, "Install exited $?");
452 }
453
454
455
456 foreach (qw (script script_version script_parameters runtime_constraints))
457 {
458   Log (undef,
459        "$_ " .
460        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
461 }
462 foreach (split (/\n/, $Job->{knobs}))
463 {
464   Log (undef, "knob " . $_);
465 }
466
467
468
469 $main::success = undef;
470
471
472
473 ONELEVEL:
474
475 my $thisround_succeeded = 0;
476 my $thisround_failed = 0;
477 my $thisround_failed_multiple = 0;
478
479 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
480                        or $a <=> $b } @jobstep_todo;
481 my $level = $jobstep[$jobstep_todo[0]]->{level};
482 Log (undef, "start level $level");
483
484
485
486 my %proc;
487 my @freeslot = (0..$#slot);
488 my @holdslot;
489 my %reader;
490 my $progress_is_dirty = 1;
491 my $progress_stats_updated = 0;
492
493 update_progress_stats();
494
495
496
497 THISROUND:
498 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
499 {
500   my $id = $jobstep_todo[$todo_ptr];
501   my $Jobstep = $jobstep[$id];
502   if ($Jobstep->{level} != $level)
503   {
504     next;
505   }
506
507   pipe $reader{$id}, "writer" or croak ($!);
508   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
509   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
510
511   my $childslot = $freeslot[0];
512   my $childnode = $slot[$childslot]->{node};
513   my $childslotname = join (".",
514                             $slot[$childslot]->{node}->{name},
515                             $slot[$childslot]->{cpu});
516   my $childpid = fork();
517   if ($childpid == 0)
518   {
519     $SIG{'INT'} = 'DEFAULT';
520     $SIG{'QUIT'} = 'DEFAULT';
521     $SIG{'TERM'} = 'DEFAULT';
522
523     foreach (values (%reader))
524     {
525       close($_);
526     }
527     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
528     open(STDOUT,">&writer");
529     open(STDERR,">&writer");
530
531     undef $dbh;
532     undef $sth;
533
534     delete $ENV{"GNUPGHOME"};
535     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
536     $ENV{"TASK_QSEQUENCE"} = $id;
537     $ENV{"TASK_SEQUENCE"} = $level;
538     $ENV{"JOB_SCRIPT"} = $Job->{script};
539     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
540       $param =~ tr/a-z/A-Z/;
541       $ENV{"JOB_PARAMETER_$param"} = $value;
542     }
543     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
544     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
545     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
546     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
547     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
548
549     $ENV{"GZIP"} = "-n";
550
551     my @srunargs = (
552       "srun",
553       "--nodelist=".$childnode->{name},
554       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
555       "--job-name=$job_id.$id.$$",
556         );
557     my @execargs = qw(sh);
558     my $build_script_to_send = "";
559     my $command =
560         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
561         ."&& cd $ENV{CRUNCH_TMP} ";
562     if ($build_script)
563     {
564       $build_script_to_send = $build_script;
565       $command .=
566           "&& perl -";
567     }
568     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
569     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
570     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
571     $command .=
572         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
573     my @execargs = ('bash', '-c', $command);
574     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
575     exit (111);
576   }
577   close("writer");
578   if (!defined $childpid)
579   {
580     close $reader{$id};
581     delete $reader{$id};
582     next;
583   }
584   shift @freeslot;
585   $proc{$childpid} = { jobstep => $id,
586                        time => time,
587                        slot => $childslot,
588                        jobstepname => "$job_id.$id.$childpid",
589                      };
590   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
591   $slot[$childslot]->{pid} = $childpid;
592
593   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
594   Log ($id, "child $childpid started on $childslotname");
595   $Jobstep->{starttime} = time;
596   $Jobstep->{node} = $childnode->{name};
597   $Jobstep->{slotindex} = $childslot;
598   delete $Jobstep->{stderr};
599   delete $Jobstep->{finishtime};
600
601   splice @jobstep_todo, $todo_ptr, 1;
602   --$todo_ptr;
603
604   $progress_is_dirty = 1;
605
606   while (!@freeslot
607          ||
608          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
609   {
610     last THISROUND if $main::please_freeze;
611     if ($main::please_refresh)
612     {
613       $main::please_refresh = 0;
614       if ($job_has_uuid) {
615         $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
616         for my $attr ('cancelled_at',
617                       'cancelled_by_user_uuid',
618                       'cancelled_by_client_uuid') {
619           $Job->{$attr} = $Job2->{$attr};
620         }
621         if ($Job->{'cancelled_at'}) {
622           Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
623                " by user " . $Job->{cancelled_by_user_uuid});
624           $main::success = 0;
625           $main::please_freeze = 1;
626         }
627       }
628     }
629     if ($main::please_info)
630     {
631       $main::please_info = 0;
632       freeze();
633       collate_output();
634       save_meta(1);
635       update_progress_stats();
636     }
637     my $gotsome
638         = readfrompipes ()
639         + reapchildren ();
640     if (!$gotsome)
641     {
642       check_squeue();
643       update_progress_stats();
644       select (undef, undef, undef, 0.1);
645     }
646     elsif (time - $progress_stats_updated >= 30)
647     {
648       update_progress_stats();
649     }
650     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
651         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
652     {
653       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
654           .($thisround_failed+$thisround_succeeded)
655           .") -- giving up on this round";
656       Log (undef, $message);
657       last THISROUND;
658     }
659
660     # move slots from freeslot to holdslot (or back to freeslot) if necessary
661     for (my $i=$#freeslot; $i>=0; $i--) {
662       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
663         push @holdslot, (splice @freeslot, $i, 1);
664       }
665     }
666     for (my $i=$#holdslot; $i>=0; $i--) {
667       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
668         push @freeslot, (splice @holdslot, $i, 1);
669       }
670     }
671
672     # give up if no nodes are succeeding
673     if (!grep { $_->{node}->{losing_streak} == 0 &&
674                     $_->{node}->{hold_count} < 4 } @slot) {
675       my $message = "Every node has failed -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679   }
680 }
681
682
683 push @freeslot, splice @holdslot;
684 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
685
686
687 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
688 while (%proc)
689 {
690   if ($main::please_continue) {
691     $main::please_continue = 0;
692     goto THISROUND;
693   }
694   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
695   readfrompipes ();
696   if (!reapchildren())
697   {
698     check_squeue();
699     update_progress_stats();
700     select (undef, undef, undef, 0.1);
701     killem (keys %proc) if $main::please_freeze;
702   }
703 }
704
705 update_progress_stats();
706 freeze_if_want_freeze();
707
708
709 if (!defined $main::success)
710 {
711   if (@jobstep_todo &&
712       $thisround_succeeded == 0 &&
713       ($thisround_failed == 0 || $thisround_failed > 4))
714   {
715     my $message = "stop because $thisround_failed tasks failed and none succeeded";
716     Log (undef, $message);
717     $main::success = 0;
718   }
719   if (!@jobstep_todo)
720   {
721     $main::success = 1;
722   }
723 }
724
725 goto ONELEVEL if !defined $main::success;
726
727
728 release_allocation();
729 freeze();
730 if ($job_has_uuid) {
731   $Job->update_attributes('output' => &collate_output(),
732                           'running' => 0,
733                           'success' => $Job->{'output'} && $main::success,
734                           'finished_at' => gmtime)
735 }
736
737 if ($Job->{'output'})
738 {
739   eval {
740     my $manifest_text = capturex("whget", $Job->{'output'});
741     $arv->{'collections'}->{'create'}->execute('collection' => {
742       'uuid' => $Job->{'output'},
743       'manifest_text' => $manifest_text,
744     });
745   };
746   if ($@) {
747     Log (undef, "Failed to register output manifest: $@");
748   }
749 }
750
751 Log (undef, "finish");
752
753 save_meta();
754 exit 0;
755
756
757
758 sub update_progress_stats
759 {
760   $progress_stats_updated = time;
761   return if !$progress_is_dirty;
762   my ($todo, $done, $running) = (scalar @jobstep_todo,
763                                  scalar @jobstep_done,
764                                  scalar @slot - scalar @freeslot - scalar @holdslot);
765   $Job->{'tasks_summary'} ||= {};
766   $Job->{'tasks_summary'}->{'todo'} = $todo;
767   $Job->{'tasks_summary'}->{'done'} = $done;
768   $Job->{'tasks_summary'}->{'running'} = $running;
769   if ($job_has_uuid) {
770     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
771   }
772   Log (undef, "status: $done done, $running running, $todo todo");
773   $progress_is_dirty = 0;
774 }
775
776
777
778 sub reapchildren
779 {
780   my $pid = waitpid (-1, WNOHANG);
781   return 0 if $pid <= 0;
782
783   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
784                   . "."
785                   . $slot[$proc{$pid}->{slot}]->{cpu});
786   my $jobstepid = $proc{$pid}->{jobstep};
787   my $elapsed = time - $proc{$pid}->{time};
788   my $Jobstep = $jobstep[$jobstepid];
789
790   my $childstatus = $?;
791   my $exitvalue = $childstatus >> 8;
792   my $exitinfo = sprintf("exit %d signal %d%s",
793                          $exitvalue,
794                          $childstatus & 127,
795                          ($childstatus & 128 ? ' core dump' : ''));
796   $Jobstep->{'arvados_task'}->reload;
797   my $task_success = $Jobstep->{'arvados_task'}->{success};
798
799   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
800
801   if (!defined $task_success) {
802     # task did not indicate one way or the other --> fail
803     $Jobstep->{'arvados_task'}->{success} = 0;
804     $Jobstep->{'arvados_task'}->save;
805     $task_success = 0;
806   }
807
808   if (!$task_success)
809   {
810     my $temporary_fail;
811     $temporary_fail ||= $Jobstep->{node_fail};
812     $temporary_fail ||= ($exitvalue == 111);
813
814     ++$thisround_failed;
815     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
816
817     # Check for signs of a failed or misconfigured node
818     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
819         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
820       # Don't count this against jobstep failure thresholds if this
821       # node is already suspected faulty and srun exited quickly
822       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
823           $elapsed < 5) {
824         Log ($jobstepid, "blaming failure on suspect node " .
825              $slot[$proc{$pid}->{slot}]->{node}->{name});
826         $temporary_fail ||= 1;
827       }
828       ban_node_by_slot($proc{$pid}->{slot});
829     }
830
831     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
832                              ++$Jobstep->{'failures'},
833                              $temporary_fail ? 'temporary ' : 'permanent',
834                              $elapsed));
835
836     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
837       # Give up on this task, and the whole job
838       $main::success = 0;
839       $main::please_freeze = 1;
840     }
841     else {
842       # Put this task back on the todo queue
843       push @jobstep_todo, $jobstepid;
844     }
845     $Job->{'tasks_summary'}->{'failed'}++;
846   }
847   else
848   {
849     ++$thisround_succeeded;
850     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
851     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
852     push @jobstep_done, $jobstepid;
853     Log ($jobstepid, "success in $elapsed seconds");
854   }
855   $Jobstep->{exitcode} = $childstatus;
856   $Jobstep->{finishtime} = time;
857   process_stderr ($jobstepid, $task_success);
858   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
859
860   close $reader{$jobstepid};
861   delete $reader{$jobstepid};
862   delete $slot[$proc{$pid}->{slot}]->{pid};
863   push @freeslot, $proc{$pid}->{slot};
864   delete $proc{$pid};
865
866   # Load new tasks
867   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
868     'where' => {
869       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
870     },
871     'order' => 'qsequence'
872   );
873   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
874     my $jobstep = {
875       'level' => $arvados_task->{'sequence'},
876       'failures' => 0,
877       'arvados_task' => $arvados_task
878     };
879     push @jobstep, $jobstep;
880     push @jobstep_todo, $#jobstep;
881   }
882
883   $progress_is_dirty = 1;
884   1;
885 }
886
887
888 sub check_squeue
889 {
890   # return if the kill list was checked <4 seconds ago
891   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
892   {
893     return;
894   }
895   $squeue_kill_checked = time;
896
897   # use killem() on procs whose killtime is reached
898   for (keys %proc)
899   {
900     if (exists $proc{$_}->{killtime}
901         && $proc{$_}->{killtime} <= time)
902     {
903       killem ($_);
904     }
905   }
906
907   # return if the squeue was checked <60 seconds ago
908   if (defined $squeue_checked && $squeue_checked > time - 60)
909   {
910     return;
911   }
912   $squeue_checked = time;
913
914   if (!$have_slurm)
915   {
916     # here is an opportunity to check for mysterious problems with local procs
917     return;
918   }
919
920   # get a list of steps still running
921   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
922   chop @squeue;
923   if ($squeue[-1] ne "ok")
924   {
925     return;
926   }
927   pop @squeue;
928
929   # which of my jobsteps are running, according to squeue?
930   my %ok;
931   foreach (@squeue)
932   {
933     if (/^(\d+)\.(\d+) (\S+)/)
934     {
935       if ($1 eq $ENV{SLURM_JOBID})
936       {
937         $ok{$3} = 1;
938       }
939     }
940   }
941
942   # which of my active child procs (>60s old) were not mentioned by squeue?
943   foreach (keys %proc)
944   {
945     if ($proc{$_}->{time} < time - 60
946         && !exists $ok{$proc{$_}->{jobstepname}}
947         && !exists $proc{$_}->{killtime})
948     {
949       # kill this proc if it hasn't exited in 30 seconds
950       $proc{$_}->{killtime} = time + 30;
951     }
952   }
953 }
954
955
956 sub release_allocation
957 {
958   if ($have_slurm)
959   {
960     Log (undef, "release job allocation");
961     system "scancel $ENV{SLURM_JOBID}";
962   }
963 }
964
965
966 sub readfrompipes
967 {
968   my $gotsome = 0;
969   foreach my $job (keys %reader)
970   {
971     my $buf;
972     while (0 < sysread ($reader{$job}, $buf, 8192))
973     {
974       print STDERR $buf if $ENV{CRUNCH_DEBUG};
975       $jobstep[$job]->{stderr} .= $buf;
976       preprocess_stderr ($job);
977       if (length ($jobstep[$job]->{stderr}) > 16384)
978       {
979         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
980       }
981       $gotsome = 1;
982     }
983   }
984   return $gotsome;
985 }
986
987
988 sub preprocess_stderr
989 {
990   my $job = shift;
991
992   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
993     my $line = $1;
994     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
995     Log ($job, "stderr $line");
996     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
997       # whoa.
998       $main::please_freeze = 1;
999     }
1000     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1001       $jobstep[$job]->{node_fail} = 1;
1002       ban_node_by_slot($jobstep[$job]->{slotindex});
1003     }
1004   }
1005 }
1006
1007
1008 sub process_stderr
1009 {
1010   my $job = shift;
1011   my $task_success = shift;
1012   preprocess_stderr ($job);
1013
1014   map {
1015     Log ($job, "stderr $_");
1016   } split ("\n", $jobstep[$job]->{stderr});
1017 }
1018
1019
1020 sub collate_output
1021 {
1022   my $whc = Warehouse->new;
1023   Log (undef, "collate");
1024   $whc->write_start (1);
1025   my $joboutput;
1026   for (@jobstep)
1027   {
1028     next if (!exists $_->{'arvados_task'}->{output} ||
1029              !$_->{'arvados_task'}->{'success'} ||
1030              $_->{'exitcode'} != 0);
1031     my $output = $_->{'arvados_task'}->{output};
1032     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1033     {
1034       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1035       $whc->write_data ($output);
1036     }
1037     elsif (@jobstep == 1)
1038     {
1039       $joboutput = $output;
1040       $whc->write_finish;
1041     }
1042     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1043     {
1044       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1045       $whc->write_data ($outblock);
1046     }
1047     else
1048     {
1049       my $errstr = $whc->errstr;
1050       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1051       $main::success = 0;
1052     }
1053   }
1054   $joboutput = $whc->write_finish if !defined $joboutput;
1055   if ($joboutput)
1056   {
1057     Log (undef, "output $joboutput");
1058     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1059   }
1060   else
1061   {
1062     Log (undef, "output undef");
1063   }
1064   return $joboutput;
1065 }
1066
1067
1068 sub killem
1069 {
1070   foreach (@_)
1071   {
1072     my $sig = 2;                # SIGINT first
1073     if (exists $proc{$_}->{"sent_$sig"} &&
1074         time - $proc{$_}->{"sent_$sig"} > 4)
1075     {
1076       $sig = 15;                # SIGTERM if SIGINT doesn't work
1077     }
1078     if (exists $proc{$_}->{"sent_$sig"} &&
1079         time - $proc{$_}->{"sent_$sig"} > 4)
1080     {
1081       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1082     }
1083     if (!exists $proc{$_}->{"sent_$sig"})
1084     {
1085       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1086       kill $sig, $_;
1087       select (undef, undef, undef, 0.1);
1088       if ($sig == 2)
1089       {
1090         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1091       }
1092       $proc{$_}->{"sent_$sig"} = time;
1093       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1094     }
1095   }
1096 }
1097
1098
1099 sub fhbits
1100 {
1101   my($bits);
1102   for (@_) {
1103     vec($bits,fileno($_),1) = 1;
1104   }
1105   $bits;
1106 }
1107
1108
1109 sub Log                         # ($jobstep_id, $logmessage)
1110 {
1111   if ($_[1] =~ /\n/) {
1112     for my $line (split (/\n/, $_[1])) {
1113       Log ($_[0], $line);
1114     }
1115     return;
1116   }
1117   my $fh = select STDERR; $|=1; select $fh;
1118   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1119   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1120   $message .= "\n";
1121   my $datetime;
1122   if ($metastream || -t STDERR) {
1123     my @gmtime = gmtime;
1124     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1125                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1126   }
1127   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1128
1129   return if !$metastream;
1130   $metastream->write_data ($datetime . " " . $message);
1131 }
1132
1133
1134 sub croak
1135 {
1136   my ($package, $file, $line) = caller;
1137   my $message = "@_ at $file line $line\n";
1138   Log (undef, $message);
1139   freeze() if @jobstep_todo;
1140   collate_output() if @jobstep_todo;
1141   cleanup();
1142   save_meta() if $metastream;
1143   die;
1144 }
1145
1146
1147 sub cleanup
1148 {
1149   return if !$job_has_uuid;
1150   $Job->update_attributes('running' => 0,
1151                           'success' => 0,
1152                           'finished_at' => gmtime);
1153 }
1154
1155
1156 sub save_meta
1157 {
1158   my $justcheckpoint = shift; # false if this will be the last meta saved
1159   my $m = $metastream;
1160   $m = $m->copy if $justcheckpoint;
1161   $m->write_finish;
1162   my $loglocator = $m->as_key;
1163   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1164   Log (undef, "meta key is $loglocator");
1165   $Job->{'log'} = $loglocator;
1166   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1167 }
1168
1169
1170 sub freeze_if_want_freeze
1171 {
1172   if ($main::please_freeze)
1173   {
1174     release_allocation();
1175     if (@_)
1176     {
1177       # kill some srun procs before freeze+stop
1178       map { $proc{$_} = {} } @_;
1179       while (%proc)
1180       {
1181         killem (keys %proc);
1182         select (undef, undef, undef, 0.1);
1183         my $died;
1184         while (($died = waitpid (-1, WNOHANG)) > 0)
1185         {
1186           delete $proc{$died};
1187         }
1188       }
1189     }
1190     freeze();
1191     collate_output();
1192     cleanup();
1193     save_meta();
1194     exit 0;
1195   }
1196 }
1197
1198
1199 sub freeze
1200 {
1201   Log (undef, "Freeze not implemented");
1202   return;
1203 }
1204
1205
1206 sub thaw
1207 {
1208   croak ("Thaw not implemented");
1209
1210   my $whc;
1211   my $key = shift;
1212   Log (undef, "thaw from $key");
1213
1214   @jobstep = ();
1215   @jobstep_done = ();
1216   @jobstep_todo = ();
1217   @jobstep_tomerge = ();
1218   $jobstep_tomerge_level = 0;
1219   my $frozenjob = {};
1220
1221   my $stream = new Warehouse::Stream ( whc => $whc,
1222                                        hash => [split (",", $key)] );
1223   $stream->rewind;
1224   while (my $dataref = $stream->read_until (undef, "\n\n"))
1225   {
1226     if ($$dataref =~ /^job /)
1227     {
1228       foreach (split ("\n", $$dataref))
1229       {
1230         my ($k, $v) = split ("=", $_, 2);
1231         $frozenjob->{$k} = freezeunquote ($v);
1232       }
1233       next;
1234     }
1235
1236     if ($$dataref =~ /^merge (\d+) (.*)/)
1237     {
1238       $jobstep_tomerge_level = $1;
1239       @jobstep_tomerge
1240           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1241       next;
1242     }
1243
1244     my $Jobstep = { };
1245     foreach (split ("\n", $$dataref))
1246     {
1247       my ($k, $v) = split ("=", $_, 2);
1248       $Jobstep->{$k} = freezeunquote ($v) if $k;
1249     }
1250     $Jobstep->{'failures'} = 0;
1251     push @jobstep, $Jobstep;
1252
1253     if ($Jobstep->{exitcode} eq "0")
1254     {
1255       push @jobstep_done, $#jobstep;
1256     }
1257     else
1258     {
1259       push @jobstep_todo, $#jobstep;
1260     }
1261   }
1262
1263   foreach (qw (script script_version script_parameters))
1264   {
1265     $Job->{$_} = $frozenjob->{$_};
1266   }
1267   $Job->save if $job_has_uuid;
1268 }
1269
1270
1271 sub freezequote
1272 {
1273   my $s = shift;
1274   $s =~ s/\\/\\\\/g;
1275   $s =~ s/\n/\\n/g;
1276   return $s;
1277 }
1278
1279
1280 sub freezeunquote
1281 {
1282   my $s = shift;
1283   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1284   return $s;
1285 }
1286
1287
1288 sub srun
1289 {
1290   my $srunargs = shift;
1291   my $execargs = shift;
1292   my $opts = shift || {};
1293   my $stdin = shift;
1294   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1295   print STDERR (join (" ",
1296                       map { / / ? "'$_'" : $_ }
1297                       (@$args)),
1298                 "\n")
1299       if $ENV{CRUNCH_DEBUG};
1300
1301   if (defined $stdin) {
1302     my $child = open STDIN, "-|";
1303     defined $child or die "no fork: $!";
1304     if ($child == 0) {
1305       print $stdin or die $!;
1306       close STDOUT or die $!;
1307       exit 0;
1308     }
1309   }
1310
1311   return system (@$args) if $opts->{fork};
1312
1313   exec @$args;
1314   warn "ENV size is ".length(join(" ",%ENV));
1315   die "exec failed: $!: @$args";
1316 }
1317
1318
1319 sub ban_node_by_slot {
1320   # Don't start any new jobsteps on this node for 60 seconds
1321   my $slotid = shift;
1322   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1323   $slot[$slotid]->{node}->{hold_count}++;
1324   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1325 }
1326
1327 __DATA__
1328 #!/usr/bin/perl
1329
1330 # checkout-and-build
1331
1332 use Fcntl ':flock';
1333
1334 my $destdir = $ENV{"CRUNCH_SRC"};
1335 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1336 my $repo = $ENV{"CRUNCH_SRC_URL"};
1337
1338 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1339 flock L, LOCK_EX;
1340 if (readlink ("$destdir.commit") eq $commit) {
1341     exit 0;
1342 }
1343
1344 unlink "$destdir.commit";
1345 open STDOUT, ">", "$destdir.log";
1346 open STDERR, ">&STDOUT";
1347
1348 mkdir $destdir;
1349 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1350 print TARX <DATA>;
1351 if(!close(TARX)) {
1352   die "'tar -C $destdir -xf -' exited $?: $!";
1353 }
1354
1355 my $pwd;
1356 chomp ($pwd = `pwd`);
1357 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1358 mkdir $install_dir;
1359 if (-e "$destdir/crunch_scripts/install") {
1360     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1361 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1362     # Old version
1363     shell_or_die ("./tests/autotests.sh", $install_dir);
1364 } elsif (-e "./install.sh") {
1365     shell_or_die ("./install.sh", $install_dir);
1366 }
1367
1368 if ($commit) {
1369     unlink "$destdir.commit.new";
1370     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1371     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1372 }
1373
1374 close L;
1375
1376 exit 0;
1377
1378 sub shell_or_die
1379 {
1380   if ($ENV{"DEBUG"}) {
1381     print STDERR "@_\n";
1382   }
1383   system (@_) == 0
1384       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1385 }
1386
1387 __DATA__