Store crunch-job log in a collection instead of just raw data blocks.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
89
90 my $force_unlock;
91 my $git_dir;
92 my $jobspec;
93 my $job_api_token;
94 my $resume_stash;
95 GetOptions('force-unlock' => \$force_unlock,
96            'git-dir=s' => \$git_dir,
97            'job=s' => \$jobspec,
98            'job-api-token=s' => \$job_api_token,
99            'resume-stash=s' => \$resume_stash,
100     );
101
102 if (defined $job_api_token) {
103   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
104 }
105
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
109
110
111 $SIG{'USR1'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 1;
114 };
115 $SIG{'USR2'} = sub
116 {
117   $main::ENV{CRUNCH_DEBUG} = 0;
118 };
119
120
121
122 my $arv = Arvados->new;
123 my $metastream;
124
125 my $User = $arv->{'users'}->{'current'}->execute;
126
127 my $Job = {};
128 my $job_id;
129 my $dbh;
130 my $sth;
131 if ($job_has_uuid)
132 {
133   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
134   if (!$force_unlock) {
135     if ($Job->{'is_locked_by_uuid'}) {
136       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
137     }
138     if ($Job->{'success'} ne undef) {
139       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
140     }
141     if ($Job->{'running'}) {
142       croak("Job 'running' flag is already set");
143     }
144     if ($Job->{'started_at'}) {
145       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
146     }
147   }
148 }
149 else
150 {
151   $Job = JSON::decode_json($jobspec);
152
153   if (!$resume_stash)
154   {
155     map { croak ("No $_ specified") unless $Job->{$_} }
156     qw(script script_version script_parameters);
157   }
158
159   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
160   $Job->{'started_at'} = gmtime;
161
162   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
163
164   $job_has_uuid = 1;
165 }
166 $job_id = $Job->{'uuid'};
167
168 $metastream = Warehouse::Stream->new(whc => new Warehouse);
169 $metastream->clear;
170 $metastream->name('.');
171 $metastream->write_start($job_id . '.log.txt');
172
173
174 $Job->{'runtime_constraints'} ||= {};
175 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
176 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
177
178
179 Log (undef, "check slurm allocation");
180 my @slot;
181 my @node;
182 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
183 my @sinfo;
184 if (!$have_slurm)
185 {
186   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
187   push @sinfo, "$localcpus localhost";
188 }
189 if (exists $ENV{SLURM_NODELIST})
190 {
191   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
192 }
193 foreach (@sinfo)
194 {
195   my ($ncpus, $slurm_nodelist) = split;
196   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
197
198   my @nodelist;
199   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
200   {
201     my $nodelist = $1;
202     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
203     {
204       my $ranges = $1;
205       foreach (split (",", $ranges))
206       {
207         my ($a, $b);
208         if (/(\d+)-(\d+)/)
209         {
210           $a = $1;
211           $b = $2;
212         }
213         else
214         {
215           $a = $_;
216           $b = $_;
217         }
218         push @nodelist, map {
219           my $n = $nodelist;
220           $n =~ s/\[[-,\d]+\]/$_/;
221           $n;
222         } ($a..$b);
223       }
224     }
225     else
226     {
227       push @nodelist, $nodelist;
228     }
229   }
230   foreach my $nodename (@nodelist)
231   {
232     Log (undef, "node $nodename - $ncpus slots");
233     my $node = { name => $nodename,
234                  ncpus => $ncpus,
235                  losing_streak => 0,
236                  hold_until => 0 };
237     foreach my $cpu (1..$ncpus)
238     {
239       push @slot, { node => $node,
240                     cpu => $cpu };
241     }
242   }
243   push @node, @nodelist;
244 }
245
246
247
248 # Ensure that we get one jobstep running on each allocated node before
249 # we start overloading nodes with concurrent steps
250
251 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
252
253
254
255 my $jobmanager_id;
256 if ($job_has_uuid)
257 {
258   # Claim this job, and make sure nobody else does
259   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
260           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
261     croak("Error while updating / locking job");
262   }
263   $Job->update_attributes('started_at' => scalar gmtime,
264                           'running' => 1,
265                           'success' => undef,
266                           'tasks_summary' => { 'failed' => 0,
267                                                'todo' => 1,
268                                                'running' => 0,
269                                                'done' => 0 });
270 }
271
272
273 Log (undef, "start");
274 $SIG{'INT'} = sub { $main::please_freeze = 1; };
275 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
276 $SIG{'TERM'} = \&croak;
277 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
278 $SIG{'ALRM'} = sub { $main::please_info = 1; };
279 $SIG{'CONT'} = sub { $main::please_continue = 1; };
280 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
281
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 $main::please_refresh = 0;
286 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
287
288 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
289 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
290 $ENV{"JOB_UUID"} = $job_id;
291
292
293 my @jobstep;
294 my @jobstep_todo = ();
295 my @jobstep_done = ();
296 my @jobstep_tomerge = ();
297 my $jobstep_tomerge_level = 0;
298 my $squeue_checked;
299 my $squeue_kill_checked;
300 my $output_in_keep = 0;
301 my $latest_refresh = scalar time;
302
303
304
305 if (defined $Job->{thawedfromkey})
306 {
307   thaw ($Job->{thawedfromkey});
308 }
309 else
310 {
311   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312     'job_uuid' => $Job->{'uuid'},
313     'sequence' => 0,
314     'qsequence' => 0,
315     'parameters' => {},
316                                                           });
317   push @jobstep, { 'level' => 0,
318                    'failures' => 0,
319                    'arvados_task' => $first_task,
320                  };
321   push @jobstep_todo, 0;
322 }
323
324
325 my $build_script;
326
327
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
329
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 if ($skip_install)
332 {
333   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
334 }
335 else
336 {
337   do {
338     local $/ = undef;
339     $build_script = <DATA>;
340   };
341   Log (undef, "Install revision ".$Job->{script_version});
342   my $nodelist = join(",", @node);
343
344   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src
345
346   my $cleanpid = fork();
347   if ($cleanpid == 0)
348   {
349     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src']);
351     exit (1);
352   }
353   while (1)
354   {
355     last if $cleanpid == waitpid (-1, WNOHANG);
356     freeze_if_want_freeze ($cleanpid);
357     select (undef, undef, undef, 0.1);
358   }
359   Log (undef, "Clean-work-dir exited $?");
360
361   # Install requested code version
362
363   my @execargs;
364   my @srunargs = ("srun",
365                   "--nodelist=$nodelist",
366                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
367
368   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
371
372   my $commit;
373   my $git_archive;
374   my $treeish = $Job->{'script_version'};
375   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376   # Todo: let script_version specify repository instead of expecting
377   # parent process to figure it out.
378   $ENV{"CRUNCH_SRC_URL"} = $repo;
379
380   # Create/update our clone of the remote git repo
381
382   if (!-d $ENV{"CRUNCH_SRC"}) {
383     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384         or croak ("git clone $repo failed: exit ".($?>>8));
385     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
386   }
387   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
388
389   # If this looks like a subversion r#, look for it in git-svn commit messages
390
391   if ($treeish =~ m{^\d{1,4}$}) {
392     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
393     chomp $gitlog;
394     if ($gitlog =~ /^[a-f0-9]{40}$/) {
395       $commit = $gitlog;
396       Log (undef, "Using commit $commit for script_version $treeish");
397     }
398   }
399
400   # If that didn't work, try asking git to look it up as a tree-ish.
401
402   if (!defined $commit) {
403
404     my $cooked_treeish = $treeish;
405     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406       # Looks like a git branch name -- make sure git knows it's
407       # relative to the remote repo
408       $cooked_treeish = "origin/$treeish";
409     }
410
411     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
412     chomp $found;
413     if ($found =~ /^[0-9a-f]{40}$/s) {
414       $commit = $found;
415       if ($commit ne $treeish) {
416         # Make sure we record the real commit id in the database,
417         # frozentokey, logs, etc. -- instead of an abbreviation or a
418         # branch name which can become ambiguous or point to a
419         # different commit in the future.
420         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421         Log (undef, "Using commit $commit for tree-ish $treeish");
422         if ($commit ne $treeish) {
423           $Job->{'script_version'} = $commit;
424           !$job_has_uuid or
425               $Job->update_attributes('script_version' => $commit) or
426               croak("Error while updating job");
427         }
428       }
429     }
430   }
431
432   if (defined $commit) {
433     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
434     @execargs = ("sh", "-c",
435                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
436     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
437   }
438   else {
439     croak ("could not figure out commit id for $treeish");
440   }
441
442   my $installpid = fork();
443   if ($installpid == 0)
444   {
445     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
446     exit (1);
447   }
448   while (1)
449   {
450     last if $installpid == waitpid (-1, WNOHANG);
451     freeze_if_want_freeze ($installpid);
452     select (undef, undef, undef, 0.1);
453   }
454   Log (undef, "Install exited $?");
455 }
456
457
458
459 foreach (qw (script script_version script_parameters runtime_constraints))
460 {
461   Log (undef,
462        "$_ " .
463        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
464 }
465 foreach (split (/\n/, $Job->{knobs}))
466 {
467   Log (undef, "knob " . $_);
468 }
469
470
471
472 $main::success = undef;
473
474
475
476 ONELEVEL:
477
478 my $thisround_succeeded = 0;
479 my $thisround_failed = 0;
480 my $thisround_failed_multiple = 0;
481
482 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
483                        or $a <=> $b } @jobstep_todo;
484 my $level = $jobstep[$jobstep_todo[0]]->{level};
485 Log (undef, "start level $level");
486
487
488
489 my %proc;
490 my @freeslot = (0..$#slot);
491 my @holdslot;
492 my %reader;
493 my $progress_is_dirty = 1;
494 my $progress_stats_updated = 0;
495
496 update_progress_stats();
497
498
499
500 THISROUND:
501 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
502 {
503   my $id = $jobstep_todo[$todo_ptr];
504   my $Jobstep = $jobstep[$id];
505   if ($Jobstep->{level} != $level)
506   {
507     next;
508   }
509
510   pipe $reader{$id}, "writer" or croak ($!);
511   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
512   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
513
514   my $childslot = $freeslot[0];
515   my $childnode = $slot[$childslot]->{node};
516   my $childslotname = join (".",
517                             $slot[$childslot]->{node}->{name},
518                             $slot[$childslot]->{cpu});
519   my $childpid = fork();
520   if ($childpid == 0)
521   {
522     $SIG{'INT'} = 'DEFAULT';
523     $SIG{'QUIT'} = 'DEFAULT';
524     $SIG{'TERM'} = 'DEFAULT';
525
526     foreach (values (%reader))
527     {
528       close($_);
529     }
530     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
531     open(STDOUT,">&writer");
532     open(STDERR,">&writer");
533
534     undef $dbh;
535     undef $sth;
536
537     delete $ENV{"GNUPGHOME"};
538     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
539     $ENV{"TASK_QSEQUENCE"} = $id;
540     $ENV{"TASK_SEQUENCE"} = $level;
541     $ENV{"JOB_SCRIPT"} = $Job->{script};
542     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
543       $param =~ tr/a-z/A-Z/;
544       $ENV{"JOB_PARAMETER_$param"} = $value;
545     }
546     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
547     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
548     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
549     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
550     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
551
552     $ENV{"GZIP"} = "-n";
553
554     my @srunargs = (
555       "srun",
556       "--nodelist=".$childnode->{name},
557       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
558       "--job-name=$job_id.$id.$$",
559         );
560     my @execargs = qw(sh);
561     my $build_script_to_send = "";
562     my $command =
563         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
564         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
565         ."&& cd $ENV{CRUNCH_TMP} ";
566     if ($build_script)
567     {
568       $build_script_to_send = $build_script;
569       $command .=
570           "&& perl -";
571     }
572     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
573     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
574     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
575     $command .=
576         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
577     my @execargs = ('bash', '-c', $command);
578     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
579     exit (111);
580   }
581   close("writer");
582   if (!defined $childpid)
583   {
584     close $reader{$id};
585     delete $reader{$id};
586     next;
587   }
588   shift @freeslot;
589   $proc{$childpid} = { jobstep => $id,
590                        time => time,
591                        slot => $childslot,
592                        jobstepname => "$job_id.$id.$childpid",
593                      };
594   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
595   $slot[$childslot]->{pid} = $childpid;
596
597   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
598   Log ($id, "child $childpid started on $childslotname");
599   $Jobstep->{starttime} = time;
600   $Jobstep->{node} = $childnode->{name};
601   $Jobstep->{slotindex} = $childslot;
602   delete $Jobstep->{stderr};
603   delete $Jobstep->{finishtime};
604
605   splice @jobstep_todo, $todo_ptr, 1;
606   --$todo_ptr;
607
608   $progress_is_dirty = 1;
609
610   while (!@freeslot
611          ||
612          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
613   {
614     last THISROUND if $main::please_freeze;
615     if ($main::please_info)
616     {
617       $main::please_info = 0;
618       freeze();
619       collate_output();
620       save_meta(1);
621       update_progress_stats();
622     }
623     my $gotsome
624         = readfrompipes ()
625         + reapchildren ();
626     if (!$gotsome)
627     {
628       check_refresh_wanted();
629       check_squeue();
630       update_progress_stats();
631       select (undef, undef, undef, 0.1);
632     }
633     elsif (time - $progress_stats_updated >= 30)
634     {
635       update_progress_stats();
636     }
637     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
638         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
639     {
640       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
641           .($thisround_failed+$thisround_succeeded)
642           .") -- giving up on this round";
643       Log (undef, $message);
644       last THISROUND;
645     }
646
647     # move slots from freeslot to holdslot (or back to freeslot) if necessary
648     for (my $i=$#freeslot; $i>=0; $i--) {
649       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
650         push @holdslot, (splice @freeslot, $i, 1);
651       }
652     }
653     for (my $i=$#holdslot; $i>=0; $i--) {
654       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
655         push @freeslot, (splice @holdslot, $i, 1);
656       }
657     }
658
659     # give up if no nodes are succeeding
660     if (!grep { $_->{node}->{losing_streak} == 0 &&
661                     $_->{node}->{hold_count} < 4 } @slot) {
662       my $message = "Every node has failed -- giving up on this round";
663       Log (undef, $message);
664       last THISROUND;
665     }
666   }
667 }
668
669
670 push @freeslot, splice @holdslot;
671 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
672
673
674 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
675 while (%proc)
676 {
677   if ($main::please_continue) {
678     $main::please_continue = 0;
679     goto THISROUND;
680   }
681   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
682   readfrompipes ();
683   if (!reapchildren())
684   {
685     check_refresh_wanted();
686     check_squeue();
687     update_progress_stats();
688     select (undef, undef, undef, 0.1);
689     killem (keys %proc) if $main::please_freeze;
690   }
691 }
692
693 update_progress_stats();
694 freeze_if_want_freeze();
695
696
697 if (!defined $main::success)
698 {
699   if (@jobstep_todo &&
700       $thisround_succeeded == 0 &&
701       ($thisround_failed == 0 || $thisround_failed > 4))
702   {
703     my $message = "stop because $thisround_failed tasks failed and none succeeded";
704     Log (undef, $message);
705     $main::success = 0;
706   }
707   if (!@jobstep_todo)
708   {
709     $main::success = 1;
710   }
711 }
712
713 goto ONELEVEL if !defined $main::success;
714
715
716 release_allocation();
717 freeze();
718 if ($job_has_uuid) {
719   $Job->update_attributes('output' => &collate_output(),
720                           'running' => 0,
721                           'success' => $Job->{'output'} && $main::success,
722                           'finished_at' => scalar gmtime)
723 }
724
725 if ($Job->{'output'})
726 {
727   eval {
728     my $manifest_text = capturex("whget", $Job->{'output'});
729     $arv->{'collections'}->{'create'}->execute('collection' => {
730       'uuid' => $Job->{'output'},
731       'manifest_text' => $manifest_text,
732     });
733   };
734   if ($@) {
735     Log (undef, "Failed to register output manifest: $@");
736   }
737 }
738
739 Log (undef, "finish");
740
741 save_meta();
742 exit 0;
743
744
745
746 sub update_progress_stats
747 {
748   $progress_stats_updated = time;
749   return if !$progress_is_dirty;
750   my ($todo, $done, $running) = (scalar @jobstep_todo,
751                                  scalar @jobstep_done,
752                                  scalar @slot - scalar @freeslot - scalar @holdslot);
753   $Job->{'tasks_summary'} ||= {};
754   $Job->{'tasks_summary'}->{'todo'} = $todo;
755   $Job->{'tasks_summary'}->{'done'} = $done;
756   $Job->{'tasks_summary'}->{'running'} = $running;
757   if ($job_has_uuid) {
758     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
759   }
760   Log (undef, "status: $done done, $running running, $todo todo");
761   $progress_is_dirty = 0;
762 }
763
764
765
766 sub reapchildren
767 {
768   my $pid = waitpid (-1, WNOHANG);
769   return 0 if $pid <= 0;
770
771   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
772                   . "."
773                   . $slot[$proc{$pid}->{slot}]->{cpu});
774   my $jobstepid = $proc{$pid}->{jobstep};
775   my $elapsed = time - $proc{$pid}->{time};
776   my $Jobstep = $jobstep[$jobstepid];
777
778   my $childstatus = $?;
779   my $exitvalue = $childstatus >> 8;
780   my $exitinfo = sprintf("exit %d signal %d%s",
781                          $exitvalue,
782                          $childstatus & 127,
783                          ($childstatus & 128 ? ' core dump' : ''));
784   $Jobstep->{'arvados_task'}->reload;
785   my $task_success = $Jobstep->{'arvados_task'}->{success};
786
787   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
788
789   if (!defined $task_success) {
790     # task did not indicate one way or the other --> fail
791     $Jobstep->{'arvados_task'}->{success} = 0;
792     $Jobstep->{'arvados_task'}->save;
793     $task_success = 0;
794   }
795
796   if (!$task_success)
797   {
798     my $temporary_fail;
799     $temporary_fail ||= $Jobstep->{node_fail};
800     $temporary_fail ||= ($exitvalue == 111);
801
802     ++$thisround_failed;
803     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
804
805     # Check for signs of a failed or misconfigured node
806     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
807         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
808       # Don't count this against jobstep failure thresholds if this
809       # node is already suspected faulty and srun exited quickly
810       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
811           $elapsed < 5) {
812         Log ($jobstepid, "blaming failure on suspect node " .
813              $slot[$proc{$pid}->{slot}]->{node}->{name});
814         $temporary_fail ||= 1;
815       }
816       ban_node_by_slot($proc{$pid}->{slot});
817     }
818
819     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
820                              ++$Jobstep->{'failures'},
821                              $temporary_fail ? 'temporary ' : 'permanent',
822                              $elapsed));
823
824     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
825       # Give up on this task, and the whole job
826       $main::success = 0;
827       $main::please_freeze = 1;
828     }
829     else {
830       # Put this task back on the todo queue
831       push @jobstep_todo, $jobstepid;
832     }
833     $Job->{'tasks_summary'}->{'failed'}++;
834   }
835   else
836   {
837     ++$thisround_succeeded;
838     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
839     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
840     push @jobstep_done, $jobstepid;
841     Log ($jobstepid, "success in $elapsed seconds");
842   }
843   $Jobstep->{exitcode} = $childstatus;
844   $Jobstep->{finishtime} = time;
845   process_stderr ($jobstepid, $task_success);
846   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
847
848   close $reader{$jobstepid};
849   delete $reader{$jobstepid};
850   delete $slot[$proc{$pid}->{slot}]->{pid};
851   push @freeslot, $proc{$pid}->{slot};
852   delete $proc{$pid};
853
854   # Load new tasks
855   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
856     'where' => {
857       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
858     },
859     'order' => 'qsequence'
860   );
861   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
862     my $jobstep = {
863       'level' => $arvados_task->{'sequence'},
864       'failures' => 0,
865       'arvados_task' => $arvados_task
866     };
867     push @jobstep, $jobstep;
868     push @jobstep_todo, $#jobstep;
869   }
870
871   $progress_is_dirty = 1;
872   1;
873 }
874
875 sub check_refresh_wanted
876 {
877   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
878   if (@stat && $stat[9] > $latest_refresh) {
879     $latest_refresh = scalar time;
880     if ($job_has_uuid) {
881       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
882       for my $attr ('cancelled_at',
883                     'cancelled_by_user_uuid',
884                     'cancelled_by_client_uuid') {
885         $Job->{$attr} = $Job2->{$attr};
886       }
887       if ($Job->{'cancelled_at'}) {
888         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
889              " by user " . $Job->{cancelled_by_user_uuid});
890         $main::success = 0;
891         $main::please_freeze = 1;
892       }
893     }
894   }
895 }
896
897 sub check_squeue
898 {
899   # return if the kill list was checked <4 seconds ago
900   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
901   {
902     return;
903   }
904   $squeue_kill_checked = time;
905
906   # use killem() on procs whose killtime is reached
907   for (keys %proc)
908   {
909     if (exists $proc{$_}->{killtime}
910         && $proc{$_}->{killtime} <= time)
911     {
912       killem ($_);
913     }
914   }
915
916   # return if the squeue was checked <60 seconds ago
917   if (defined $squeue_checked && $squeue_checked > time - 60)
918   {
919     return;
920   }
921   $squeue_checked = time;
922
923   if (!$have_slurm)
924   {
925     # here is an opportunity to check for mysterious problems with local procs
926     return;
927   }
928
929   # get a list of steps still running
930   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
931   chop @squeue;
932   if ($squeue[-1] ne "ok")
933   {
934     return;
935   }
936   pop @squeue;
937
938   # which of my jobsteps are running, according to squeue?
939   my %ok;
940   foreach (@squeue)
941   {
942     if (/^(\d+)\.(\d+) (\S+)/)
943     {
944       if ($1 eq $ENV{SLURM_JOBID})
945       {
946         $ok{$3} = 1;
947       }
948     }
949   }
950
951   # which of my active child procs (>60s old) were not mentioned by squeue?
952   foreach (keys %proc)
953   {
954     if ($proc{$_}->{time} < time - 60
955         && !exists $ok{$proc{$_}->{jobstepname}}
956         && !exists $proc{$_}->{killtime})
957     {
958       # kill this proc if it hasn't exited in 30 seconds
959       $proc{$_}->{killtime} = time + 30;
960     }
961   }
962 }
963
964
965 sub release_allocation
966 {
967   if ($have_slurm)
968   {
969     Log (undef, "release job allocation");
970     system "scancel $ENV{SLURM_JOBID}";
971   }
972 }
973
974
975 sub readfrompipes
976 {
977   my $gotsome = 0;
978   foreach my $job (keys %reader)
979   {
980     my $buf;
981     while (0 < sysread ($reader{$job}, $buf, 8192))
982     {
983       print STDERR $buf if $ENV{CRUNCH_DEBUG};
984       $jobstep[$job]->{stderr} .= $buf;
985       preprocess_stderr ($job);
986       if (length ($jobstep[$job]->{stderr}) > 16384)
987       {
988         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
989       }
990       $gotsome = 1;
991     }
992   }
993   return $gotsome;
994 }
995
996
997 sub preprocess_stderr
998 {
999   my $job = shift;
1000
1001   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1002     my $line = $1;
1003     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1004     Log ($job, "stderr $line");
1005     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1006       # whoa.
1007       $main::please_freeze = 1;
1008     }
1009     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1010       $jobstep[$job]->{node_fail} = 1;
1011       ban_node_by_slot($jobstep[$job]->{slotindex});
1012     }
1013   }
1014 }
1015
1016
1017 sub process_stderr
1018 {
1019   my $job = shift;
1020   my $task_success = shift;
1021   preprocess_stderr ($job);
1022
1023   map {
1024     Log ($job, "stderr $_");
1025   } split ("\n", $jobstep[$job]->{stderr});
1026 }
1027
1028
1029 sub collate_output
1030 {
1031   my $whc = Warehouse->new;
1032   Log (undef, "collate");
1033   $whc->write_start (1);
1034   my $joboutput;
1035   for (@jobstep)
1036   {
1037     next if (!exists $_->{'arvados_task'}->{output} ||
1038              !$_->{'arvados_task'}->{'success'} ||
1039              $_->{'exitcode'} != 0);
1040     my $output = $_->{'arvados_task'}->{output};
1041     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1042     {
1043       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1044       $whc->write_data ($output);
1045     }
1046     elsif (@jobstep == 1)
1047     {
1048       $joboutput = $output;
1049       $whc->write_finish;
1050     }
1051     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1052     {
1053       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1054       $whc->write_data ($outblock);
1055     }
1056     else
1057     {
1058       my $errstr = $whc->errstr;
1059       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1060       $main::success = 0;
1061     }
1062   }
1063   $joboutput = $whc->write_finish if !defined $joboutput;
1064   if ($joboutput)
1065   {
1066     Log (undef, "output $joboutput");
1067     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1068   }
1069   else
1070   {
1071     Log (undef, "output undef");
1072   }
1073   return $joboutput;
1074 }
1075
1076
1077 sub killem
1078 {
1079   foreach (@_)
1080   {
1081     my $sig = 2;                # SIGINT first
1082     if (exists $proc{$_}->{"sent_$sig"} &&
1083         time - $proc{$_}->{"sent_$sig"} > 4)
1084     {
1085       $sig = 15;                # SIGTERM if SIGINT doesn't work
1086     }
1087     if (exists $proc{$_}->{"sent_$sig"} &&
1088         time - $proc{$_}->{"sent_$sig"} > 4)
1089     {
1090       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1091     }
1092     if (!exists $proc{$_}->{"sent_$sig"})
1093     {
1094       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1095       kill $sig, $_;
1096       select (undef, undef, undef, 0.1);
1097       if ($sig == 2)
1098       {
1099         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1100       }
1101       $proc{$_}->{"sent_$sig"} = time;
1102       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1103     }
1104   }
1105 }
1106
1107
1108 sub fhbits
1109 {
1110   my($bits);
1111   for (@_) {
1112     vec($bits,fileno($_),1) = 1;
1113   }
1114   $bits;
1115 }
1116
1117
1118 sub Log                         # ($jobstep_id, $logmessage)
1119 {
1120   if ($_[1] =~ /\n/) {
1121     for my $line (split (/\n/, $_[1])) {
1122       Log ($_[0], $line);
1123     }
1124     return;
1125   }
1126   my $fh = select STDERR; $|=1; select $fh;
1127   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1128   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1129   $message .= "\n";
1130   my $datetime;
1131   if ($metastream || -t STDERR) {
1132     my @gmtime = gmtime;
1133     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1134                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1135   }
1136   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1137
1138   return if !$metastream;
1139   $metastream->write_data ($datetime . " " . $message);
1140 }
1141
1142
1143 sub croak
1144 {
1145   my ($package, $file, $line) = caller;
1146   my $message = "@_ at $file line $line\n";
1147   Log (undef, $message);
1148   freeze() if @jobstep_todo;
1149   collate_output() if @jobstep_todo;
1150   cleanup();
1151   save_meta() if $metastream;
1152   die;
1153 }
1154
1155
1156 sub cleanup
1157 {
1158   return if !$job_has_uuid;
1159   $Job->update_attributes('running' => 0,
1160                           'success' => 0,
1161                           'finished_at' => scalar gmtime);
1162 }
1163
1164
1165 sub save_meta
1166 {
1167   my $justcheckpoint = shift; # false if this will be the last meta saved
1168   my $m = $metastream;
1169   $m = $m->copy if $justcheckpoint;
1170   $m->write_finish;
1171   my $whc = Warehouse->new;
1172   my $loglocator = $whc->store_block ($m->as_string);
1173   $arv->{'collections'}->{'create'}->execute('collection' => {
1174     'uuid' => $loglocator,
1175     'manifest_text' => $m->as_string,
1176   });
1177   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1178   Log (undef, "log manifest is $loglocator");
1179   $Job->{'log'} = $loglocator;
1180   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1181 }
1182
1183
1184 sub freeze_if_want_freeze
1185 {
1186   if ($main::please_freeze)
1187   {
1188     release_allocation();
1189     if (@_)
1190     {
1191       # kill some srun procs before freeze+stop
1192       map { $proc{$_} = {} } @_;
1193       while (%proc)
1194       {
1195         killem (keys %proc);
1196         select (undef, undef, undef, 0.1);
1197         my $died;
1198         while (($died = waitpid (-1, WNOHANG)) > 0)
1199         {
1200           delete $proc{$died};
1201         }
1202       }
1203     }
1204     freeze();
1205     collate_output();
1206     cleanup();
1207     save_meta();
1208     exit 0;
1209   }
1210 }
1211
1212
1213 sub freeze
1214 {
1215   Log (undef, "Freeze not implemented");
1216   return;
1217 }
1218
1219
1220 sub thaw
1221 {
1222   croak ("Thaw not implemented");
1223
1224   my $whc;
1225   my $key = shift;
1226   Log (undef, "thaw from $key");
1227
1228   @jobstep = ();
1229   @jobstep_done = ();
1230   @jobstep_todo = ();
1231   @jobstep_tomerge = ();
1232   $jobstep_tomerge_level = 0;
1233   my $frozenjob = {};
1234
1235   my $stream = new Warehouse::Stream ( whc => $whc,
1236                                        hash => [split (",", $key)] );
1237   $stream->rewind;
1238   while (my $dataref = $stream->read_until (undef, "\n\n"))
1239   {
1240     if ($$dataref =~ /^job /)
1241     {
1242       foreach (split ("\n", $$dataref))
1243       {
1244         my ($k, $v) = split ("=", $_, 2);
1245         $frozenjob->{$k} = freezeunquote ($v);
1246       }
1247       next;
1248     }
1249
1250     if ($$dataref =~ /^merge (\d+) (.*)/)
1251     {
1252       $jobstep_tomerge_level = $1;
1253       @jobstep_tomerge
1254           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1255       next;
1256     }
1257
1258     my $Jobstep = { };
1259     foreach (split ("\n", $$dataref))
1260     {
1261       my ($k, $v) = split ("=", $_, 2);
1262       $Jobstep->{$k} = freezeunquote ($v) if $k;
1263     }
1264     $Jobstep->{'failures'} = 0;
1265     push @jobstep, $Jobstep;
1266
1267     if ($Jobstep->{exitcode} eq "0")
1268     {
1269       push @jobstep_done, $#jobstep;
1270     }
1271     else
1272     {
1273       push @jobstep_todo, $#jobstep;
1274     }
1275   }
1276
1277   foreach (qw (script script_version script_parameters))
1278   {
1279     $Job->{$_} = $frozenjob->{$_};
1280   }
1281   $Job->save if $job_has_uuid;
1282 }
1283
1284
1285 sub freezequote
1286 {
1287   my $s = shift;
1288   $s =~ s/\\/\\\\/g;
1289   $s =~ s/\n/\\n/g;
1290   return $s;
1291 }
1292
1293
1294 sub freezeunquote
1295 {
1296   my $s = shift;
1297   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1298   return $s;
1299 }
1300
1301
1302 sub srun
1303 {
1304   my $srunargs = shift;
1305   my $execargs = shift;
1306   my $opts = shift || {};
1307   my $stdin = shift;
1308   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1309   print STDERR (join (" ",
1310                       map { / / ? "'$_'" : $_ }
1311                       (@$args)),
1312                 "\n")
1313       if $ENV{CRUNCH_DEBUG};
1314
1315   if (defined $stdin) {
1316     my $child = open STDIN, "-|";
1317     defined $child or die "no fork: $!";
1318     if ($child == 0) {
1319       print $stdin or die $!;
1320       close STDOUT or die $!;
1321       exit 0;
1322     }
1323   }
1324
1325   return system (@$args) if $opts->{fork};
1326
1327   exec @$args;
1328   warn "ENV size is ".length(join(" ",%ENV));
1329   die "exec failed: $!: @$args";
1330 }
1331
1332
1333 sub ban_node_by_slot {
1334   # Don't start any new jobsteps on this node for 60 seconds
1335   my $slotid = shift;
1336   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1337   $slot[$slotid]->{node}->{hold_count}++;
1338   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1339 }
1340
1341 __DATA__
1342 #!/usr/bin/perl
1343
1344 # checkout-and-build
1345
1346 use Fcntl ':flock';
1347
1348 my $destdir = $ENV{"CRUNCH_SRC"};
1349 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1350 my $repo = $ENV{"CRUNCH_SRC_URL"};
1351
1352 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1353 flock L, LOCK_EX;
1354 if (readlink ("$destdir.commit") eq $commit) {
1355     exit 0;
1356 }
1357
1358 unlink "$destdir.commit";
1359 open STDOUT, ">", "$destdir.log";
1360 open STDERR, ">&STDOUT";
1361
1362 mkdir $destdir;
1363 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1364 print TARX <DATA>;
1365 if(!close(TARX)) {
1366   die "'tar -C $destdir -xf -' exited $?: $!";
1367 }
1368
1369 my $pwd;
1370 chomp ($pwd = `pwd`);
1371 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1372 mkdir $install_dir;
1373 if (-e "$destdir/crunch_scripts/install") {
1374     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1375 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1376     # Old version
1377     shell_or_die ("./tests/autotests.sh", $install_dir);
1378 } elsif (-e "./install.sh") {
1379     shell_or_die ("./install.sh", $install_dir);
1380 }
1381
1382 if ($commit) {
1383     unlink "$destdir.commit.new";
1384     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1385     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1386 }
1387
1388 close L;
1389
1390 exit 0;
1391
1392 sub shell_or_die
1393 {
1394   if ($ENV{"DEBUG"}) {
1395     print STDERR "@_\n";
1396   }
1397   system (@_) == 0
1398       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1399 }
1400
1401 __DATA__