fix attempts to save dev job status in arvados db
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100
101
102 $SIG{'HUP'} = sub
103 {
104   1;
105 };
106 $SIG{'USR1'} = sub
107 {
108   $main::ENV{CRUNCH_DEBUG} = 1;
109 };
110 $SIG{'USR2'} = sub
111 {
112   $main::ENV{CRUNCH_DEBUG} = 0;
113 };
114
115
116
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
119 $metastream->clear;
120 $metastream->write_start('log.txt');
121
122 my $User = {};
123 my $Job = {};
124 my $job_id;
125 my $dbh;
126 my $sth;
127 if ($job_has_uuid)
128 {
129   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130   $User = $arv->{'users'}->{'current'}->execute;
131   if (!$force_unlock) {
132     if ($Job->{'is_locked_by'}) {
133       croak("Job is locked: " . $Job->{'is_locked_by'});
134     }
135     if ($Job->{'success'} ne undef) {
136       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
137     }
138     if ($Job->{'running'}) {
139       croak("Job 'running' flag is already set");
140     }
141     if ($Job->{'started_at'}) {
142       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
143     }
144   }
145 }
146 else
147 {
148   $Job = JSON::decode_json($jobspec);
149
150   if (!$resume_stash)
151   {
152     map { croak ("No $_ specified") unless $Job->{$_} }
153     qw(script script_version script_parameters);
154   }
155
156   if (!defined $Job->{'uuid'}) {
157     my $hostname = `hostname -s`;
158     chomp $hostname;
159     $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
160   }
161 }
162 $job_id = $Job->{'uuid'};
163
164
165
166 $Job->{'resource_limits'} ||= {};
167 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
168 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
169
170
171 Log (undef, "check slurm allocation");
172 my @slot;
173 my @node;
174 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
175 my @sinfo;
176 if (!$have_slurm)
177 {
178   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
179   push @sinfo, "$localcpus localhost";
180 }
181 if (exists $ENV{SLURM_NODELIST})
182 {
183   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
184 }
185 foreach (@sinfo)
186 {
187   my ($ncpus, $slurm_nodelist) = split;
188   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
189
190   my @nodelist;
191   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
192   {
193     my $nodelist = $1;
194     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
195     {
196       my $ranges = $1;
197       foreach (split (",", $ranges))
198       {
199         my ($a, $b);
200         if (/(\d+)-(\d+)/)
201         {
202           $a = $1;
203           $b = $2;
204         }
205         else
206         {
207           $a = $_;
208           $b = $_;
209         }
210         push @nodelist, map {
211           my $n = $nodelist;
212           $n =~ s/\[[-,\d]+\]/$_/;
213           $n;
214         } ($a..$b);
215       }
216     }
217     else
218     {
219       push @nodelist, $nodelist;
220     }
221   }
222   foreach my $nodename (@nodelist)
223   {
224     Log (undef, "node $nodename - $ncpus slots");
225     my $node = { name => $nodename,
226                  ncpus => $ncpus,
227                  losing_streak => 0,
228                  hold_until => 0 };
229     foreach my $cpu (1..$ncpus)
230     {
231       push @slot, { node => $node,
232                     cpu => $cpu };
233     }
234   }
235   push @node, @nodelist;
236 }
237
238
239
240 # Ensure that we get one jobstep running on each allocated node before
241 # we start overloading nodes with concurrent steps
242
243 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
244
245
246
247 my $jobmanager_id;
248 if ($job_has_uuid)
249 {
250   # Claim this job, and make sure nobody else does
251
252   $Job->{'is_locked_by'} = $User->{'uuid'};
253   $Job->{'started_at'} = gmtime;
254   $Job->{'running'} = 1;
255   $Job->{'success'} = undef;
256   $Job->{'tasks_summary'} = { 'failed' => 0,
257                               'todo' => 1,
258                               'running' => 0,
259                               'done' => 0 };
260   if ($job_has_uuid) {
261     unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
262       croak("Error while updating / locking job");
263     }
264   }
265 }
266
267
268 Log (undef, "start");
269 $SIG{'INT'} = sub { $main::please_freeze = 1; };
270 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
271 $SIG{'TERM'} = \&croak;
272 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
273 $SIG{'ALRM'} = sub { $main::please_info = 1; };
274 $SIG{'CONT'} = sub { $main::please_continue = 1; };
275 $main::please_freeze = 0;
276 $main::please_info = 0;
277 $main::please_continue = 0;
278 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
279
280 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
281 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
282 $ENV{"JOB_UUID"} = $job_id;
283
284
285 my @jobstep;
286 my @jobstep_todo = ();
287 my @jobstep_done = ();
288 my @jobstep_tomerge = ();
289 my $jobstep_tomerge_level = 0;
290 my $squeue_checked;
291 my $squeue_kill_checked;
292 my $output_in_keep = 0;
293
294
295
296 if (defined $Job->{thawedfromkey})
297 {
298   thaw ($Job->{thawedfromkey});
299 }
300 else
301 {
302   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
303     'job_uuid' => $Job->{'uuid'},
304     'sequence' => 0,
305     'qsequence' => 0,
306     'parameters' => {},
307                                                           });
308   push @jobstep, { 'level' => 0,
309                    'attempts' => 0,
310                    'arvados_task' => $first_task,
311                  };
312   push @jobstep_todo, 0;
313 }
314
315
316 my $build_script;
317 do {
318   local $/ = undef;
319   $build_script = <DATA>;
320 };
321
322
323 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
324
325 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
326 if ($skip_install)
327 {
328   $ENV{"CRUNCH_SRC"} = $Job->{revision};
329 }
330 else
331 {
332   Log (undef, "Install revision ".$Job->{revision});
333   my $nodelist = join(",", @node);
334
335   # Clean out crunch_tmp/work and crunch_tmp/opt
336
337   my $cleanpid = fork();
338   if ($cleanpid == 0)
339   {
340     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
341           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
342     exit (1);
343   }
344   while (1)
345   {
346     last if $cleanpid == waitpid (-1, WNOHANG);
347     freeze_if_want_freeze ($cleanpid);
348     select (undef, undef, undef, 0.1);
349   }
350   Log (undef, "Clean-work-dir exited $?");
351
352   # Install requested code version
353
354   my @execargs;
355   my @srunargs = ("srun",
356                   "--nodelist=$nodelist",
357                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
358
359   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
360   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
361   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
362
363   my $commit;
364   my $git_archive;
365   my $treeish = $Job->{'script_version'};
366   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
367   # Todo: let script_version specify repository instead of expecting
368   # parent process to figure it out.
369   $ENV{"CRUNCH_SRC_URL"} = $repo;
370
371   # Create/update our clone of the remote git repo
372
373   if (!-d $ENV{"CRUNCH_SRC"}) {
374     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
375         or croak ("git clone $repo failed: exit ".($?>>8));
376     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
377   }
378   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
379
380   # If this looks like a subversion r#, look for it in git-svn commit messages
381
382   if ($treeish =~ m{^\d{1,4}$}) {
383     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
384     chomp $gitlog;
385     if ($gitlog =~ /^[a-f0-9]{40}$/) {
386       $commit = $gitlog;
387       Log (undef, "Using commit $commit for revision $treeish");
388     }
389   }
390
391   # If that didn't work, try asking git to look it up as a tree-ish.
392
393   if (!defined $commit) {
394
395     my $cooked_treeish = $treeish;
396     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
397       # Looks like a git branch name -- make sure git knows it's
398       # relative to the remote repo
399       $cooked_treeish = "origin/$treeish";
400     }
401
402     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
403     chomp $found;
404     if ($found =~ /^[0-9a-f]{40}$/s) {
405       $commit = $found;
406       if ($commit ne $treeish) {
407         # Make sure we record the real commit id in the database,
408         # frozentokey, logs, etc. -- instead of an abbreviation or a
409         # branch name which can become ambiguous or point to a
410         # different commit in the future.
411         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
412         Log (undef, "Using commit $commit for tree-ish $treeish");
413         if ($commit ne $treeish) {
414           $Job->{'script_version'} = $commit;
415           !$job_has_uuid or $Job->save() or croak("Error while updating job");
416         }
417       }
418     }
419   }
420
421   if (defined $commit) {
422     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
423     @execargs = ("sh", "-c",
424                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
425     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
426   }
427   else {
428     croak ("could not figure out commit id for $treeish");
429   }
430
431   my $installpid = fork();
432   if ($installpid == 0)
433   {
434     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
435     exit (1);
436   }
437   while (1)
438   {
439     last if $installpid == waitpid (-1, WNOHANG);
440     freeze_if_want_freeze ($installpid);
441     select (undef, undef, undef, 0.1);
442   }
443   Log (undef, "Install exited $?");
444 }
445
446
447
448 foreach (qw (script script_version script_parameters resource_limits))
449 {
450   Log (undef,
451        "$_ " .
452        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
453 }
454 foreach (split (/\n/, $Job->{knobs}))
455 {
456   Log (undef, "knob " . $_);
457 }
458
459
460
461 my $success;
462
463
464
465 ONELEVEL:
466
467 my $thisround_succeeded = 0;
468 my $thisround_failed = 0;
469 my $thisround_failed_multiple = 0;
470
471 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
472                        or $a <=> $b } @jobstep_todo;
473 my $level = $jobstep[$jobstep_todo[0]]->{level};
474 Log (undef, "start level $level");
475
476
477
478 my %proc;
479 my @freeslot = (0..$#slot);
480 my @holdslot;
481 my %reader;
482 my $progress_is_dirty = 1;
483 my $progress_stats_updated = 0;
484
485 update_progress_stats();
486
487
488
489 THISROUND:
490 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
491 {
492   $main::please_continue = 0;
493
494   my $id = $jobstep_todo[$todo_ptr];
495   my $Jobstep = $jobstep[$id];
496   if ($Jobstep->{level} != $level)
497   {
498     next;
499   }
500   if ($Jobstep->{attempts} > 9)
501   {
502     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
503     $success = 0;
504     last THISROUND;
505   }
506
507   pipe $reader{$id}, "writer" or croak ($!);
508   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
509   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
510
511   my $childslot = $freeslot[0];
512   my $childnode = $slot[$childslot]->{node};
513   my $childslotname = join (".",
514                             $slot[$childslot]->{node}->{name},
515                             $slot[$childslot]->{cpu});
516   my $childpid = fork();
517   if ($childpid == 0)
518   {
519     $SIG{'INT'} = 'DEFAULT';
520     $SIG{'QUIT'} = 'DEFAULT';
521     $SIG{'TERM'} = 'DEFAULT';
522
523     foreach (values (%reader))
524     {
525       close($_);
526     }
527     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
528     open(STDOUT,">&writer");
529     open(STDERR,">&writer");
530
531     undef $dbh;
532     undef $sth;
533
534     delete $ENV{"GNUPGHOME"};
535     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
536     $ENV{"TASK_QSEQUENCE"} = $id;
537     $ENV{"TASK_SEQUENCE"} = $level;
538     $ENV{"JOB_SCRIPT"} = $Job->{script};
539     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
540       $param =~ tr/a-z/A-Z/;
541       $ENV{"JOB_PARAMETER_$param"} = $value;
542     }
543     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
544     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
545     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
546     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
547
548     $ENV{"GZIP"} = "-n";
549
550     my @srunargs = (
551       "srun",
552       "--nodelist=".$childnode->{name},
553       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
554       "--job-name=$job_id.$id.$$",
555         );
556     my @execargs = qw(sh);
557     my $build_script_to_send = "";
558     my $command =
559         "mkdir -p $ENV{CRUNCH_TMP}/revision "
560         ."&& cd $ENV{CRUNCH_TMP} ";
561     if ($build_script)
562     {
563       $build_script_to_send = $build_script;
564       $command .=
565           "&& perl -";
566     }
567     elsif (!$skip_install)
568     {
569       $command .=
570           "&& "
571           ."( "
572           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
573           ."|| "
574           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
575           ."    && ./installrevision "
576           ."  ) "
577           .") ";
578     }
579     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
580     $command .=
581         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
582     my @execargs = ('bash', '-c', $command);
583     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
584     exit (1);
585   }
586   close("writer");
587   if (!defined $childpid)
588   {
589     close $reader{$id};
590     delete $reader{$id};
591     next;
592   }
593   shift @freeslot;
594   $proc{$childpid} = { jobstep => $id,
595                        time => time,
596                        slot => $childslot,
597                        jobstepname => "$job_id.$id.$childpid",
598                      };
599   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
600   $slot[$childslot]->{pid} = $childpid;
601
602   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
603   Log ($id, "child $childpid started on $childslotname");
604   $Jobstep->{attempts} ++;
605   $Jobstep->{starttime} = time;
606   $Jobstep->{node} = $childnode->{name};
607   $Jobstep->{slotindex} = $childslot;
608   delete $Jobstep->{stderr};
609   delete $Jobstep->{finishtime};
610
611   splice @jobstep_todo, $todo_ptr, 1;
612   --$todo_ptr;
613
614   $progress_is_dirty = 1;
615
616   while (!@freeslot
617          ||
618          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
619   {
620     last THISROUND if $main::please_freeze;
621     if ($main::please_info)
622     {
623       $main::please_info = 0;
624       freeze();
625       collate_output();
626       save_meta(1);
627       update_progress_stats();
628     }
629     my $gotsome
630         = readfrompipes ()
631         + reapchildren ();
632     if (!$gotsome)
633     {
634       check_squeue();
635       update_progress_stats();
636       select (undef, undef, undef, 0.1);
637     }
638     elsif (time - $progress_stats_updated >= 30)
639     {
640       update_progress_stats();
641     }
642     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
643         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
644     {
645       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
646           .($thisround_failed+$thisround_succeeded)
647           .") -- giving up on this round";
648       Log (undef, $message);
649       last THISROUND;
650     }
651
652     # move slots from freeslot to holdslot (or back to freeslot) if necessary
653     for (my $i=$#freeslot; $i>=0; $i--) {
654       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
655         push @holdslot, (splice @freeslot, $i, 1);
656       }
657     }
658     for (my $i=$#holdslot; $i>=0; $i--) {
659       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
660         push @freeslot, (splice @holdslot, $i, 1);
661       }
662     }
663
664     # give up if no nodes are succeeding
665     if (!grep { $_->{node}->{losing_streak} == 0 &&
666                     $_->{node}->{hold_count} < 4 } @slot) {
667       my $message = "Every node has failed -- giving up on this round";
668       Log (undef, $message);
669       last THISROUND;
670     }
671   }
672 }
673
674
675 push @freeslot, splice @holdslot;
676 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
677
678
679 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
680 while (%proc)
681 {
682   goto THISROUND if $main::please_continue;
683   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
684   readfrompipes ();
685   if (!reapchildren())
686   {
687     check_squeue();
688     update_progress_stats();
689     select (undef, undef, undef, 0.1);
690     killem (keys %proc) if $main::please_freeze;
691   }
692 }
693
694 update_progress_stats();
695 freeze_if_want_freeze();
696
697
698 if (!defined $success)
699 {
700   if (@jobstep_todo &&
701       $thisround_succeeded == 0 &&
702       ($thisround_failed == 0 || $thisround_failed > 4))
703   {
704     my $message = "stop because $thisround_failed tasks failed and none succeeded";
705     Log (undef, $message);
706     $success = 0;
707   }
708   if (!@jobstep_todo)
709   {
710     $success = 1;
711   }
712 }
713
714 goto ONELEVEL if !defined $success;
715
716
717 release_allocation();
718 freeze();
719 $Job->{'output'} = &collate_output();
720 $Job->{'success'} = $Job->{'output'} && $success;
721 $Job->save if $job_has_uuid;
722
723 if ($Job->{'output'})
724 {
725   eval {
726     my $manifest_text = capturex("whget", $Job->{'output'});
727     $arv->{'collections'}->{'create'}->execute('collection' => {
728       'uuid' => $Job->{'output'},
729       'manifest_text' => $manifest_text,
730     });
731   };
732   if ($@) {
733     Log (undef, "Failed to register output manifest: $@");
734   }
735 }
736
737 Log (undef, "finish");
738
739 save_meta();
740 exit 0;
741
742
743
744 sub update_progress_stats
745 {
746   $progress_stats_updated = time;
747   return if !$progress_is_dirty;
748   my ($todo, $done, $running) = (scalar @jobstep_todo,
749                                  scalar @jobstep_done,
750                                  scalar @slot - scalar @freeslot - scalar @holdslot);
751   $Job->{'tasks_summary'} ||= {};
752   $Job->{'tasks_summary'}->{'todo'} = $todo;
753   $Job->{'tasks_summary'}->{'done'} = $done;
754   $Job->{'tasks_summary'}->{'running'} = $running;
755   $Job->save if $job_has_uuid;
756   Log (undef, "status: $done done, $running running, $todo todo");
757   $progress_is_dirty = 0;
758 }
759
760
761
762 sub reapchildren
763 {
764   my $pid = waitpid (-1, WNOHANG);
765   return 0 if $pid <= 0;
766
767   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
768                   . "."
769                   . $slot[$proc{$pid}->{slot}]->{cpu});
770   my $jobstepid = $proc{$pid}->{jobstep};
771   my $elapsed = time - $proc{$pid}->{time};
772   my $Jobstep = $jobstep[$jobstepid];
773
774   my $exitcode = $?;
775   my $exitinfo = "exit $exitcode";
776   $Jobstep->{'arvados_task'}->reload;
777   my $success = $Jobstep->{'arvados_task'}->{success};
778
779   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
780
781   if (!defined $success) {
782     # task did not indicate one way or the other --> fail
783     $Jobstep->{'arvados_task'}->{success} = 0;
784     $Jobstep->{'arvados_task'}->save;
785     $success = 0;
786   }
787
788   if (!$success)
789   {
790     my $no_incr_attempts;
791     $no_incr_attempts = 1 if $Jobstep->{node_fail};
792
793     ++$thisround_failed;
794     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
795
796     # Check for signs of a failed or misconfigured node
797     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
798         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
799       # Don't count this against jobstep failure thresholds if this
800       # node is already suspected faulty and srun exited quickly
801       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
802           $elapsed < 5 &&
803           $Jobstep->{attempts} > 1) {
804         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
805         $no_incr_attempts = 1;
806         --$Jobstep->{attempts};
807       }
808       ban_node_by_slot($proc{$pid}->{slot});
809     }
810
811     push @jobstep_todo, $jobstepid;
812     Log ($jobstepid, "failure in $elapsed seconds");
813
814     --$Jobstep->{attempts} if $no_incr_attempts;
815     $Job->{'tasks_summary'}->{'failed'}++;
816   }
817   else
818   {
819     ++$thisround_succeeded;
820     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
821     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
822     push @jobstep_done, $jobstepid;
823     Log ($jobstepid, "success in $elapsed seconds");
824   }
825   $Jobstep->{exitcode} = $exitcode;
826   $Jobstep->{finishtime} = time;
827   process_stderr ($jobstepid, $success);
828   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
829
830   close $reader{$jobstepid};
831   delete $reader{$jobstepid};
832   delete $slot[$proc{$pid}->{slot}]->{pid};
833   push @freeslot, $proc{$pid}->{slot};
834   delete $proc{$pid};
835
836   # Load new tasks
837   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
838     'where' => {
839       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
840     },
841     'order' => 'qsequence'
842   );
843   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
844     my $jobstep = {
845       'level' => $arvados_task->{'sequence'},
846       'attempts' => 0,
847       'arvados_task' => $arvados_task
848     };
849     push @jobstep, $jobstep;
850     push @jobstep_todo, $#jobstep;
851   }
852
853   $progress_is_dirty = 1;
854   1;
855 }
856
857
858 sub check_squeue
859 {
860   # return if the kill list was checked <4 seconds ago
861   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
862   {
863     return;
864   }
865   $squeue_kill_checked = time;
866
867   # use killem() on procs whose killtime is reached
868   for (keys %proc)
869   {
870     if (exists $proc{$_}->{killtime}
871         && $proc{$_}->{killtime} <= time)
872     {
873       killem ($_);
874     }
875   }
876
877   # return if the squeue was checked <60 seconds ago
878   if (defined $squeue_checked && $squeue_checked > time - 60)
879   {
880     return;
881   }
882   $squeue_checked = time;
883
884   if (!$have_slurm)
885   {
886     # here is an opportunity to check for mysterious problems with local procs
887     return;
888   }
889
890   # get a list of steps still running
891   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
892   chop @squeue;
893   if ($squeue[-1] ne "ok")
894   {
895     return;
896   }
897   pop @squeue;
898
899   # which of my jobsteps are running, according to squeue?
900   my %ok;
901   foreach (@squeue)
902   {
903     if (/^(\d+)\.(\d+) (\S+)/)
904     {
905       if ($1 eq $ENV{SLURM_JOBID})
906       {
907         $ok{$3} = 1;
908       }
909     }
910   }
911
912   # which of my active child procs (>60s old) were not mentioned by squeue?
913   foreach (keys %proc)
914   {
915     if ($proc{$_}->{time} < time - 60
916         && !exists $ok{$proc{$_}->{jobstepname}}
917         && !exists $proc{$_}->{killtime})
918     {
919       # kill this proc if it hasn't exited in 30 seconds
920       $proc{$_}->{killtime} = time + 30;
921     }
922   }
923 }
924
925
926 sub release_allocation
927 {
928   if ($have_slurm)
929   {
930     Log (undef, "release job allocation");
931     system "scancel $ENV{SLURM_JOBID}";
932   }
933 }
934
935
936 sub readfrompipes
937 {
938   my $gotsome = 0;
939   foreach my $job (keys %reader)
940   {
941     my $buf;
942     while (0 < sysread ($reader{$job}, $buf, 8192))
943     {
944       print STDERR $buf if $ENV{CRUNCH_DEBUG};
945       $jobstep[$job]->{stderr} .= $buf;
946       preprocess_stderr ($job);
947       if (length ($jobstep[$job]->{stderr}) > 16384)
948       {
949         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
950       }
951       $gotsome = 1;
952     }
953   }
954   return $gotsome;
955 }
956
957
958 sub preprocess_stderr
959 {
960   my $job = shift;
961
962   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
963     my $line = $1;
964     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
965     Log ($job, "stderr $line");
966     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
967       # whoa.
968       $main::please_freeze = 1;
969     }
970     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
971       $jobstep[$job]->{node_fail} = 1;
972       ban_node_by_slot($jobstep[$job]->{slotindex});
973     }
974   }
975 }
976
977
978 sub process_stderr
979 {
980   my $job = shift;
981   my $success = shift;
982   preprocess_stderr ($job);
983
984   map {
985     Log ($job, "stderr $_");
986   } split ("\n", $jobstep[$job]->{stderr});
987 }
988
989
990 sub collate_output
991 {
992   my $whc = Warehouse->new;
993   Log (undef, "collate");
994   $whc->write_start (1);
995   my $joboutput;
996   for (@jobstep)
997   {
998     next if (!exists $_->{'arvados_task'}->{output} ||
999              !$_->{'arvados_task'}->{'success'} ||
1000              $_->{'exitcode'} != 0);
1001     my $output = $_->{'arvados_task'}->{output};
1002     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1003     {
1004       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1005       $whc->write_data ($output);
1006     }
1007     elsif (@jobstep == 1)
1008     {
1009       $joboutput = $output;
1010       $whc->write_finish;
1011     }
1012     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1013     {
1014       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1015       $whc->write_data ($outblock);
1016     }
1017     else
1018     {
1019       my $errstr = $whc->errstr;
1020       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1021       $success = 0;
1022     }
1023   }
1024   $joboutput = $whc->write_finish if !defined $joboutput;
1025   if ($joboutput)
1026   {
1027     Log (undef, "output $joboutput");
1028     $Job->{'output'} = $joboutput;
1029     $Job->save if $job_has_uuid;
1030   }
1031   else
1032   {
1033     Log (undef, "output undef");
1034   }
1035   return $joboutput;
1036 }
1037
1038
1039 sub killem
1040 {
1041   foreach (@_)
1042   {
1043     my $sig = 2;                # SIGINT first
1044     if (exists $proc{$_}->{"sent_$sig"} &&
1045         time - $proc{$_}->{"sent_$sig"} > 4)
1046     {
1047       $sig = 15;                # SIGTERM if SIGINT doesn't work
1048     }
1049     if (exists $proc{$_}->{"sent_$sig"} &&
1050         time - $proc{$_}->{"sent_$sig"} > 4)
1051     {
1052       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1053     }
1054     if (!exists $proc{$_}->{"sent_$sig"})
1055     {
1056       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1057       kill $sig, $_;
1058       select (undef, undef, undef, 0.1);
1059       if ($sig == 2)
1060       {
1061         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1062       }
1063       $proc{$_}->{"sent_$sig"} = time;
1064       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1065     }
1066   }
1067 }
1068
1069
1070 sub fhbits
1071 {
1072   my($bits);
1073   for (@_) {
1074     vec($bits,fileno($_),1) = 1;
1075   }
1076   $bits;
1077 }
1078
1079
1080 sub Log                         # ($jobstep_id, $logmessage)
1081 {
1082   if ($_[1] =~ /\n/) {
1083     for my $line (split (/\n/, $_[1])) {
1084       Log ($_[0], $line);
1085     }
1086     return;
1087   }
1088   my $fh = select STDERR; $|=1; select $fh;
1089   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1090   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1091   $message .= "\n";
1092   my $datetime;
1093   if ($metastream || -t STDERR) {
1094     my @gmtime = gmtime;
1095     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1096                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1097   }
1098   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1099
1100   return if !$metastream;
1101   $metastream->write_data ($datetime . " " . $message);
1102 }
1103
1104
1105 sub croak
1106 {
1107   my ($package, $file, $line) = caller;
1108   my $message = "@_ at $file line $line\n";
1109   Log (undef, $message);
1110   freeze() if @jobstep_todo;
1111   collate_output() if @jobstep_todo;
1112   cleanup();
1113   save_meta() if $metastream;
1114   die;
1115 }
1116
1117
1118 sub cleanup
1119 {
1120   return if !$job_has_uuid;
1121   $Job->reload if $job_has_uuid;
1122   $Job->{'running'} = 0;
1123   $Job->{'success'} = 0;
1124   $Job->{'finished_at'} = gmtime;
1125   $Job->save if $job_has_uuid;
1126 }
1127
1128
1129 sub save_meta
1130 {
1131   my $justcheckpoint = shift; # false if this will be the last meta saved
1132   my $m = $metastream;
1133   $m = $m->copy if $justcheckpoint;
1134   $m->write_finish;
1135   my $loglocator = $m->as_key;
1136   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1137   Log (undef, "meta key is $loglocator");
1138   $Job->{'log'} = $loglocator;
1139   $Job->save if $job_has_uuid;
1140 }
1141
1142
1143 sub freeze_if_want_freeze
1144 {
1145   if ($main::please_freeze)
1146   {
1147     release_allocation();
1148     if (@_)
1149     {
1150       # kill some srun procs before freeze+stop
1151       map { $proc{$_} = {} } @_;
1152       while (%proc)
1153       {
1154         killem (keys %proc);
1155         select (undef, undef, undef, 0.1);
1156         my $died;
1157         while (($died = waitpid (-1, WNOHANG)) > 0)
1158         {
1159           delete $proc{$died};
1160         }
1161       }
1162     }
1163     freeze();
1164     collate_output();
1165     cleanup();
1166     save_meta();
1167     exit 0;
1168   }
1169 }
1170
1171
1172 sub freeze
1173 {
1174   Log (undef, "Freeze not implemented");
1175   return;
1176 }
1177
1178
1179 sub thaw
1180 {
1181   croak ("Thaw not implemented");
1182
1183   my $whc;
1184   my $key = shift;
1185   Log (undef, "thaw from $key");
1186
1187   @jobstep = ();
1188   @jobstep_done = ();
1189   @jobstep_todo = ();
1190   @jobstep_tomerge = ();
1191   $jobstep_tomerge_level = 0;
1192   my $frozenjob = {};
1193
1194   my $stream = new Warehouse::Stream ( whc => $whc,
1195                                        hash => [split (",", $key)] );
1196   $stream->rewind;
1197   while (my $dataref = $stream->read_until (undef, "\n\n"))
1198   {
1199     if ($$dataref =~ /^job /)
1200     {
1201       foreach (split ("\n", $$dataref))
1202       {
1203         my ($k, $v) = split ("=", $_, 2);
1204         $frozenjob->{$k} = freezeunquote ($v);
1205       }
1206       next;
1207     }
1208
1209     if ($$dataref =~ /^merge (\d+) (.*)/)
1210     {
1211       $jobstep_tomerge_level = $1;
1212       @jobstep_tomerge
1213           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1214       next;
1215     }
1216
1217     my $Jobstep = { };
1218     foreach (split ("\n", $$dataref))
1219     {
1220       my ($k, $v) = split ("=", $_, 2);
1221       $Jobstep->{$k} = freezeunquote ($v) if $k;
1222     }
1223     $Jobstep->{attempts} = 0;
1224     push @jobstep, $Jobstep;
1225
1226     if ($Jobstep->{exitcode} eq "0")
1227     {
1228       push @jobstep_done, $#jobstep;
1229     }
1230     else
1231     {
1232       push @jobstep_todo, $#jobstep;
1233     }
1234   }
1235
1236   foreach (qw (script script_version script_parameters))
1237   {
1238     $Job->{$_} = $frozenjob->{$_};
1239   }
1240   $Job->save if $job_has_uuid;
1241 }
1242
1243
1244 sub freezequote
1245 {
1246   my $s = shift;
1247   $s =~ s/\\/\\\\/g;
1248   $s =~ s/\n/\\n/g;
1249   return $s;
1250 }
1251
1252
1253 sub freezeunquote
1254 {
1255   my $s = shift;
1256   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1257   return $s;
1258 }
1259
1260
1261 sub srun
1262 {
1263   my $srunargs = shift;
1264   my $execargs = shift;
1265   my $opts = shift || {};
1266   my $stdin = shift;
1267   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1268   print STDERR (join (" ",
1269                       map { / / ? "'$_'" : $_ }
1270                       (@$args)),
1271                 "\n")
1272       if $ENV{CRUNCH_DEBUG};
1273
1274   if (defined $stdin) {
1275     my $child = open STDIN, "-|";
1276     defined $child or die "no fork: $!";
1277     if ($child == 0) {
1278       print $stdin or die $!;
1279       close STDOUT or die $!;
1280       exit 0;
1281     }
1282   }
1283
1284   return system (@$args) if $opts->{fork};
1285
1286   exec @$args;
1287   warn "ENV size is ".length(join(" ",%ENV));
1288   die "exec failed: $!: @$args";
1289 }
1290
1291
1292 sub ban_node_by_slot {
1293   # Don't start any new jobsteps on this node for 60 seconds
1294   my $slotid = shift;
1295   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1296   $slot[$slotid]->{node}->{hold_count}++;
1297   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1298 }
1299
1300 __DATA__
1301 #!/usr/bin/perl
1302
1303 # checkout-and-build
1304
1305 use Fcntl ':flock';
1306
1307 my $destdir = $ENV{"CRUNCH_SRC"};
1308 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1309 my $repo = $ENV{"CRUNCH_SRC_URL"};
1310
1311 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1312 flock L, LOCK_EX;
1313 if (readlink ("$destdir.commit") eq $commit) {
1314     exit 0;
1315 }
1316
1317 unlink "$destdir.commit";
1318 open STDOUT, ">", "$destdir.log";
1319 open STDERR, ">&STDOUT";
1320
1321 mkdir $destdir;
1322 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1323 print TARX <DATA>;
1324 if(!close(TARX)) {
1325   die "'tar -C $destdir -xf -' exited $?: $!";
1326 }
1327
1328 my $pwd;
1329 chomp ($pwd = `pwd`);
1330 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1331 mkdir $install_dir;
1332 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1333     # Old version
1334     shell_or_die ("./tests/autotests.sh", $install_dir);
1335 } elsif (-e "./install.sh") {
1336     shell_or_die ("./install.sh", $install_dir);
1337 }
1338
1339 if ($commit) {
1340     unlink "$destdir.commit.new";
1341     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1342     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1343 }
1344
1345 close L;
1346
1347 exit 0;
1348
1349 sub shell_or_die
1350 {
1351   if ($ENV{"DEBUG"}) {
1352     print STDERR "@_\n";
1353   }
1354   system (@_) == 0
1355       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1356 }
1357
1358 __DATA__