30bc668280905d0bbab89e011f40c37a0096b376
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100
101
102 $SIG{'HUP'} = sub
103 {
104   1;
105 };
106 $SIG{'USR1'} = sub
107 {
108   $main::ENV{CRUNCH_DEBUG} = 1;
109 };
110 $SIG{'USR2'} = sub
111 {
112   $main::ENV{CRUNCH_DEBUG} = 0;
113 };
114
115
116
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
119 $metastream->clear;
120 $metastream->write_start('log.txt');
121
122 my $User = {};
123 my $Job = {};
124 my $job_id;
125 my $dbh;
126 my $sth;
127 if ($job_has_uuid)
128 {
129   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130   $User = $arv->{'users'}->{'current'}->execute;
131   if (!$force_unlock) {
132     if ($Job->{'is_locked_by'}) {
133       croak("Job is locked: " . $Job->{'is_locked_by'});
134     }
135     if ($Job->{'success'} ne undef) {
136       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
137     }
138     if ($Job->{'running'}) {
139       croak("Job 'running' flag is already set");
140     }
141     if ($Job->{'started_at'}) {
142       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
143     }
144   }
145 }
146 else
147 {
148   $Job = JSON::decode_json($jobspec);
149
150   if (!$resume_stash)
151   {
152     map { croak ("No $_ specified") unless $Job->{$_} }
153     qw(script script_version script_parameters);
154   }
155
156   if (!defined $Job->{'uuid'}) {
157     my $hostname = `hostname -s`;
158     chomp $hostname;
159     $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
160   }
161 }
162 $job_id = $Job->{'uuid'};
163
164
165
166 $Job->{'resource_limits'} ||= {};
167 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
168 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
169
170
171 Log (undef, "check slurm allocation");
172 my @slot;
173 my @node;
174 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
175 my @sinfo;
176 if (!$have_slurm)
177 {
178   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
179   push @sinfo, "$localcpus localhost";
180 }
181 if (exists $ENV{SLURM_NODELIST})
182 {
183   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
184 }
185 foreach (@sinfo)
186 {
187   my ($ncpus, $slurm_nodelist) = split;
188   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
189
190   my @nodelist;
191   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
192   {
193     my $nodelist = $1;
194     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
195     {
196       my $ranges = $1;
197       foreach (split (",", $ranges))
198       {
199         my ($a, $b);
200         if (/(\d+)-(\d+)/)
201         {
202           $a = $1;
203           $b = $2;
204         }
205         else
206         {
207           $a = $_;
208           $b = $_;
209         }
210         push @nodelist, map {
211           my $n = $nodelist;
212           $n =~ s/\[[-,\d]+\]/$_/;
213           $n;
214         } ($a..$b);
215       }
216     }
217     else
218     {
219       push @nodelist, $nodelist;
220     }
221   }
222   foreach my $nodename (@nodelist)
223   {
224     Log (undef, "node $nodename - $ncpus slots");
225     my $node = { name => $nodename,
226                  ncpus => $ncpus,
227                  losing_streak => 0,
228                  hold_until => 0 };
229     foreach my $cpu (1..$ncpus)
230     {
231       push @slot, { node => $node,
232                     cpu => $cpu };
233     }
234   }
235   push @node, @nodelist;
236 }
237
238
239
240 # Ensure that we get one jobstep running on each allocated node before
241 # we start overloading nodes with concurrent steps
242
243 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
244
245
246
247 my $jobmanager_id;
248 if ($job_has_uuid)
249 {
250   # Claim this job, and make sure nobody else does
251
252   $Job->{'is_locked_by'} = $User->{'uuid'};
253   $Job->{'started_at'} = gmtime;
254   $Job->{'running'} = 1;
255   $Job->{'success'} = undef;
256   $Job->{'tasks_summary'} = { 'failed' => 0,
257                               'todo' => 1,
258                               'running' => 0,
259                               'done' => 0 };
260   if ($job_has_uuid) {
261     unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
262       croak("Error while updating / locking job");
263     }
264   }
265 }
266
267
268 Log (undef, "start");
269 $SIG{'INT'} = sub { $main::please_freeze = 1; };
270 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
271 $SIG{'TERM'} = \&croak;
272 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
273 $SIG{'ALRM'} = sub { $main::please_info = 1; };
274 $SIG{'CONT'} = sub { $main::please_continue = 1; };
275 $main::please_freeze = 0;
276 $main::please_info = 0;
277 $main::please_continue = 0;
278 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
279
280 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
281 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
282 $ENV{"JOB_UUID"} = $job_id;
283
284
285 my @jobstep;
286 my @jobstep_todo = ();
287 my @jobstep_done = ();
288 my @jobstep_tomerge = ();
289 my $jobstep_tomerge_level = 0;
290 my $squeue_checked;
291 my $squeue_kill_checked;
292 my $output_in_keep = 0;
293
294
295
296 if (defined $Job->{thawedfromkey})
297 {
298   thaw ($Job->{thawedfromkey});
299 }
300 else
301 {
302   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
303     'job_uuid' => $Job->{'uuid'},
304     'sequence' => 0,
305     'qsequence' => 0,
306     'parameters' => {},
307                                                           });
308   push @jobstep, { 'level' => 0,
309                    'attempts' => 0,
310                    'arvados_task' => $first_task,
311                  };
312   push @jobstep_todo, 0;
313 }
314
315
316 my $build_script;
317 do {
318   local $/ = undef;
319   $build_script = <DATA>;
320 };
321
322
323 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
324
325 my $skip_install = (!$job_has_uuid && $Job->{script_version} =~ m{^/});
326 if ($skip_install)
327 {
328   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
329 }
330 else
331 {
332   Log (undef, "Install revision ".$Job->{script_version});
333   my $nodelist = join(",", @node);
334
335   # Clean out crunch_tmp/work and crunch_tmp/opt
336
337   my $cleanpid = fork();
338   if ($cleanpid == 0)
339   {
340     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
341           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
342     exit (1);
343   }
344   while (1)
345   {
346     last if $cleanpid == waitpid (-1, WNOHANG);
347     freeze_if_want_freeze ($cleanpid);
348     select (undef, undef, undef, 0.1);
349   }
350   Log (undef, "Clean-work-dir exited $?");
351
352   # Install requested code version
353
354   my @execargs;
355   my @srunargs = ("srun",
356                   "--nodelist=$nodelist",
357                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
358
359   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
360   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
361   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
362
363   my $commit;
364   my $git_archive;
365   my $treeish = $Job->{'script_version'};
366   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
367   # Todo: let script_version specify repository instead of expecting
368   # parent process to figure it out.
369   $ENV{"CRUNCH_SRC_URL"} = $repo;
370
371   # Create/update our clone of the remote git repo
372
373   if (!-d $ENV{"CRUNCH_SRC"}) {
374     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
375         or croak ("git clone $repo failed: exit ".($?>>8));
376     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
377   }
378   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
379
380   # If this looks like a subversion r#, look for it in git-svn commit messages
381
382   if ($treeish =~ m{^\d{1,4}$}) {
383     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
384     chomp $gitlog;
385     if ($gitlog =~ /^[a-f0-9]{40}$/) {
386       $commit = $gitlog;
387       Log (undef, "Using commit $commit for script_version $treeish");
388     }
389   }
390
391   # If that didn't work, try asking git to look it up as a tree-ish.
392
393   if (!defined $commit) {
394
395     my $cooked_treeish = $treeish;
396     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
397       # Looks like a git branch name -- make sure git knows it's
398       # relative to the remote repo
399       $cooked_treeish = "origin/$treeish";
400     }
401
402     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
403     chomp $found;
404     if ($found =~ /^[0-9a-f]{40}$/s) {
405       $commit = $found;
406       if ($commit ne $treeish) {
407         # Make sure we record the real commit id in the database,
408         # frozentokey, logs, etc. -- instead of an abbreviation or a
409         # branch name which can become ambiguous or point to a
410         # different commit in the future.
411         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
412         Log (undef, "Using commit $commit for tree-ish $treeish");
413         if ($commit ne $treeish) {
414           $Job->{'script_version'} = $commit;
415           !$job_has_uuid or $Job->save() or croak("Error while updating job");
416         }
417       }
418     }
419   }
420
421   if (defined $commit) {
422     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
423     @execargs = ("sh", "-c",
424                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
425     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
426   }
427   else {
428     croak ("could not figure out commit id for $treeish");
429   }
430
431   my $installpid = fork();
432   if ($installpid == 0)
433   {
434     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
435     exit (1);
436   }
437   while (1)
438   {
439     last if $installpid == waitpid (-1, WNOHANG);
440     freeze_if_want_freeze ($installpid);
441     select (undef, undef, undef, 0.1);
442   }
443   Log (undef, "Install exited $?");
444 }
445
446
447
448 foreach (qw (script script_version script_parameters resource_limits))
449 {
450   Log (undef,
451        "$_ " .
452        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
453 }
454 foreach (split (/\n/, $Job->{knobs}))
455 {
456   Log (undef, "knob " . $_);
457 }
458
459
460
461 my $success;
462
463
464
465 ONELEVEL:
466
467 my $thisround_succeeded = 0;
468 my $thisround_failed = 0;
469 my $thisround_failed_multiple = 0;
470
471 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
472                        or $a <=> $b } @jobstep_todo;
473 my $level = $jobstep[$jobstep_todo[0]]->{level};
474 Log (undef, "start level $level");
475
476
477
478 my %proc;
479 my @freeslot = (0..$#slot);
480 my @holdslot;
481 my %reader;
482 my $progress_is_dirty = 1;
483 my $progress_stats_updated = 0;
484
485 update_progress_stats();
486
487
488
489 THISROUND:
490 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
491 {
492   $main::please_continue = 0;
493
494   my $id = $jobstep_todo[$todo_ptr];
495   my $Jobstep = $jobstep[$id];
496   if ($Jobstep->{level} != $level)
497   {
498     next;
499   }
500   if ($Jobstep->{attempts} > 9)
501   {
502     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
503     $success = 0;
504     last THISROUND;
505   }
506
507   pipe $reader{$id}, "writer" or croak ($!);
508   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
509   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
510
511   my $childslot = $freeslot[0];
512   my $childnode = $slot[$childslot]->{node};
513   my $childslotname = join (".",
514                             $slot[$childslot]->{node}->{name},
515                             $slot[$childslot]->{cpu});
516   my $childpid = fork();
517   if ($childpid == 0)
518   {
519     $SIG{'INT'} = 'DEFAULT';
520     $SIG{'QUIT'} = 'DEFAULT';
521     $SIG{'TERM'} = 'DEFAULT';
522
523     foreach (values (%reader))
524     {
525       close($_);
526     }
527     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
528     open(STDOUT,">&writer");
529     open(STDERR,">&writer");
530
531     undef $dbh;
532     undef $sth;
533
534     delete $ENV{"GNUPGHOME"};
535     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
536     $ENV{"TASK_QSEQUENCE"} = $id;
537     $ENV{"TASK_SEQUENCE"} = $level;
538     $ENV{"JOB_SCRIPT"} = $Job->{script};
539     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
540       $param =~ tr/a-z/A-Z/;
541       $ENV{"JOB_PARAMETER_$param"} = $value;
542     }
543     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
544     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
545     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
546     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
547
548     $ENV{"GZIP"} = "-n";
549
550     my @srunargs = (
551       "srun",
552       "--nodelist=".$childnode->{name},
553       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
554       "--job-name=$job_id.$id.$$",
555         );
556     my @execargs = qw(sh);
557     my $build_script_to_send = "";
558     my $command =
559         "mkdir -p $ENV{CRUNCH_TMP}/revision "
560         ."&& cd $ENV{CRUNCH_TMP} ";
561     if ($build_script)
562     {
563       $build_script_to_send = $build_script;
564       $command .=
565           "&& perl -";
566     }
567     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
568     $command .=
569         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
570     my @execargs = ('bash', '-c', $command);
571     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
572     exit (1);
573   }
574   close("writer");
575   if (!defined $childpid)
576   {
577     close $reader{$id};
578     delete $reader{$id};
579     next;
580   }
581   shift @freeslot;
582   $proc{$childpid} = { jobstep => $id,
583                        time => time,
584                        slot => $childslot,
585                        jobstepname => "$job_id.$id.$childpid",
586                      };
587   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
588   $slot[$childslot]->{pid} = $childpid;
589
590   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
591   Log ($id, "child $childpid started on $childslotname");
592   $Jobstep->{attempts} ++;
593   $Jobstep->{starttime} = time;
594   $Jobstep->{node} = $childnode->{name};
595   $Jobstep->{slotindex} = $childslot;
596   delete $Jobstep->{stderr};
597   delete $Jobstep->{finishtime};
598
599   splice @jobstep_todo, $todo_ptr, 1;
600   --$todo_ptr;
601
602   $progress_is_dirty = 1;
603
604   while (!@freeslot
605          ||
606          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
607   {
608     last THISROUND if $main::please_freeze;
609     if ($main::please_info)
610     {
611       $main::please_info = 0;
612       freeze();
613       collate_output();
614       save_meta(1);
615       update_progress_stats();
616     }
617     my $gotsome
618         = readfrompipes ()
619         + reapchildren ();
620     if (!$gotsome)
621     {
622       check_squeue();
623       update_progress_stats();
624       select (undef, undef, undef, 0.1);
625     }
626     elsif (time - $progress_stats_updated >= 30)
627     {
628       update_progress_stats();
629     }
630     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
631         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
632     {
633       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
634           .($thisround_failed+$thisround_succeeded)
635           .") -- giving up on this round";
636       Log (undef, $message);
637       last THISROUND;
638     }
639
640     # move slots from freeslot to holdslot (or back to freeslot) if necessary
641     for (my $i=$#freeslot; $i>=0; $i--) {
642       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
643         push @holdslot, (splice @freeslot, $i, 1);
644       }
645     }
646     for (my $i=$#holdslot; $i>=0; $i--) {
647       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
648         push @freeslot, (splice @holdslot, $i, 1);
649       }
650     }
651
652     # give up if no nodes are succeeding
653     if (!grep { $_->{node}->{losing_streak} == 0 &&
654                     $_->{node}->{hold_count} < 4 } @slot) {
655       my $message = "Every node has failed -- giving up on this round";
656       Log (undef, $message);
657       last THISROUND;
658     }
659   }
660 }
661
662
663 push @freeslot, splice @holdslot;
664 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
665
666
667 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
668 while (%proc)
669 {
670   goto THISROUND if $main::please_continue;
671   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
672   readfrompipes ();
673   if (!reapchildren())
674   {
675     check_squeue();
676     update_progress_stats();
677     select (undef, undef, undef, 0.1);
678     killem (keys %proc) if $main::please_freeze;
679   }
680 }
681
682 update_progress_stats();
683 freeze_if_want_freeze();
684
685
686 if (!defined $success)
687 {
688   if (@jobstep_todo &&
689       $thisround_succeeded == 0 &&
690       ($thisround_failed == 0 || $thisround_failed > 4))
691   {
692     my $message = "stop because $thisround_failed tasks failed and none succeeded";
693     Log (undef, $message);
694     $success = 0;
695   }
696   if (!@jobstep_todo)
697   {
698     $success = 1;
699   }
700 }
701
702 goto ONELEVEL if !defined $success;
703
704
705 release_allocation();
706 freeze();
707 $Job->{'output'} = &collate_output();
708 $Job->{'success'} = $Job->{'output'} && $success;
709 $Job->save if $job_has_uuid;
710
711 if ($Job->{'output'})
712 {
713   eval {
714     my $manifest_text = capturex("whget", $Job->{'output'});
715     $arv->{'collections'}->{'create'}->execute('collection' => {
716       'uuid' => $Job->{'output'},
717       'manifest_text' => $manifest_text,
718     });
719   };
720   if ($@) {
721     Log (undef, "Failed to register output manifest: $@");
722   }
723 }
724
725 Log (undef, "finish");
726
727 save_meta();
728 exit 0;
729
730
731
732 sub update_progress_stats
733 {
734   $progress_stats_updated = time;
735   return if !$progress_is_dirty;
736   my ($todo, $done, $running) = (scalar @jobstep_todo,
737                                  scalar @jobstep_done,
738                                  scalar @slot - scalar @freeslot - scalar @holdslot);
739   $Job->{'tasks_summary'} ||= {};
740   $Job->{'tasks_summary'}->{'todo'} = $todo;
741   $Job->{'tasks_summary'}->{'done'} = $done;
742   $Job->{'tasks_summary'}->{'running'} = $running;
743   $Job->save if $job_has_uuid;
744   Log (undef, "status: $done done, $running running, $todo todo");
745   $progress_is_dirty = 0;
746 }
747
748
749
750 sub reapchildren
751 {
752   my $pid = waitpid (-1, WNOHANG);
753   return 0 if $pid <= 0;
754
755   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
756                   . "."
757                   . $slot[$proc{$pid}->{slot}]->{cpu});
758   my $jobstepid = $proc{$pid}->{jobstep};
759   my $elapsed = time - $proc{$pid}->{time};
760   my $Jobstep = $jobstep[$jobstepid];
761
762   my $exitcode = $?;
763   my $exitinfo = "exit $exitcode";
764   $Jobstep->{'arvados_task'}->reload;
765   my $success = $Jobstep->{'arvados_task'}->{success};
766
767   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
768
769   if (!defined $success) {
770     # task did not indicate one way or the other --> fail
771     $Jobstep->{'arvados_task'}->{success} = 0;
772     $Jobstep->{'arvados_task'}->save;
773     $success = 0;
774   }
775
776   if (!$success)
777   {
778     my $no_incr_attempts;
779     $no_incr_attempts = 1 if $Jobstep->{node_fail};
780
781     ++$thisround_failed;
782     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
783
784     # Check for signs of a failed or misconfigured node
785     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
786         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
787       # Don't count this against jobstep failure thresholds if this
788       # node is already suspected faulty and srun exited quickly
789       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
790           $elapsed < 5 &&
791           $Jobstep->{attempts} > 1) {
792         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
793         $no_incr_attempts = 1;
794         --$Jobstep->{attempts};
795       }
796       ban_node_by_slot($proc{$pid}->{slot});
797     }
798
799     push @jobstep_todo, $jobstepid;
800     Log ($jobstepid, "failure in $elapsed seconds");
801
802     --$Jobstep->{attempts} if $no_incr_attempts;
803     $Job->{'tasks_summary'}->{'failed'}++;
804   }
805   else
806   {
807     ++$thisround_succeeded;
808     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
809     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
810     push @jobstep_done, $jobstepid;
811     Log ($jobstepid, "success in $elapsed seconds");
812   }
813   $Jobstep->{exitcode} = $exitcode;
814   $Jobstep->{finishtime} = time;
815   process_stderr ($jobstepid, $success);
816   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
817
818   close $reader{$jobstepid};
819   delete $reader{$jobstepid};
820   delete $slot[$proc{$pid}->{slot}]->{pid};
821   push @freeslot, $proc{$pid}->{slot};
822   delete $proc{$pid};
823
824   # Load new tasks
825   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
826     'where' => {
827       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
828     },
829     'order' => 'qsequence'
830   );
831   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
832     my $jobstep = {
833       'level' => $arvados_task->{'sequence'},
834       'attempts' => 0,
835       'arvados_task' => $arvados_task
836     };
837     push @jobstep, $jobstep;
838     push @jobstep_todo, $#jobstep;
839   }
840
841   $progress_is_dirty = 1;
842   1;
843 }
844
845
846 sub check_squeue
847 {
848   # return if the kill list was checked <4 seconds ago
849   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
850   {
851     return;
852   }
853   $squeue_kill_checked = time;
854
855   # use killem() on procs whose killtime is reached
856   for (keys %proc)
857   {
858     if (exists $proc{$_}->{killtime}
859         && $proc{$_}->{killtime} <= time)
860     {
861       killem ($_);
862     }
863   }
864
865   # return if the squeue was checked <60 seconds ago
866   if (defined $squeue_checked && $squeue_checked > time - 60)
867   {
868     return;
869   }
870   $squeue_checked = time;
871
872   if (!$have_slurm)
873   {
874     # here is an opportunity to check for mysterious problems with local procs
875     return;
876   }
877
878   # get a list of steps still running
879   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
880   chop @squeue;
881   if ($squeue[-1] ne "ok")
882   {
883     return;
884   }
885   pop @squeue;
886
887   # which of my jobsteps are running, according to squeue?
888   my %ok;
889   foreach (@squeue)
890   {
891     if (/^(\d+)\.(\d+) (\S+)/)
892     {
893       if ($1 eq $ENV{SLURM_JOBID})
894       {
895         $ok{$3} = 1;
896       }
897     }
898   }
899
900   # which of my active child procs (>60s old) were not mentioned by squeue?
901   foreach (keys %proc)
902   {
903     if ($proc{$_}->{time} < time - 60
904         && !exists $ok{$proc{$_}->{jobstepname}}
905         && !exists $proc{$_}->{killtime})
906     {
907       # kill this proc if it hasn't exited in 30 seconds
908       $proc{$_}->{killtime} = time + 30;
909     }
910   }
911 }
912
913
914 sub release_allocation
915 {
916   if ($have_slurm)
917   {
918     Log (undef, "release job allocation");
919     system "scancel $ENV{SLURM_JOBID}";
920   }
921 }
922
923
924 sub readfrompipes
925 {
926   my $gotsome = 0;
927   foreach my $job (keys %reader)
928   {
929     my $buf;
930     while (0 < sysread ($reader{$job}, $buf, 8192))
931     {
932       print STDERR $buf if $ENV{CRUNCH_DEBUG};
933       $jobstep[$job]->{stderr} .= $buf;
934       preprocess_stderr ($job);
935       if (length ($jobstep[$job]->{stderr}) > 16384)
936       {
937         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
938       }
939       $gotsome = 1;
940     }
941   }
942   return $gotsome;
943 }
944
945
946 sub preprocess_stderr
947 {
948   my $job = shift;
949
950   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
951     my $line = $1;
952     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
953     Log ($job, "stderr $line");
954     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
955       # whoa.
956       $main::please_freeze = 1;
957     }
958     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
959       $jobstep[$job]->{node_fail} = 1;
960       ban_node_by_slot($jobstep[$job]->{slotindex});
961     }
962   }
963 }
964
965
966 sub process_stderr
967 {
968   my $job = shift;
969   my $success = shift;
970   preprocess_stderr ($job);
971
972   map {
973     Log ($job, "stderr $_");
974   } split ("\n", $jobstep[$job]->{stderr});
975 }
976
977
978 sub collate_output
979 {
980   my $whc = Warehouse->new;
981   Log (undef, "collate");
982   $whc->write_start (1);
983   my $joboutput;
984   for (@jobstep)
985   {
986     next if (!exists $_->{'arvados_task'}->{output} ||
987              !$_->{'arvados_task'}->{'success'} ||
988              $_->{'exitcode'} != 0);
989     my $output = $_->{'arvados_task'}->{output};
990     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
991     {
992       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
993       $whc->write_data ($output);
994     }
995     elsif (@jobstep == 1)
996     {
997       $joboutput = $output;
998       $whc->write_finish;
999     }
1000     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1001     {
1002       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1003       $whc->write_data ($outblock);
1004     }
1005     else
1006     {
1007       my $errstr = $whc->errstr;
1008       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1009       $success = 0;
1010     }
1011   }
1012   $joboutput = $whc->write_finish if !defined $joboutput;
1013   if ($joboutput)
1014   {
1015     Log (undef, "output $joboutput");
1016     $Job->{'output'} = $joboutput;
1017     $Job->save if $job_has_uuid;
1018   }
1019   else
1020   {
1021     Log (undef, "output undef");
1022   }
1023   return $joboutput;
1024 }
1025
1026
1027 sub killem
1028 {
1029   foreach (@_)
1030   {
1031     my $sig = 2;                # SIGINT first
1032     if (exists $proc{$_}->{"sent_$sig"} &&
1033         time - $proc{$_}->{"sent_$sig"} > 4)
1034     {
1035       $sig = 15;                # SIGTERM if SIGINT doesn't work
1036     }
1037     if (exists $proc{$_}->{"sent_$sig"} &&
1038         time - $proc{$_}->{"sent_$sig"} > 4)
1039     {
1040       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1041     }
1042     if (!exists $proc{$_}->{"sent_$sig"})
1043     {
1044       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1045       kill $sig, $_;
1046       select (undef, undef, undef, 0.1);
1047       if ($sig == 2)
1048       {
1049         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1050       }
1051       $proc{$_}->{"sent_$sig"} = time;
1052       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1053     }
1054   }
1055 }
1056
1057
1058 sub fhbits
1059 {
1060   my($bits);
1061   for (@_) {
1062     vec($bits,fileno($_),1) = 1;
1063   }
1064   $bits;
1065 }
1066
1067
1068 sub Log                         # ($jobstep_id, $logmessage)
1069 {
1070   if ($_[1] =~ /\n/) {
1071     for my $line (split (/\n/, $_[1])) {
1072       Log ($_[0], $line);
1073     }
1074     return;
1075   }
1076   my $fh = select STDERR; $|=1; select $fh;
1077   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1078   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1079   $message .= "\n";
1080   my $datetime;
1081   if ($metastream || -t STDERR) {
1082     my @gmtime = gmtime;
1083     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1084                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1085   }
1086   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1087
1088   return if !$metastream;
1089   $metastream->write_data ($datetime . " " . $message);
1090 }
1091
1092
1093 sub croak
1094 {
1095   my ($package, $file, $line) = caller;
1096   my $message = "@_ at $file line $line\n";
1097   Log (undef, $message);
1098   freeze() if @jobstep_todo;
1099   collate_output() if @jobstep_todo;
1100   cleanup();
1101   save_meta() if $metastream;
1102   die;
1103 }
1104
1105
1106 sub cleanup
1107 {
1108   return if !$job_has_uuid;
1109   $Job->reload if $job_has_uuid;
1110   $Job->{'running'} = 0;
1111   $Job->{'success'} = 0;
1112   $Job->{'finished_at'} = gmtime;
1113   $Job->save if $job_has_uuid;
1114 }
1115
1116
1117 sub save_meta
1118 {
1119   my $justcheckpoint = shift; # false if this will be the last meta saved
1120   my $m = $metastream;
1121   $m = $m->copy if $justcheckpoint;
1122   $m->write_finish;
1123   my $loglocator = $m->as_key;
1124   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1125   Log (undef, "meta key is $loglocator");
1126   $Job->{'log'} = $loglocator;
1127   $Job->save if $job_has_uuid;
1128 }
1129
1130
1131 sub freeze_if_want_freeze
1132 {
1133   if ($main::please_freeze)
1134   {
1135     release_allocation();
1136     if (@_)
1137     {
1138       # kill some srun procs before freeze+stop
1139       map { $proc{$_} = {} } @_;
1140       while (%proc)
1141       {
1142         killem (keys %proc);
1143         select (undef, undef, undef, 0.1);
1144         my $died;
1145         while (($died = waitpid (-1, WNOHANG)) > 0)
1146         {
1147           delete $proc{$died};
1148         }
1149       }
1150     }
1151     freeze();
1152     collate_output();
1153     cleanup();
1154     save_meta();
1155     exit 0;
1156   }
1157 }
1158
1159
1160 sub freeze
1161 {
1162   Log (undef, "Freeze not implemented");
1163   return;
1164 }
1165
1166
1167 sub thaw
1168 {
1169   croak ("Thaw not implemented");
1170
1171   my $whc;
1172   my $key = shift;
1173   Log (undef, "thaw from $key");
1174
1175   @jobstep = ();
1176   @jobstep_done = ();
1177   @jobstep_todo = ();
1178   @jobstep_tomerge = ();
1179   $jobstep_tomerge_level = 0;
1180   my $frozenjob = {};
1181
1182   my $stream = new Warehouse::Stream ( whc => $whc,
1183                                        hash => [split (",", $key)] );
1184   $stream->rewind;
1185   while (my $dataref = $stream->read_until (undef, "\n\n"))
1186   {
1187     if ($$dataref =~ /^job /)
1188     {
1189       foreach (split ("\n", $$dataref))
1190       {
1191         my ($k, $v) = split ("=", $_, 2);
1192         $frozenjob->{$k} = freezeunquote ($v);
1193       }
1194       next;
1195     }
1196
1197     if ($$dataref =~ /^merge (\d+) (.*)/)
1198     {
1199       $jobstep_tomerge_level = $1;
1200       @jobstep_tomerge
1201           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1202       next;
1203     }
1204
1205     my $Jobstep = { };
1206     foreach (split ("\n", $$dataref))
1207     {
1208       my ($k, $v) = split ("=", $_, 2);
1209       $Jobstep->{$k} = freezeunquote ($v) if $k;
1210     }
1211     $Jobstep->{attempts} = 0;
1212     push @jobstep, $Jobstep;
1213
1214     if ($Jobstep->{exitcode} eq "0")
1215     {
1216       push @jobstep_done, $#jobstep;
1217     }
1218     else
1219     {
1220       push @jobstep_todo, $#jobstep;
1221     }
1222   }
1223
1224   foreach (qw (script script_version script_parameters))
1225   {
1226     $Job->{$_} = $frozenjob->{$_};
1227   }
1228   $Job->save if $job_has_uuid;
1229 }
1230
1231
1232 sub freezequote
1233 {
1234   my $s = shift;
1235   $s =~ s/\\/\\\\/g;
1236   $s =~ s/\n/\\n/g;
1237   return $s;
1238 }
1239
1240
1241 sub freezeunquote
1242 {
1243   my $s = shift;
1244   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1245   return $s;
1246 }
1247
1248
1249 sub srun
1250 {
1251   my $srunargs = shift;
1252   my $execargs = shift;
1253   my $opts = shift || {};
1254   my $stdin = shift;
1255   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1256   print STDERR (join (" ",
1257                       map { / / ? "'$_'" : $_ }
1258                       (@$args)),
1259                 "\n")
1260       if $ENV{CRUNCH_DEBUG};
1261
1262   if (defined $stdin) {
1263     my $child = open STDIN, "-|";
1264     defined $child or die "no fork: $!";
1265     if ($child == 0) {
1266       print $stdin or die $!;
1267       close STDOUT or die $!;
1268       exit 0;
1269     }
1270   }
1271
1272   return system (@$args) if $opts->{fork};
1273
1274   exec @$args;
1275   warn "ENV size is ".length(join(" ",%ENV));
1276   die "exec failed: $!: @$args";
1277 }
1278
1279
1280 sub ban_node_by_slot {
1281   # Don't start any new jobsteps on this node for 60 seconds
1282   my $slotid = shift;
1283   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1284   $slot[$slotid]->{node}->{hold_count}++;
1285   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1286 }
1287
1288 __DATA__
1289 #!/usr/bin/perl
1290
1291 # checkout-and-build
1292
1293 use Fcntl ':flock';
1294
1295 my $destdir = $ENV{"CRUNCH_SRC"};
1296 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1297 my $repo = $ENV{"CRUNCH_SRC_URL"};
1298
1299 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1300 flock L, LOCK_EX;
1301 if (readlink ("$destdir.commit") eq $commit) {
1302     exit 0;
1303 }
1304
1305 unlink "$destdir.commit";
1306 open STDOUT, ">", "$destdir.log";
1307 open STDERR, ">&STDOUT";
1308
1309 mkdir $destdir;
1310 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1311 print TARX <DATA>;
1312 if(!close(TARX)) {
1313   die "'tar -C $destdir -xf -' exited $?: $!";
1314 }
1315
1316 my $pwd;
1317 chomp ($pwd = `pwd`);
1318 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1319 mkdir $install_dir;
1320 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1321     # Old version
1322     shell_or_die ("./tests/autotests.sh", $install_dir);
1323 } elsif (-e "./install.sh") {
1324     shell_or_die ("./install.sh", $install_dir);
1325 }
1326
1327 if ($commit) {
1328     unlink "$destdir.commit.new";
1329     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1330     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1331 }
1332
1333 close L;
1334
1335 exit 0;
1336
1337 sub shell_or_die
1338 {
1339   if ($ENV{"DEBUG"}) {
1340     print STDERR "@_\n";
1341   }
1342   system (@_) == 0
1343       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1344 }
1345
1346 __DATA__