c2738e224f6222613415bd7350c6d7dc33c50c97
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 unless (defined $ENV{"CRUNCH_TMP"}) {
79   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
80   if ($ENV{"USER"} ne "crunch" && $< != 0) {
81     # use a tmp dir unique for my uid
82     $ENV{"CRUNCH_TMP"} .= "-$<";
83   }
84 }
85 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
86 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
87 mkdir ($ENV{"JOB_WORK"});
88
89 my $force_unlock;
90 my $git_dir;
91 my $jobspec;
92 my $job_api_token;
93 my $resume_stash;
94 GetOptions('force-unlock' => \$force_unlock,
95            'git-dir=s' => \$git_dir,
96            'job=s' => \$jobspec,
97            'job-api-token=s' => \$job_api_token,
98            'resume-stash=s' => \$resume_stash,
99     );
100
101 if (defined $job_api_token) {
102   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
103 }
104
105 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
106 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
107 my $local_job = !$job_has_uuid;
108
109
110 $SIG{'HUP'} = sub
111 {
112   1;
113 };
114 $SIG{'USR1'} = sub
115 {
116   $main::ENV{CRUNCH_DEBUG} = 1;
117 };
118 $SIG{'USR2'} = sub
119 {
120   $main::ENV{CRUNCH_DEBUG} = 0;
121 };
122
123
124
125 my $arv = Arvados->new;
126 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
127 $metastream->clear;
128 $metastream->write_start('log.txt');
129
130 my $User = $arv->{'users'}->{'current'}->execute;
131
132 my $Job = {};
133 my $job_id;
134 my $dbh;
135 my $sth;
136 if ($job_has_uuid)
137 {
138   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
139   if (!$force_unlock) {
140     if ($Job->{'is_locked_by_uuid'}) {
141       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
142     }
143     if ($Job->{'success'} ne undef) {
144       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
145     }
146     if ($Job->{'running'}) {
147       croak("Job 'running' flag is already set");
148     }
149     if ($Job->{'started_at'}) {
150       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
151     }
152   }
153 }
154 else
155 {
156   $Job = JSON::decode_json($jobspec);
157
158   if (!$resume_stash)
159   {
160     map { croak ("No $_ specified") unless $Job->{$_} }
161     qw(script script_version script_parameters);
162   }
163
164   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
165   $Job->{'started_at'} = gmtime;
166
167   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
168
169   $job_has_uuid = 1;
170 }
171 $job_id = $Job->{'uuid'};
172
173
174
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
178
179
180 Log (undef, "check slurm allocation");
181 my @slot;
182 my @node;
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my @sinfo;
185 if (!$have_slurm)
186 {
187   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188   push @sinfo, "$localcpus localhost";
189 }
190 if (exists $ENV{SLURM_NODELIST})
191 {
192   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 }
194 foreach (@sinfo)
195 {
196   my ($ncpus, $slurm_nodelist) = split;
197   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
198
199   my @nodelist;
200   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201   {
202     my $nodelist = $1;
203     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204     {
205       my $ranges = $1;
206       foreach (split (",", $ranges))
207       {
208         my ($a, $b);
209         if (/(\d+)-(\d+)/)
210         {
211           $a = $1;
212           $b = $2;
213         }
214         else
215         {
216           $a = $_;
217           $b = $_;
218         }
219         push @nodelist, map {
220           my $n = $nodelist;
221           $n =~ s/\[[-,\d]+\]/$_/;
222           $n;
223         } ($a..$b);
224       }
225     }
226     else
227     {
228       push @nodelist, $nodelist;
229     }
230   }
231   foreach my $nodename (@nodelist)
232   {
233     Log (undef, "node $nodename - $ncpus slots");
234     my $node = { name => $nodename,
235                  ncpus => $ncpus,
236                  losing_streak => 0,
237                  hold_until => 0 };
238     foreach my $cpu (1..$ncpus)
239     {
240       push @slot, { node => $node,
241                     cpu => $cpu };
242     }
243   }
244   push @node, @nodelist;
245 }
246
247
248
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
251
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
253
254
255
256 my $jobmanager_id;
257 if ($job_has_uuid)
258 {
259   # Claim this job, and make sure nobody else does
260
261   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
262   $Job->{'started_at'} = gmtime;
263   $Job->{'running'} = 1;
264   $Job->{'success'} = undef;
265   $Job->{'tasks_summary'} = { 'failed' => 0,
266                               'todo' => 1,
267                               'running' => 0,
268                               'done' => 0 };
269   if ($job_has_uuid) {
270     unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
271       croak("Error while updating / locking job");
272     }
273   }
274 }
275
276
277 Log (undef, "start");
278 $SIG{'INT'} = sub { $main::please_freeze = 1; };
279 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
280 $SIG{'TERM'} = \&croak;
281 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
282 $SIG{'ALRM'} = sub { $main::please_info = 1; };
283 $SIG{'CONT'} = sub { $main::please_continue = 1; };
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302
303
304
305 if (defined $Job->{thawedfromkey})
306 {
307   thaw ($Job->{thawedfromkey});
308 }
309 else
310 {
311   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312     'job_uuid' => $Job->{'uuid'},
313     'sequence' => 0,
314     'qsequence' => 0,
315     'parameters' => {},
316                                                           });
317   push @jobstep, { 'level' => 0,
318                    'failures' => 0,
319                    'arvados_task' => $first_task,
320                  };
321   push @jobstep_todo, 0;
322 }
323
324
325 my $build_script;
326
327
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
329
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 if ($skip_install)
332 {
333   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
334 }
335 else
336 {
337   do {
338     local $/ = undef;
339     $build_script = <DATA>;
340   };
341   Log (undef, "Install revision ".$Job->{script_version});
342   my $nodelist = join(",", @node);
343
344   # Clean out crunch_tmp/work and crunch_tmp/opt
345
346   my $cleanpid = fork();
347   if ($cleanpid == 0)
348   {
349     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
351     exit (1);
352   }
353   while (1)
354   {
355     last if $cleanpid == waitpid (-1, WNOHANG);
356     freeze_if_want_freeze ($cleanpid);
357     select (undef, undef, undef, 0.1);
358   }
359   Log (undef, "Clean-work-dir exited $?");
360
361   # Install requested code version
362
363   my @execargs;
364   my @srunargs = ("srun",
365                   "--nodelist=$nodelist",
366                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
367
368   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
371
372   my $commit;
373   my $git_archive;
374   my $treeish = $Job->{'script_version'};
375   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376   # Todo: let script_version specify repository instead of expecting
377   # parent process to figure it out.
378   $ENV{"CRUNCH_SRC_URL"} = $repo;
379
380   # Create/update our clone of the remote git repo
381
382   if (!-d $ENV{"CRUNCH_SRC"}) {
383     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384         or croak ("git clone $repo failed: exit ".($?>>8));
385     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
386   }
387   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
388
389   # If this looks like a subversion r#, look for it in git-svn commit messages
390
391   if ($treeish =~ m{^\d{1,4}$}) {
392     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
393     chomp $gitlog;
394     if ($gitlog =~ /^[a-f0-9]{40}$/) {
395       $commit = $gitlog;
396       Log (undef, "Using commit $commit for script_version $treeish");
397     }
398   }
399
400   # If that didn't work, try asking git to look it up as a tree-ish.
401
402   if (!defined $commit) {
403
404     my $cooked_treeish = $treeish;
405     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406       # Looks like a git branch name -- make sure git knows it's
407       # relative to the remote repo
408       $cooked_treeish = "origin/$treeish";
409     }
410
411     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
412     chomp $found;
413     if ($found =~ /^[0-9a-f]{40}$/s) {
414       $commit = $found;
415       if ($commit ne $treeish) {
416         # Make sure we record the real commit id in the database,
417         # frozentokey, logs, etc. -- instead of an abbreviation or a
418         # branch name which can become ambiguous or point to a
419         # different commit in the future.
420         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421         Log (undef, "Using commit $commit for tree-ish $treeish");
422         if ($commit ne $treeish) {
423           $Job->{'script_version'} = $commit;
424           !$job_has_uuid or $Job->save() or croak("Error while updating job");
425         }
426       }
427     }
428   }
429
430   if (defined $commit) {
431     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432     @execargs = ("sh", "-c",
433                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
435   }
436   else {
437     croak ("could not figure out commit id for $treeish");
438   }
439
440   my $installpid = fork();
441   if ($installpid == 0)
442   {
443     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
444     exit (1);
445   }
446   while (1)
447   {
448     last if $installpid == waitpid (-1, WNOHANG);
449     freeze_if_want_freeze ($installpid);
450     select (undef, undef, undef, 0.1);
451   }
452   Log (undef, "Install exited $?");
453 }
454
455
456
457 foreach (qw (script script_version script_parameters runtime_constraints))
458 {
459   Log (undef,
460        "$_ " .
461        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
462 }
463 foreach (split (/\n/, $Job->{knobs}))
464 {
465   Log (undef, "knob " . $_);
466 }
467
468
469
470 $main::success = undef;
471
472
473
474 ONELEVEL:
475
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
479
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481                        or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
484
485
486
487 my %proc;
488 my @freeslot = (0..$#slot);
489 my @holdslot;
490 my %reader;
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
493
494 update_progress_stats();
495
496
497
498 THISROUND:
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
500 {
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507
508   pipe $reader{$id}, "writer" or croak ($!);
509   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
510   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
511
512   my $childslot = $freeslot[0];
513   my $childnode = $slot[$childslot]->{node};
514   my $childslotname = join (".",
515                             $slot[$childslot]->{node}->{name},
516                             $slot[$childslot]->{cpu});
517   my $childpid = fork();
518   if ($childpid == 0)
519   {
520     $SIG{'INT'} = 'DEFAULT';
521     $SIG{'QUIT'} = 'DEFAULT';
522     $SIG{'TERM'} = 'DEFAULT';
523
524     foreach (values (%reader))
525     {
526       close($_);
527     }
528     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
529     open(STDOUT,">&writer");
530     open(STDERR,">&writer");
531
532     undef $dbh;
533     undef $sth;
534
535     delete $ENV{"GNUPGHOME"};
536     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
537     $ENV{"TASK_QSEQUENCE"} = $id;
538     $ENV{"TASK_SEQUENCE"} = $level;
539     $ENV{"JOB_SCRIPT"} = $Job->{script};
540     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
541       $param =~ tr/a-z/A-Z/;
542       $ENV{"JOB_PARAMETER_$param"} = $value;
543     }
544     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
545     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
546     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
547     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
548     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
549
550     $ENV{"GZIP"} = "-n";
551
552     my @srunargs = (
553       "srun",
554       "--nodelist=".$childnode->{name},
555       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556       "--job-name=$job_id.$id.$$",
557         );
558     my @execargs = qw(sh);
559     my $build_script_to_send = "";
560     my $command =
561         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
562         ."&& cd $ENV{CRUNCH_TMP} ";
563     if ($build_script)
564     {
565       $build_script_to_send = $build_script;
566       $command .=
567           "&& perl -";
568     }
569     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
570     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
571     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
572     $command .=
573         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
574     my @execargs = ('bash', '-c', $command);
575     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
576     exit (111);
577   }
578   close("writer");
579   if (!defined $childpid)
580   {
581     close $reader{$id};
582     delete $reader{$id};
583     next;
584   }
585   shift @freeslot;
586   $proc{$childpid} = { jobstep => $id,
587                        time => time,
588                        slot => $childslot,
589                        jobstepname => "$job_id.$id.$childpid",
590                      };
591   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
592   $slot[$childslot]->{pid} = $childpid;
593
594   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
595   Log ($id, "child $childpid started on $childslotname");
596   $Jobstep->{starttime} = time;
597   $Jobstep->{node} = $childnode->{name};
598   $Jobstep->{slotindex} = $childslot;
599   delete $Jobstep->{stderr};
600   delete $Jobstep->{finishtime};
601
602   splice @jobstep_todo, $todo_ptr, 1;
603   --$todo_ptr;
604
605   $progress_is_dirty = 1;
606
607   while (!@freeslot
608          ||
609          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
610   {
611     last THISROUND if $main::please_freeze;
612     if ($main::please_info)
613     {
614       $main::please_info = 0;
615       freeze();
616       collate_output();
617       save_meta(1);
618       update_progress_stats();
619     }
620     my $gotsome
621         = readfrompipes ()
622         + reapchildren ();
623     if (!$gotsome)
624     {
625       check_squeue();
626       update_progress_stats();
627       select (undef, undef, undef, 0.1);
628     }
629     elsif (time - $progress_stats_updated >= 30)
630     {
631       update_progress_stats();
632     }
633     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
634         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
635     {
636       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
637           .($thisround_failed+$thisround_succeeded)
638           .") -- giving up on this round";
639       Log (undef, $message);
640       last THISROUND;
641     }
642
643     # move slots from freeslot to holdslot (or back to freeslot) if necessary
644     for (my $i=$#freeslot; $i>=0; $i--) {
645       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
646         push @holdslot, (splice @freeslot, $i, 1);
647       }
648     }
649     for (my $i=$#holdslot; $i>=0; $i--) {
650       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
651         push @freeslot, (splice @holdslot, $i, 1);
652       }
653     }
654
655     # give up if no nodes are succeeding
656     if (!grep { $_->{node}->{losing_streak} == 0 &&
657                     $_->{node}->{hold_count} < 4 } @slot) {
658       my $message = "Every node has failed -- giving up on this round";
659       Log (undef, $message);
660       last THISROUND;
661     }
662   }
663 }
664
665
666 push @freeslot, splice @holdslot;
667 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
668
669
670 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
671 while (%proc)
672 {
673   if ($main::please_continue) {
674     $main::please_continue = 0;
675     goto THISROUND;
676   }
677   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
678   readfrompipes ();
679   if (!reapchildren())
680   {
681     check_squeue();
682     update_progress_stats();
683     select (undef, undef, undef, 0.1);
684     killem (keys %proc) if $main::please_freeze;
685   }
686 }
687
688 update_progress_stats();
689 freeze_if_want_freeze();
690
691
692 if (!defined $main::success)
693 {
694   if (@jobstep_todo &&
695       $thisround_succeeded == 0 &&
696       ($thisround_failed == 0 || $thisround_failed > 4))
697   {
698     my $message = "stop because $thisround_failed tasks failed and none succeeded";
699     Log (undef, $message);
700     $main::success = 0;
701   }
702   if (!@jobstep_todo)
703   {
704     $main::success = 1;
705   }
706 }
707
708 goto ONELEVEL if !defined $main::success;
709
710
711 release_allocation();
712 freeze();
713 $Job->reload;
714 $Job->{'output'} = &collate_output();
715 $Job->{'running'} = 0;
716 $Job->{'success'} = $Job->{'output'} && $main::success;
717 $Job->{'finished_at'} = gmtime;
718 $Job->save if $job_has_uuid;
719
720 if ($Job->{'output'})
721 {
722   eval {
723     my $manifest_text = capturex("whget", $Job->{'output'});
724     $arv->{'collections'}->{'create'}->execute('collection' => {
725       'uuid' => $Job->{'output'},
726       'manifest_text' => $manifest_text,
727     });
728   };
729   if ($@) {
730     Log (undef, "Failed to register output manifest: $@");
731   }
732 }
733
734 Log (undef, "finish");
735
736 save_meta();
737 exit 0;
738
739
740
741 sub update_progress_stats
742 {
743   $progress_stats_updated = time;
744   return if !$progress_is_dirty;
745   my ($todo, $done, $running) = (scalar @jobstep_todo,
746                                  scalar @jobstep_done,
747                                  scalar @slot - scalar @freeslot - scalar @holdslot);
748   $Job->{'tasks_summary'} ||= {};
749   $Job->{'tasks_summary'}->{'todo'} = $todo;
750   $Job->{'tasks_summary'}->{'done'} = $done;
751   $Job->{'tasks_summary'}->{'running'} = $running;
752   $Job->save if $job_has_uuid;
753   Log (undef, "status: $done done, $running running, $todo todo");
754   $progress_is_dirty = 0;
755 }
756
757
758
759 sub reapchildren
760 {
761   my $pid = waitpid (-1, WNOHANG);
762   return 0 if $pid <= 0;
763
764   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
765                   . "."
766                   . $slot[$proc{$pid}->{slot}]->{cpu});
767   my $jobstepid = $proc{$pid}->{jobstep};
768   my $elapsed = time - $proc{$pid}->{time};
769   my $Jobstep = $jobstep[$jobstepid];
770
771   my $childstatus = $?;
772   my $exitvalue = $childstatus >> 8;
773   my $exitinfo = sprintf("exit %d signal %d%s",
774                          $exitvalue,
775                          $childstatus & 127,
776                          ($childstatus & 128 ? ' core dump' : ''));
777   $Jobstep->{'arvados_task'}->reload;
778   my $task_success = $Jobstep->{'arvados_task'}->{success};
779
780   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
781
782   if (!defined $task_success) {
783     # task did not indicate one way or the other --> fail
784     $Jobstep->{'arvados_task'}->{success} = 0;
785     $Jobstep->{'arvados_task'}->save;
786     $task_success = 0;
787   }
788
789   if (!$task_success)
790   {
791     my $temporary_fail;
792     $temporary_fail ||= $Jobstep->{node_fail};
793     $temporary_fail ||= ($exitvalue == 111);
794
795     ++$thisround_failed;
796     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
797
798     # Check for signs of a failed or misconfigured node
799     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
800         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
801       # Don't count this against jobstep failure thresholds if this
802       # node is already suspected faulty and srun exited quickly
803       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
804           $elapsed < 5) {
805         Log ($jobstepid, "blaming failure on suspect node " .
806              $slot[$proc{$pid}->{slot}]->{node}->{name});
807         $temporary_fail ||= 1;
808       }
809       ban_node_by_slot($proc{$pid}->{slot});
810     }
811
812     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
813                              ++$Jobstep->{'failures'},
814                              $temporary_fail ? 'temporary ' : 'permanent',
815                              $elapsed));
816
817     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
818       # Give up on this task, and the whole job
819       $main::success = 0;
820       $main::please_freeze = 1;
821     }
822     else {
823       # Put this task back on the todo queue
824       push @jobstep_todo, $jobstepid;
825     }
826     $Job->{'tasks_summary'}->{'failed'}++;
827   }
828   else
829   {
830     ++$thisround_succeeded;
831     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
832     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
833     push @jobstep_done, $jobstepid;
834     Log ($jobstepid, "success in $elapsed seconds");
835   }
836   $Jobstep->{exitcode} = $childstatus;
837   $Jobstep->{finishtime} = time;
838   process_stderr ($jobstepid, $task_success);
839   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
840
841   close $reader{$jobstepid};
842   delete $reader{$jobstepid};
843   delete $slot[$proc{$pid}->{slot}]->{pid};
844   push @freeslot, $proc{$pid}->{slot};
845   delete $proc{$pid};
846
847   # Load new tasks
848   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
849     'where' => {
850       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
851     },
852     'order' => 'qsequence'
853   );
854   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
855     my $jobstep = {
856       'level' => $arvados_task->{'sequence'},
857       'failures' => 0,
858       'arvados_task' => $arvados_task
859     };
860     push @jobstep, $jobstep;
861     push @jobstep_todo, $#jobstep;
862   }
863
864   $progress_is_dirty = 1;
865   1;
866 }
867
868
869 sub check_squeue
870 {
871   # return if the kill list was checked <4 seconds ago
872   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
873   {
874     return;
875   }
876   $squeue_kill_checked = time;
877
878   # use killem() on procs whose killtime is reached
879   for (keys %proc)
880   {
881     if (exists $proc{$_}->{killtime}
882         && $proc{$_}->{killtime} <= time)
883     {
884       killem ($_);
885     }
886   }
887
888   # return if the squeue was checked <60 seconds ago
889   if (defined $squeue_checked && $squeue_checked > time - 60)
890   {
891     return;
892   }
893   $squeue_checked = time;
894
895   if (!$have_slurm)
896   {
897     # here is an opportunity to check for mysterious problems with local procs
898     return;
899   }
900
901   # get a list of steps still running
902   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
903   chop @squeue;
904   if ($squeue[-1] ne "ok")
905   {
906     return;
907   }
908   pop @squeue;
909
910   # which of my jobsteps are running, according to squeue?
911   my %ok;
912   foreach (@squeue)
913   {
914     if (/^(\d+)\.(\d+) (\S+)/)
915     {
916       if ($1 eq $ENV{SLURM_JOBID})
917       {
918         $ok{$3} = 1;
919       }
920     }
921   }
922
923   # which of my active child procs (>60s old) were not mentioned by squeue?
924   foreach (keys %proc)
925   {
926     if ($proc{$_}->{time} < time - 60
927         && !exists $ok{$proc{$_}->{jobstepname}}
928         && !exists $proc{$_}->{killtime})
929     {
930       # kill this proc if it hasn't exited in 30 seconds
931       $proc{$_}->{killtime} = time + 30;
932     }
933   }
934 }
935
936
937 sub release_allocation
938 {
939   if ($have_slurm)
940   {
941     Log (undef, "release job allocation");
942     system "scancel $ENV{SLURM_JOBID}";
943   }
944 }
945
946
947 sub readfrompipes
948 {
949   my $gotsome = 0;
950   foreach my $job (keys %reader)
951   {
952     my $buf;
953     while (0 < sysread ($reader{$job}, $buf, 8192))
954     {
955       print STDERR $buf if $ENV{CRUNCH_DEBUG};
956       $jobstep[$job]->{stderr} .= $buf;
957       preprocess_stderr ($job);
958       if (length ($jobstep[$job]->{stderr}) > 16384)
959       {
960         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
961       }
962       $gotsome = 1;
963     }
964   }
965   return $gotsome;
966 }
967
968
969 sub preprocess_stderr
970 {
971   my $job = shift;
972
973   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
974     my $line = $1;
975     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
976     Log ($job, "stderr $line");
977     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
978       # whoa.
979       $main::please_freeze = 1;
980     }
981     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
982       $jobstep[$job]->{node_fail} = 1;
983       ban_node_by_slot($jobstep[$job]->{slotindex});
984     }
985   }
986 }
987
988
989 sub process_stderr
990 {
991   my $job = shift;
992   my $task_success = shift;
993   preprocess_stderr ($job);
994
995   map {
996     Log ($job, "stderr $_");
997   } split ("\n", $jobstep[$job]->{stderr});
998 }
999
1000
1001 sub collate_output
1002 {
1003   my $whc = Warehouse->new;
1004   Log (undef, "collate");
1005   $whc->write_start (1);
1006   my $joboutput;
1007   for (@jobstep)
1008   {
1009     next if (!exists $_->{'arvados_task'}->{output} ||
1010              !$_->{'arvados_task'}->{'success'} ||
1011              $_->{'exitcode'} != 0);
1012     my $output = $_->{'arvados_task'}->{output};
1013     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1014     {
1015       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1016       $whc->write_data ($output);
1017     }
1018     elsif (@jobstep == 1)
1019     {
1020       $joboutput = $output;
1021       $whc->write_finish;
1022     }
1023     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1024     {
1025       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1026       $whc->write_data ($outblock);
1027     }
1028     else
1029     {
1030       my $errstr = $whc->errstr;
1031       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1032       $main::success = 0;
1033     }
1034   }
1035   $joboutput = $whc->write_finish if !defined $joboutput;
1036   if ($joboutput)
1037   {
1038     Log (undef, "output $joboutput");
1039     $Job->{'output'} = $joboutput;
1040     $Job->save if $job_has_uuid;
1041   }
1042   else
1043   {
1044     Log (undef, "output undef");
1045   }
1046   return $joboutput;
1047 }
1048
1049
1050 sub killem
1051 {
1052   foreach (@_)
1053   {
1054     my $sig = 2;                # SIGINT first
1055     if (exists $proc{$_}->{"sent_$sig"} &&
1056         time - $proc{$_}->{"sent_$sig"} > 4)
1057     {
1058       $sig = 15;                # SIGTERM if SIGINT doesn't work
1059     }
1060     if (exists $proc{$_}->{"sent_$sig"} &&
1061         time - $proc{$_}->{"sent_$sig"} > 4)
1062     {
1063       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1064     }
1065     if (!exists $proc{$_}->{"sent_$sig"})
1066     {
1067       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1068       kill $sig, $_;
1069       select (undef, undef, undef, 0.1);
1070       if ($sig == 2)
1071       {
1072         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1073       }
1074       $proc{$_}->{"sent_$sig"} = time;
1075       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1076     }
1077   }
1078 }
1079
1080
1081 sub fhbits
1082 {
1083   my($bits);
1084   for (@_) {
1085     vec($bits,fileno($_),1) = 1;
1086   }
1087   $bits;
1088 }
1089
1090
1091 sub Log                         # ($jobstep_id, $logmessage)
1092 {
1093   if ($_[1] =~ /\n/) {
1094     for my $line (split (/\n/, $_[1])) {
1095       Log ($_[0], $line);
1096     }
1097     return;
1098   }
1099   my $fh = select STDERR; $|=1; select $fh;
1100   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1101   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1102   $message .= "\n";
1103   my $datetime;
1104   if ($metastream || -t STDERR) {
1105     my @gmtime = gmtime;
1106     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1107                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1108   }
1109   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1110
1111   return if !$metastream;
1112   $metastream->write_data ($datetime . " " . $message);
1113 }
1114
1115
1116 sub croak
1117 {
1118   my ($package, $file, $line) = caller;
1119   my $message = "@_ at $file line $line\n";
1120   Log (undef, $message);
1121   freeze() if @jobstep_todo;
1122   collate_output() if @jobstep_todo;
1123   cleanup();
1124   save_meta() if $metastream;
1125   die;
1126 }
1127
1128
1129 sub cleanup
1130 {
1131   return if !$job_has_uuid;
1132   $Job->reload;
1133   $Job->{'running'} = 0;
1134   $Job->{'success'} = 0;
1135   $Job->{'finished_at'} = gmtime;
1136   $Job->save;
1137 }
1138
1139
1140 sub save_meta
1141 {
1142   my $justcheckpoint = shift; # false if this will be the last meta saved
1143   my $m = $metastream;
1144   $m = $m->copy if $justcheckpoint;
1145   $m->write_finish;
1146   my $loglocator = $m->as_key;
1147   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1148   Log (undef, "meta key is $loglocator");
1149   $Job->{'log'} = $loglocator;
1150   $Job->save if $job_has_uuid;
1151 }
1152
1153
1154 sub freeze_if_want_freeze
1155 {
1156   if ($main::please_freeze)
1157   {
1158     release_allocation();
1159     if (@_)
1160     {
1161       # kill some srun procs before freeze+stop
1162       map { $proc{$_} = {} } @_;
1163       while (%proc)
1164       {
1165         killem (keys %proc);
1166         select (undef, undef, undef, 0.1);
1167         my $died;
1168         while (($died = waitpid (-1, WNOHANG)) > 0)
1169         {
1170           delete $proc{$died};
1171         }
1172       }
1173     }
1174     freeze();
1175     collate_output();
1176     cleanup();
1177     save_meta();
1178     exit 0;
1179   }
1180 }
1181
1182
1183 sub freeze
1184 {
1185   Log (undef, "Freeze not implemented");
1186   return;
1187 }
1188
1189
1190 sub thaw
1191 {
1192   croak ("Thaw not implemented");
1193
1194   my $whc;
1195   my $key = shift;
1196   Log (undef, "thaw from $key");
1197
1198   @jobstep = ();
1199   @jobstep_done = ();
1200   @jobstep_todo = ();
1201   @jobstep_tomerge = ();
1202   $jobstep_tomerge_level = 0;
1203   my $frozenjob = {};
1204
1205   my $stream = new Warehouse::Stream ( whc => $whc,
1206                                        hash => [split (",", $key)] );
1207   $stream->rewind;
1208   while (my $dataref = $stream->read_until (undef, "\n\n"))
1209   {
1210     if ($$dataref =~ /^job /)
1211     {
1212       foreach (split ("\n", $$dataref))
1213       {
1214         my ($k, $v) = split ("=", $_, 2);
1215         $frozenjob->{$k} = freezeunquote ($v);
1216       }
1217       next;
1218     }
1219
1220     if ($$dataref =~ /^merge (\d+) (.*)/)
1221     {
1222       $jobstep_tomerge_level = $1;
1223       @jobstep_tomerge
1224           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1225       next;
1226     }
1227
1228     my $Jobstep = { };
1229     foreach (split ("\n", $$dataref))
1230     {
1231       my ($k, $v) = split ("=", $_, 2);
1232       $Jobstep->{$k} = freezeunquote ($v) if $k;
1233     }
1234     $Jobstep->{'failures'} = 0;
1235     push @jobstep, $Jobstep;
1236
1237     if ($Jobstep->{exitcode} eq "0")
1238     {
1239       push @jobstep_done, $#jobstep;
1240     }
1241     else
1242     {
1243       push @jobstep_todo, $#jobstep;
1244     }
1245   }
1246
1247   foreach (qw (script script_version script_parameters))
1248   {
1249     $Job->{$_} = $frozenjob->{$_};
1250   }
1251   $Job->save if $job_has_uuid;
1252 }
1253
1254
1255 sub freezequote
1256 {
1257   my $s = shift;
1258   $s =~ s/\\/\\\\/g;
1259   $s =~ s/\n/\\n/g;
1260   return $s;
1261 }
1262
1263
1264 sub freezeunquote
1265 {
1266   my $s = shift;
1267   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1268   return $s;
1269 }
1270
1271
1272 sub srun
1273 {
1274   my $srunargs = shift;
1275   my $execargs = shift;
1276   my $opts = shift || {};
1277   my $stdin = shift;
1278   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1279   print STDERR (join (" ",
1280                       map { / / ? "'$_'" : $_ }
1281                       (@$args)),
1282                 "\n")
1283       if $ENV{CRUNCH_DEBUG};
1284
1285   if (defined $stdin) {
1286     my $child = open STDIN, "-|";
1287     defined $child or die "no fork: $!";
1288     if ($child == 0) {
1289       print $stdin or die $!;
1290       close STDOUT or die $!;
1291       exit 0;
1292     }
1293   }
1294
1295   return system (@$args) if $opts->{fork};
1296
1297   exec @$args;
1298   warn "ENV size is ".length(join(" ",%ENV));
1299   die "exec failed: $!: @$args";
1300 }
1301
1302
1303 sub ban_node_by_slot {
1304   # Don't start any new jobsteps on this node for 60 seconds
1305   my $slotid = shift;
1306   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1307   $slot[$slotid]->{node}->{hold_count}++;
1308   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1309 }
1310
1311 __DATA__
1312 #!/usr/bin/perl
1313
1314 # checkout-and-build
1315
1316 use Fcntl ':flock';
1317
1318 my $destdir = $ENV{"CRUNCH_SRC"};
1319 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1320 my $repo = $ENV{"CRUNCH_SRC_URL"};
1321
1322 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1323 flock L, LOCK_EX;
1324 if (readlink ("$destdir.commit") eq $commit) {
1325     exit 0;
1326 }
1327
1328 unlink "$destdir.commit";
1329 open STDOUT, ">", "$destdir.log";
1330 open STDERR, ">&STDOUT";
1331
1332 mkdir $destdir;
1333 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1334 print TARX <DATA>;
1335 if(!close(TARX)) {
1336   die "'tar -C $destdir -xf -' exited $?: $!";
1337 }
1338
1339 my $pwd;
1340 chomp ($pwd = `pwd`);
1341 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1342 mkdir $install_dir;
1343 if (-e "$destdir/crunch_scripts/install") {
1344     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1345 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1346     # Old version
1347     shell_or_die ("./tests/autotests.sh", $install_dir);
1348 } elsif (-e "./install.sh") {
1349     shell_or_die ("./install.sh", $install_dir);
1350 }
1351
1352 if ($commit) {
1353     unlink "$destdir.commit.new";
1354     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1355     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1356 }
1357
1358 close L;
1359
1360 exit 0;
1361
1362 sub shell_or_die
1363 {
1364   if ($ENV{"DEBUG"}) {
1365     print STDERR "@_\n";
1366   }
1367   system (@_) == 0
1368       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1369 }
1370
1371 __DATA__