send git-archive from ctl to compute node instead of running git-pull on compute
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100
101
102 $SIG{'HUP'} = sub
103 {
104   1;
105 };
106 $SIG{'USR1'} = sub
107 {
108   $main::ENV{CRUNCH_DEBUG} = 1;
109 };
110 $SIG{'USR2'} = sub
111 {
112   $main::ENV{CRUNCH_DEBUG} = 0;
113 };
114
115
116
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
119 $metastream->clear;
120 $metastream->write_start('log.txt');
121
122 my $User = {};
123 my $Job = {};
124 my $job_id;
125 my $dbh;
126 my $sth;
127 if ($job_has_uuid)
128 {
129   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130   $User = $arv->{'users'}->{'current'}->execute;
131   if (!$force_unlock) {
132     if ($Job->{'is_locked_by'}) {
133       croak("Job is locked: " . $Job->{'is_locked_by'});
134     }
135     if ($Job->{'success'} ne undef) {
136       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
137     }
138     if ($Job->{'running'}) {
139       croak("Job 'running' flag is already set");
140     }
141     if ($Job->{'started_at'}) {
142       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
143     }
144   }
145 }
146 else
147 {
148   $Job = JSON::decode_json($jobspec);
149
150   if (!$resume_stash)
151   {
152     map { croak ("No $_ specified") unless $Job->{$_} }
153     qw(script script_version script_parameters);
154   }
155
156   if (!defined $Job->{'uuid'}) {
157     chomp ($Job->{'uuid'} = sprintf ("%s-t%d-p%d", `hostname -s`, time, $$));
158   }
159 }
160 $job_id = $Job->{'uuid'};
161
162
163
164 $Job->{'resource_limits'} ||= {};
165 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
166 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
167
168
169 Log (undef, "check slurm allocation");
170 my @slot;
171 my @node;
172 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
173 my @sinfo;
174 if (!$have_slurm)
175 {
176   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
177   push @sinfo, "$localcpus localhost";
178 }
179 if (exists $ENV{SLURM_NODELIST})
180 {
181   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
182 }
183 foreach (@sinfo)
184 {
185   my ($ncpus, $slurm_nodelist) = split;
186   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
187
188   my @nodelist;
189   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
190   {
191     my $nodelist = $1;
192     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
193     {
194       my $ranges = $1;
195       foreach (split (",", $ranges))
196       {
197         my ($a, $b);
198         if (/(\d+)-(\d+)/)
199         {
200           $a = $1;
201           $b = $2;
202         }
203         else
204         {
205           $a = $_;
206           $b = $_;
207         }
208         push @nodelist, map {
209           my $n = $nodelist;
210           $n =~ s/\[[-,\d]+\]/$_/;
211           $n;
212         } ($a..$b);
213       }
214     }
215     else
216     {
217       push @nodelist, $nodelist;
218     }
219   }
220   foreach my $nodename (@nodelist)
221   {
222     Log (undef, "node $nodename - $ncpus slots");
223     my $node = { name => $nodename,
224                  ncpus => $ncpus,
225                  losing_streak => 0,
226                  hold_until => 0 };
227     foreach my $cpu (1..$ncpus)
228     {
229       push @slot, { node => $node,
230                     cpu => $cpu };
231     }
232   }
233   push @node, @nodelist;
234 }
235
236
237
238 # Ensure that we get one jobstep running on each allocated node before
239 # we start overloading nodes with concurrent steps
240
241 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
242
243
244
245 my $jobmanager_id;
246 if ($job_has_uuid)
247 {
248   # Claim this job, and make sure nobody else does
249
250   $Job->{'is_locked_by'} = $User->{'uuid'};
251   $Job->{'started_at'} = gmtime;
252   $Job->{'running'} = 1;
253   $Job->{'success'} = undef;
254   $Job->{'tasks_summary'} = { 'failed' => 0,
255                               'todo' => 1,
256                               'running' => 0,
257                               'done' => 0 };
258   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261 }
262
263
264 Log (undef, "start");
265 $SIG{'INT'} = sub { $main::please_freeze = 1; };
266 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
267 $SIG{'TERM'} = \&croak;
268 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
269 $SIG{'ALRM'} = sub { $main::please_info = 1; };
270 $SIG{'CONT'} = sub { $main::please_continue = 1; };
271 $main::please_freeze = 0;
272 $main::please_info = 0;
273 $main::please_continue = 0;
274 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
275
276 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
277 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
278 $ENV{"JOB_UUID"} = $job_id;
279
280
281 my @jobstep;
282 my @jobstep_todo = ();
283 my @jobstep_done = ();
284 my @jobstep_tomerge = ();
285 my $jobstep_tomerge_level = 0;
286 my $squeue_checked;
287 my $squeue_kill_checked;
288 my $output_in_keep = 0;
289
290
291
292 if (defined $Job->{thawedfromkey})
293 {
294   thaw ($Job->{thawedfromkey});
295 }
296 else
297 {
298   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
299     'job_uuid' => $Job->{'uuid'},
300     'sequence' => 0,
301     'qsequence' => 0,
302     'parameters' => {},
303                                                           });
304   push @jobstep, { 'level' => 0,
305                    'attempts' => 0,
306                    'arvados_task' => $first_task,
307                  };
308   push @jobstep_todo, 0;
309 }
310
311
312 my $build_script;
313 do {
314   local $/ = undef;
315   $build_script = <DATA>;
316 };
317
318
319 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
320
321 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
322 if ($skip_install)
323 {
324   $ENV{"CRUNCH_SRC"} = $Job->{revision};
325 }
326 else
327 {
328   Log (undef, "Install revision ".$Job->{revision});
329   my $nodelist = join(",", @node);
330
331   # Clean out crunch_tmp/work and crunch_tmp/opt
332
333   my $cleanpid = fork();
334   if ($cleanpid == 0)
335   {
336     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
337           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
338     exit (1);
339   }
340   while (1)
341   {
342     last if $cleanpid == waitpid (-1, WNOHANG);
343     freeze_if_want_freeze ($cleanpid);
344     select (undef, undef, undef, 0.1);
345   }
346   Log (undef, "Clean-work-dir exited $?");
347
348   # Install requested code version
349
350   my @execargs;
351   my @srunargs = ("srun",
352                   "--nodelist=$nodelist",
353                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
354
355   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
356   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
357   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
358
359   my $commit;
360   my $git_archive;
361   my $treeish = $Job->{'script_version'};
362   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
363   # Todo: let script_version specify repository instead of expecting
364   # parent process to figure it out.
365   $ENV{"CRUNCH_SRC_URL"} = $repo;
366
367   # Create/update our clone of the remote git repo
368
369   if (!-d $ENV{"CRUNCH_SRC"}) {
370     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
371         or croak ("git clone $repo failed: exit ".($?>>8));
372     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
373   }
374   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
375
376   # If this looks like a subversion r#, look for it in git-svn commit messages
377
378   if ($treeish =~ m{^\d{1,4}$}) {
379     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
380     chomp $gitlog;
381     if ($gitlog =~ /^[a-f0-9]{40}$/) {
382       $commit = $gitlog;
383       Log (undef, "Using commit $commit for revision $treeish");
384     }
385   }
386
387   # If that didn't work, try asking git to look it up as a tree-ish.
388
389   if (!defined $commit) {
390
391     my $cooked_treeish = $treeish;
392     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
393       # Looks like a git branch name -- make sure git knows it's
394       # relative to the remote repo
395       $cooked_treeish = "origin/$treeish";
396     }
397
398     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
399     chomp $found;
400     if ($found =~ /^[0-9a-f]{40}$/s) {
401       $commit = $found;
402       if ($commit ne $treeish) {
403         # Make sure we record the real commit id in the database,
404         # frozentokey, logs, etc. -- instead of an abbreviation or a
405         # branch name which can become ambiguous or point to a
406         # different commit in the future.
407         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
408         Log (undef, "Using commit $commit for tree-ish $treeish");
409         if ($commit ne $treeish) {
410           $Job->{'script_version'} = $commit;
411           $Job->save() or croak("Error while updating job");
412         }
413       }
414     }
415   }
416
417   if (defined $commit) {
418     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419     @execargs = ("sh", "-c",
420                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
421     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
422   }
423   else {
424     croak ("could not figure out commit id for $treeish");
425   }
426
427   my $installpid = fork();
428   if ($installpid == 0)
429   {
430     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
431     exit (1);
432   }
433   while (1)
434   {
435     last if $installpid == waitpid (-1, WNOHANG);
436     freeze_if_want_freeze ($installpid);
437     select (undef, undef, undef, 0.1);
438   }
439   Log (undef, "Install exited $?");
440 }
441
442
443
444 foreach (qw (script script_version script_parameters resource_limits))
445 {
446   Log (undef,
447        "$_ " .
448        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
449 }
450 foreach (split (/\n/, $Job->{knobs}))
451 {
452   Log (undef, "knob " . $_);
453 }
454
455
456
457 my $success;
458
459
460
461 ONELEVEL:
462
463 my $thisround_succeeded = 0;
464 my $thisround_failed = 0;
465 my $thisround_failed_multiple = 0;
466
467 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
468                        or $a <=> $b } @jobstep_todo;
469 my $level = $jobstep[$jobstep_todo[0]]->{level};
470 Log (undef, "start level $level");
471
472
473
474 my %proc;
475 my @freeslot = (0..$#slot);
476 my @holdslot;
477 my %reader;
478 my $progress_is_dirty = 1;
479 my $progress_stats_updated = 0;
480
481 update_progress_stats();
482
483
484
485 THISROUND:
486 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
487 {
488   $main::please_continue = 0;
489
490   my $id = $jobstep_todo[$todo_ptr];
491   my $Jobstep = $jobstep[$id];
492   if ($Jobstep->{level} != $level)
493   {
494     next;
495   }
496   if ($Jobstep->{attempts} > 9)
497   {
498     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
499     $success = 0;
500     last THISROUND;
501   }
502
503   pipe $reader{$id}, "writer" or croak ($!);
504   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
505   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
506
507   my $childslot = $freeslot[0];
508   my $childnode = $slot[$childslot]->{node};
509   my $childslotname = join (".",
510                             $slot[$childslot]->{node}->{name},
511                             $slot[$childslot]->{cpu});
512   my $childpid = fork();
513   if ($childpid == 0)
514   {
515     $SIG{'INT'} = 'DEFAULT';
516     $SIG{'QUIT'} = 'DEFAULT';
517     $SIG{'TERM'} = 'DEFAULT';
518
519     foreach (values (%reader))
520     {
521       close($_);
522     }
523     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
524     open(STDOUT,">&writer");
525     open(STDERR,">&writer");
526
527     undef $dbh;
528     undef $sth;
529
530     delete $ENV{"GNUPGHOME"};
531     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
532     $ENV{"TASK_QSEQUENCE"} = $id;
533     $ENV{"TASK_SEQUENCE"} = $level;
534     $ENV{"JOB_SCRIPT"} = $Job->{script};
535     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
536       $param =~ tr/a-z/A-Z/;
537       $ENV{"JOB_PARAMETER_$param"} = $value;
538     }
539     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
540     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
541     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
542     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
543
544     $ENV{"GZIP"} = "-n";
545
546     my @srunargs = (
547       "srun",
548       "--nodelist=".$childnode->{name},
549       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
550       "--job-name=$job_id.$id.$$",
551         );
552     my @execargs = qw(sh);
553     my $build_script_to_send = "";
554     my $command =
555         "mkdir -p $ENV{CRUNCH_TMP}/revision "
556         ."&& cd $ENV{CRUNCH_TMP} ";
557     if ($build_script)
558     {
559       $build_script_to_send = $build_script;
560       $command .=
561           "&& perl -";
562     }
563     elsif (!$skip_install)
564     {
565       $command .=
566           "&& "
567           ."( "
568           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
569           ."|| "
570           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
571           ."    && ./installrevision "
572           ."  ) "
573           .") ";
574     }
575     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
576     $command .=
577         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
578     my @execargs = ('bash', '-c', $command);
579     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
580     exit (1);
581   }
582   close("writer");
583   if (!defined $childpid)
584   {
585     close $reader{$id};
586     delete $reader{$id};
587     next;
588   }
589   shift @freeslot;
590   $proc{$childpid} = { jobstep => $id,
591                        time => time,
592                        slot => $childslot,
593                        jobstepname => "$job_id.$id.$childpid",
594                      };
595   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
596   $slot[$childslot]->{pid} = $childpid;
597
598   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
599   Log ($id, "child $childpid started on $childslotname");
600   $Jobstep->{attempts} ++;
601   $Jobstep->{starttime} = time;
602   $Jobstep->{node} = $childnode->{name};
603   $Jobstep->{slotindex} = $childslot;
604   delete $Jobstep->{stderr};
605   delete $Jobstep->{finishtime};
606
607   splice @jobstep_todo, $todo_ptr, 1;
608   --$todo_ptr;
609
610   $progress_is_dirty = 1;
611
612   while (!@freeslot
613          ||
614          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
615   {
616     last THISROUND if $main::please_freeze;
617     if ($main::please_info)
618     {
619       $main::please_info = 0;
620       freeze();
621       collate_output();
622       save_meta(1);
623       update_progress_stats();
624     }
625     my $gotsome
626         = readfrompipes ()
627         + reapchildren ();
628     if (!$gotsome)
629     {
630       check_squeue();
631       update_progress_stats();
632       select (undef, undef, undef, 0.1);
633     }
634     elsif (time - $progress_stats_updated >= 30)
635     {
636       update_progress_stats();
637     }
638     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
639         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
640     {
641       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
642           .($thisround_failed+$thisround_succeeded)
643           .") -- giving up on this round";
644       Log (undef, $message);
645       last THISROUND;
646     }
647
648     # move slots from freeslot to holdslot (or back to freeslot) if necessary
649     for (my $i=$#freeslot; $i>=0; $i--) {
650       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
651         push @holdslot, (splice @freeslot, $i, 1);
652       }
653     }
654     for (my $i=$#holdslot; $i>=0; $i--) {
655       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
656         push @freeslot, (splice @holdslot, $i, 1);
657       }
658     }
659
660     # give up if no nodes are succeeding
661     if (!grep { $_->{node}->{losing_streak} == 0 &&
662                     $_->{node}->{hold_count} < 4 } @slot) {
663       my $message = "Every node has failed -- giving up on this round";
664       Log (undef, $message);
665       last THISROUND;
666     }
667   }
668 }
669
670
671 push @freeslot, splice @holdslot;
672 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
673
674
675 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
676 while (%proc)
677 {
678   goto THISROUND if $main::please_continue;
679   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
680   readfrompipes ();
681   if (!reapchildren())
682   {
683     check_squeue();
684     update_progress_stats();
685     select (undef, undef, undef, 0.1);
686     killem (keys %proc) if $main::please_freeze;
687   }
688 }
689
690 update_progress_stats();
691 freeze_if_want_freeze();
692
693
694 if (!defined $success)
695 {
696   if (@jobstep_todo &&
697       $thisround_succeeded == 0 &&
698       ($thisround_failed == 0 || $thisround_failed > 4))
699   {
700     my $message = "stop because $thisround_failed tasks failed and none succeeded";
701     Log (undef, $message);
702     $success = 0;
703   }
704   if (!@jobstep_todo)
705   {
706     $success = 1;
707   }
708 }
709
710 goto ONELEVEL if !defined $success;
711
712
713 release_allocation();
714 freeze();
715 $Job->{'output'} = &collate_output();
716 $Job->{'success'} = $Job->{'output'} && $success;
717 $Job->save;
718
719 if ($Job->{'output'})
720 {
721   eval {
722     my $manifest_text = capturex("whget", $Job->{'output'});
723     $arv->{'collections'}->{'create'}->execute('collection' => {
724       'uuid' => $Job->{'output'},
725       'manifest_text' => $manifest_text,
726     });
727   };
728   if ($@) {
729     Log (undef, "Failed to register output manifest: $@");
730   }
731 }
732
733 Log (undef, "finish");
734
735 save_meta();
736 exit 0;
737
738
739
740 sub update_progress_stats
741 {
742   $progress_stats_updated = time;
743   return if !$progress_is_dirty;
744   my ($todo, $done, $running) = (scalar @jobstep_todo,
745                                  scalar @jobstep_done,
746                                  scalar @slot - scalar @freeslot - scalar @holdslot);
747   $Job->{'tasks_summary'} ||= {};
748   $Job->{'tasks_summary'}->{'todo'} = $todo;
749   $Job->{'tasks_summary'}->{'done'} = $done;
750   $Job->{'tasks_summary'}->{'running'} = $running;
751   $Job->save;
752   Log (undef, "status: $done done, $running running, $todo todo");
753   $progress_is_dirty = 0;
754 }
755
756
757
758 sub reapchildren
759 {
760   my $pid = waitpid (-1, WNOHANG);
761   return 0 if $pid <= 0;
762
763   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
764                   . "."
765                   . $slot[$proc{$pid}->{slot}]->{cpu});
766   my $jobstepid = $proc{$pid}->{jobstep};
767   my $elapsed = time - $proc{$pid}->{time};
768   my $Jobstep = $jobstep[$jobstepid];
769
770   my $exitcode = $?;
771   my $exitinfo = "exit $exitcode";
772   $Jobstep->{'arvados_task'}->reload;
773   my $success = $Jobstep->{'arvados_task'}->{success};
774
775   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
776
777   if (!defined $success) {
778     # task did not indicate one way or the other --> fail
779     $Jobstep->{'arvados_task'}->{success} = 0;
780     $Jobstep->{'arvados_task'}->save;
781     $success = 0;
782   }
783
784   if (!$success)
785   {
786     my $no_incr_attempts;
787     $no_incr_attempts = 1 if $Jobstep->{node_fail};
788
789     ++$thisround_failed;
790     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
791
792     # Check for signs of a failed or misconfigured node
793     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
794         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
795       # Don't count this against jobstep failure thresholds if this
796       # node is already suspected faulty and srun exited quickly
797       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
798           $elapsed < 5 &&
799           $Jobstep->{attempts} > 1) {
800         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
801         $no_incr_attempts = 1;
802         --$Jobstep->{attempts};
803       }
804       ban_node_by_slot($proc{$pid}->{slot});
805     }
806
807     push @jobstep_todo, $jobstepid;
808     Log ($jobstepid, "failure in $elapsed seconds");
809
810     --$Jobstep->{attempts} if $no_incr_attempts;
811     $Job->{'tasks_summary'}->{'failed'}++;
812   }
813   else
814   {
815     ++$thisround_succeeded;
816     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
817     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
818     push @jobstep_done, $jobstepid;
819     Log ($jobstepid, "success in $elapsed seconds");
820   }
821   $Jobstep->{exitcode} = $exitcode;
822   $Jobstep->{finishtime} = time;
823   process_stderr ($jobstepid, $success);
824   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
825
826   close $reader{$jobstepid};
827   delete $reader{$jobstepid};
828   delete $slot[$proc{$pid}->{slot}]->{pid};
829   push @freeslot, $proc{$pid}->{slot};
830   delete $proc{$pid};
831
832   # Load new tasks
833   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
834     'where' => {
835       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
836     },
837     'order' => 'qsequence'
838   );
839   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
840     my $jobstep = {
841       'level' => $arvados_task->{'sequence'},
842       'attempts' => 0,
843       'arvados_task' => $arvados_task
844     };
845     push @jobstep, $jobstep;
846     push @jobstep_todo, $#jobstep;
847   }
848
849   $progress_is_dirty = 1;
850   1;
851 }
852
853
854 sub check_squeue
855 {
856   # return if the kill list was checked <4 seconds ago
857   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
858   {
859     return;
860   }
861   $squeue_kill_checked = time;
862
863   # use killem() on procs whose killtime is reached
864   for (keys %proc)
865   {
866     if (exists $proc{$_}->{killtime}
867         && $proc{$_}->{killtime} <= time)
868     {
869       killem ($_);
870     }
871   }
872
873   # return if the squeue was checked <60 seconds ago
874   if (defined $squeue_checked && $squeue_checked > time - 60)
875   {
876     return;
877   }
878   $squeue_checked = time;
879
880   if (!$have_slurm)
881   {
882     # here is an opportunity to check for mysterious problems with local procs
883     return;
884   }
885
886   # get a list of steps still running
887   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
888   chop @squeue;
889   if ($squeue[-1] ne "ok")
890   {
891     return;
892   }
893   pop @squeue;
894
895   # which of my jobsteps are running, according to squeue?
896   my %ok;
897   foreach (@squeue)
898   {
899     if (/^(\d+)\.(\d+) (\S+)/)
900     {
901       if ($1 eq $ENV{SLURM_JOBID})
902       {
903         $ok{$3} = 1;
904       }
905     }
906   }
907
908   # which of my active child procs (>60s old) were not mentioned by squeue?
909   foreach (keys %proc)
910   {
911     if ($proc{$_}->{time} < time - 60
912         && !exists $ok{$proc{$_}->{jobstepname}}
913         && !exists $proc{$_}->{killtime})
914     {
915       # kill this proc if it hasn't exited in 30 seconds
916       $proc{$_}->{killtime} = time + 30;
917     }
918   }
919 }
920
921
922 sub release_allocation
923 {
924   if ($have_slurm)
925   {
926     Log (undef, "release job allocation");
927     system "scancel $ENV{SLURM_JOBID}";
928   }
929 }
930
931
932 sub readfrompipes
933 {
934   my $gotsome = 0;
935   foreach my $job (keys %reader)
936   {
937     my $buf;
938     while (0 < sysread ($reader{$job}, $buf, 8192))
939     {
940       print STDERR $buf if $ENV{CRUNCH_DEBUG};
941       $jobstep[$job]->{stderr} .= $buf;
942       preprocess_stderr ($job);
943       if (length ($jobstep[$job]->{stderr}) > 16384)
944       {
945         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
946       }
947       $gotsome = 1;
948     }
949   }
950   return $gotsome;
951 }
952
953
954 sub preprocess_stderr
955 {
956   my $job = shift;
957
958   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
959     my $line = $1;
960     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
961     Log ($job, "stderr $line");
962     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
963       # whoa.
964       $main::please_freeze = 1;
965     }
966     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
967       $jobstep[$job]->{node_fail} = 1;
968       ban_node_by_slot($jobstep[$job]->{slotindex});
969     }
970   }
971 }
972
973
974 sub process_stderr
975 {
976   my $job = shift;
977   my $success = shift;
978   preprocess_stderr ($job);
979
980   map {
981     Log ($job, "stderr $_");
982   } split ("\n", $jobstep[$job]->{stderr});
983 }
984
985
986 sub collate_output
987 {
988   my $whc = Warehouse->new;
989   Log (undef, "collate");
990   $whc->write_start (1);
991   my $joboutput;
992   for (@jobstep)
993   {
994     next if (!exists $_->{'arvados_task'}->{output} ||
995              !$_->{'arvados_task'}->{'success'} ||
996              $_->{'exitcode'} != 0);
997     my $output = $_->{'arvados_task'}->{output};
998     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
999     {
1000       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1001       $whc->write_data ($output);
1002     }
1003     elsif (@jobstep == 1)
1004     {
1005       $joboutput = $output;
1006       $whc->write_finish;
1007     }
1008     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1009     {
1010       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1011       $whc->write_data ($outblock);
1012     }
1013     else
1014     {
1015       my $errstr = $whc->errstr;
1016       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1017       $success = 0;
1018     }
1019   }
1020   $joboutput = $whc->write_finish if !defined $joboutput;
1021   if ($joboutput)
1022   {
1023     Log (undef, "output $joboutput");
1024     $Job->{'output'} = $joboutput;
1025     $Job->save;
1026   }
1027   else
1028   {
1029     Log (undef, "output undef");
1030   }
1031   return $joboutput;
1032 }
1033
1034
1035 sub killem
1036 {
1037   foreach (@_)
1038   {
1039     my $sig = 2;                # SIGINT first
1040     if (exists $proc{$_}->{"sent_$sig"} &&
1041         time - $proc{$_}->{"sent_$sig"} > 4)
1042     {
1043       $sig = 15;                # SIGTERM if SIGINT doesn't work
1044     }
1045     if (exists $proc{$_}->{"sent_$sig"} &&
1046         time - $proc{$_}->{"sent_$sig"} > 4)
1047     {
1048       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1049     }
1050     if (!exists $proc{$_}->{"sent_$sig"})
1051     {
1052       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1053       kill $sig, $_;
1054       select (undef, undef, undef, 0.1);
1055       if ($sig == 2)
1056       {
1057         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1058       }
1059       $proc{$_}->{"sent_$sig"} = time;
1060       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1061     }
1062   }
1063 }
1064
1065
1066 sub fhbits
1067 {
1068   my($bits);
1069   for (@_) {
1070     vec($bits,fileno($_),1) = 1;
1071   }
1072   $bits;
1073 }
1074
1075
1076 sub Log                         # ($jobstep_id, $logmessage)
1077 {
1078   if ($_[1] =~ /\n/) {
1079     for my $line (split (/\n/, $_[1])) {
1080       Log ($_[0], $line);
1081     }
1082     return;
1083   }
1084   my $fh = select STDERR; $|=1; select $fh;
1085   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1086   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1087   $message .= "\n";
1088   my $datetime;
1089   if ($metastream || -t STDERR) {
1090     my @gmtime = gmtime;
1091     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1092                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1093   }
1094   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1095
1096   return if !$metastream;
1097   $metastream->write_data ($datetime . " " . $message);
1098 }
1099
1100
1101 sub croak
1102 {
1103   my ($package, $file, $line) = caller;
1104   my $message = "@_ at $file line $line\n";
1105   Log (undef, $message);
1106   freeze() if @jobstep_todo;
1107   collate_output() if @jobstep_todo;
1108   cleanup();
1109   save_meta() if $metastream;
1110   die;
1111 }
1112
1113
1114 sub cleanup
1115 {
1116   return if !$job_has_uuid;
1117   $Job->reload;
1118   $Job->{'running'} = 0;
1119   $Job->{'success'} = 0;
1120   $Job->{'finished_at'} = gmtime;
1121   $Job->save;
1122 }
1123
1124
1125 sub save_meta
1126 {
1127   my $justcheckpoint = shift; # false if this will be the last meta saved
1128   my $m = $metastream;
1129   $m = $m->copy if $justcheckpoint;
1130   $m->write_finish;
1131   my $loglocator = $m->as_key;
1132   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1133   Log (undef, "meta key is $loglocator");
1134   $Job->{'log'} = $loglocator;
1135   $Job->save;
1136 }
1137
1138
1139 sub freeze_if_want_freeze
1140 {
1141   if ($main::please_freeze)
1142   {
1143     release_allocation();
1144     if (@_)
1145     {
1146       # kill some srun procs before freeze+stop
1147       map { $proc{$_} = {} } @_;
1148       while (%proc)
1149       {
1150         killem (keys %proc);
1151         select (undef, undef, undef, 0.1);
1152         my $died;
1153         while (($died = waitpid (-1, WNOHANG)) > 0)
1154         {
1155           delete $proc{$died};
1156         }
1157       }
1158     }
1159     freeze();
1160     collate_output();
1161     cleanup();
1162     save_meta();
1163     exit 0;
1164   }
1165 }
1166
1167
1168 sub freeze
1169 {
1170   Log (undef, "Freeze not implemented");
1171   return;
1172 }
1173
1174
1175 sub thaw
1176 {
1177   croak ("Thaw not implemented");
1178
1179   my $whc;
1180   my $key = shift;
1181   Log (undef, "thaw from $key");
1182
1183   @jobstep = ();
1184   @jobstep_done = ();
1185   @jobstep_todo = ();
1186   @jobstep_tomerge = ();
1187   $jobstep_tomerge_level = 0;
1188   my $frozenjob = {};
1189
1190   my $stream = new Warehouse::Stream ( whc => $whc,
1191                                        hash => [split (",", $key)] );
1192   $stream->rewind;
1193   while (my $dataref = $stream->read_until (undef, "\n\n"))
1194   {
1195     if ($$dataref =~ /^job /)
1196     {
1197       foreach (split ("\n", $$dataref))
1198       {
1199         my ($k, $v) = split ("=", $_, 2);
1200         $frozenjob->{$k} = freezeunquote ($v);
1201       }
1202       next;
1203     }
1204
1205     if ($$dataref =~ /^merge (\d+) (.*)/)
1206     {
1207       $jobstep_tomerge_level = $1;
1208       @jobstep_tomerge
1209           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1210       next;
1211     }
1212
1213     my $Jobstep = { };
1214     foreach (split ("\n", $$dataref))
1215     {
1216       my ($k, $v) = split ("=", $_, 2);
1217       $Jobstep->{$k} = freezeunquote ($v) if $k;
1218     }
1219     $Jobstep->{attempts} = 0;
1220     push @jobstep, $Jobstep;
1221
1222     if ($Jobstep->{exitcode} eq "0")
1223     {
1224       push @jobstep_done, $#jobstep;
1225     }
1226     else
1227     {
1228       push @jobstep_todo, $#jobstep;
1229     }
1230   }
1231
1232   foreach (qw (script script_version script_parameters))
1233   {
1234     $Job->{$_} = $frozenjob->{$_};
1235   }
1236   $Job->save;
1237 }
1238
1239
1240 sub freezequote
1241 {
1242   my $s = shift;
1243   $s =~ s/\\/\\\\/g;
1244   $s =~ s/\n/\\n/g;
1245   return $s;
1246 }
1247
1248
1249 sub freezeunquote
1250 {
1251   my $s = shift;
1252   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1253   return $s;
1254 }
1255
1256
1257 sub srun
1258 {
1259   my $srunargs = shift;
1260   my $execargs = shift;
1261   my $opts = shift || {};
1262   my $stdin = shift;
1263   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1264   print STDERR (join (" ",
1265                       map { / / ? "'$_'" : $_ }
1266                       (@$args)),
1267                 "\n")
1268       if $ENV{CRUNCH_DEBUG};
1269
1270   if (defined $stdin) {
1271     my $child = open STDIN, "-|";
1272     defined $child or die "no fork: $!";
1273     if ($child == 0) {
1274       print $stdin or die $!;
1275       close STDOUT or die $!;
1276       exit 0;
1277     }
1278   }
1279
1280   return system (@$args) if $opts->{fork};
1281
1282   exec @$args;
1283   warn "ENV size is ".length(join(" ",%ENV));
1284   die "exec failed: $!: @$args";
1285 }
1286
1287
1288 sub ban_node_by_slot {
1289   # Don't start any new jobsteps on this node for 60 seconds
1290   my $slotid = shift;
1291   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1292   $slot[$slotid]->{node}->{hold_count}++;
1293   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1294 }
1295
1296 __DATA__
1297 #!/usr/bin/perl
1298
1299 # checkout-and-build
1300
1301 use Fcntl ':flock';
1302
1303 my $destdir = $ENV{"CRUNCH_SRC"};
1304 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1305 my $repo = $ENV{"CRUNCH_SRC_URL"};
1306
1307 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1308 flock L, LOCK_EX;
1309 if (readlink ("$destdir.commit") eq $commit) {
1310     exit 0;
1311 }
1312
1313 unlink "$destdir.commit";
1314 open STDOUT, ">", "$destdir.log";
1315 open STDERR, ">&STDOUT";
1316
1317 mkdir $destdir;
1318 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1319 print TARX <DATA>;
1320 if(!close(TARX)) {
1321   die "'tar -C $destdir -xf -' exited $?: $!";
1322 }
1323
1324 my $pwd;
1325 chomp ($pwd = `pwd`);
1326 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1327 mkdir $install_dir;
1328 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1329     # Old version
1330     shell_or_die ("./tests/autotests.sh", $install_dir);
1331 } elsif (-e "./install.sh") {
1332     shell_or_die ("./install.sh", $install_dir);
1333 }
1334
1335 if ($commit) {
1336     unlink "$destdir.commit.new";
1337     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1338     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1339 }
1340
1341 close L;
1342
1343 exit 0;
1344
1345 sub shell_or_die
1346 {
1347   if ($ENV{"DEBUG"}) {
1348     print STDERR "@_\n";
1349   }
1350   system (@_) == 0
1351       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1352 }
1353
1354 __DATA__