update running and finished_at attributes at completion
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100 my $local_job = !$job_has_uuid;
101
102
103 $SIG{'HUP'} = sub
104 {
105   1;
106 };
107 $SIG{'USR1'} = sub
108 {
109   $main::ENV{CRUNCH_DEBUG} = 1;
110 };
111 $SIG{'USR2'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 0;
114 };
115
116
117
118 my $arv = Arvados->new;
119 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
120 $metastream->clear;
121 $metastream->write_start('log.txt');
122
123 my $User = $arv->{'users'}->{'current'}->execute;
124
125 my $Job = {};
126 my $job_id;
127 my $dbh;
128 my $sth;
129 if ($job_has_uuid)
130 {
131   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
132   if (!$force_unlock) {
133     if ($Job->{'is_locked_by'}) {
134       croak("Job is locked: " . $Job->{'is_locked_by'});
135     }
136     if ($Job->{'success'} ne undef) {
137       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138     }
139     if ($Job->{'running'}) {
140       croak("Job 'running' flag is already set");
141     }
142     if ($Job->{'started_at'}) {
143       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
144     }
145   }
146 }
147 else
148 {
149   $Job = JSON::decode_json($jobspec);
150
151   if (!$resume_stash)
152   {
153     map { croak ("No $_ specified") unless $Job->{$_} }
154     qw(script script_version script_parameters);
155   }
156
157   $Job->{'is_locked_by'} = $User->{'uuid'};
158   $Job->{'started_at'} = gmtime;
159
160   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
161
162   $job_has_uuid = 1;
163 }
164 $job_id = $Job->{'uuid'};
165
166
167
168 $Job->{'resource_limits'} ||= {};
169 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
170 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
171
172
173 Log (undef, "check slurm allocation");
174 my @slot;
175 my @node;
176 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
177 my @sinfo;
178 if (!$have_slurm)
179 {
180   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
181   push @sinfo, "$localcpus localhost";
182 }
183 if (exists $ENV{SLURM_NODELIST})
184 {
185   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
186 }
187 foreach (@sinfo)
188 {
189   my ($ncpus, $slurm_nodelist) = split;
190   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
191
192   my @nodelist;
193   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
194   {
195     my $nodelist = $1;
196     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
197     {
198       my $ranges = $1;
199       foreach (split (",", $ranges))
200       {
201         my ($a, $b);
202         if (/(\d+)-(\d+)/)
203         {
204           $a = $1;
205           $b = $2;
206         }
207         else
208         {
209           $a = $_;
210           $b = $_;
211         }
212         push @nodelist, map {
213           my $n = $nodelist;
214           $n =~ s/\[[-,\d]+\]/$_/;
215           $n;
216         } ($a..$b);
217       }
218     }
219     else
220     {
221       push @nodelist, $nodelist;
222     }
223   }
224   foreach my $nodename (@nodelist)
225   {
226     Log (undef, "node $nodename - $ncpus slots");
227     my $node = { name => $nodename,
228                  ncpus => $ncpus,
229                  losing_streak => 0,
230                  hold_until => 0 };
231     foreach my $cpu (1..$ncpus)
232     {
233       push @slot, { node => $node,
234                     cpu => $cpu };
235     }
236   }
237   push @node, @nodelist;
238 }
239
240
241
242 # Ensure that we get one jobstep running on each allocated node before
243 # we start overloading nodes with concurrent steps
244
245 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
246
247
248
249 my $jobmanager_id;
250 if ($job_has_uuid)
251 {
252   # Claim this job, and make sure nobody else does
253
254   $Job->{'is_locked_by'} = $User->{'uuid'};
255   $Job->{'started_at'} = gmtime;
256   $Job->{'running'} = 1;
257   $Job->{'success'} = undef;
258   $Job->{'tasks_summary'} = { 'failed' => 0,
259                               'todo' => 1,
260                               'running' => 0,
261                               'done' => 0 };
262   if ($job_has_uuid) {
263     unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
264       croak("Error while updating / locking job");
265     }
266   }
267 }
268
269
270 Log (undef, "start");
271 $SIG{'INT'} = sub { $main::please_freeze = 1; };
272 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
273 $SIG{'TERM'} = \&croak;
274 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
275 $SIG{'ALRM'} = sub { $main::please_info = 1; };
276 $SIG{'CONT'} = sub { $main::please_continue = 1; };
277 $main::please_freeze = 0;
278 $main::please_info = 0;
279 $main::please_continue = 0;
280 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
281
282 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
283 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
284 $ENV{"JOB_UUID"} = $job_id;
285
286
287 my @jobstep;
288 my @jobstep_todo = ();
289 my @jobstep_done = ();
290 my @jobstep_tomerge = ();
291 my $jobstep_tomerge_level = 0;
292 my $squeue_checked;
293 my $squeue_kill_checked;
294 my $output_in_keep = 0;
295
296
297
298 if (defined $Job->{thawedfromkey})
299 {
300   thaw ($Job->{thawedfromkey});
301 }
302 else
303 {
304   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
305     'job_uuid' => $Job->{'uuid'},
306     'sequence' => 0,
307     'qsequence' => 0,
308     'parameters' => {},
309                                                           });
310   push @jobstep, { 'level' => 0,
311                    'attempts' => 0,
312                    'arvados_task' => $first_task,
313                  };
314   push @jobstep_todo, 0;
315 }
316
317
318 my $build_script;
319
320
321 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
322
323 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
324 if ($skip_install)
325 {
326   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
327 }
328 else
329 {
330   do {
331     local $/ = undef;
332     $build_script = <DATA>;
333   };
334   Log (undef, "Install revision ".$Job->{script_version});
335   my $nodelist = join(",", @node);
336
337   # Clean out crunch_tmp/work and crunch_tmp/opt
338
339   my $cleanpid = fork();
340   if ($cleanpid == 0)
341   {
342     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
343           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
344     exit (1);
345   }
346   while (1)
347   {
348     last if $cleanpid == waitpid (-1, WNOHANG);
349     freeze_if_want_freeze ($cleanpid);
350     select (undef, undef, undef, 0.1);
351   }
352   Log (undef, "Clean-work-dir exited $?");
353
354   # Install requested code version
355
356   my @execargs;
357   my @srunargs = ("srun",
358                   "--nodelist=$nodelist",
359                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
360
361   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
363   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
364
365   my $commit;
366   my $git_archive;
367   my $treeish = $Job->{'script_version'};
368   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
369   # Todo: let script_version specify repository instead of expecting
370   # parent process to figure it out.
371   $ENV{"CRUNCH_SRC_URL"} = $repo;
372
373   # Create/update our clone of the remote git repo
374
375   if (!-d $ENV{"CRUNCH_SRC"}) {
376     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
377         or croak ("git clone $repo failed: exit ".($?>>8));
378     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
379   }
380   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
381
382   # If this looks like a subversion r#, look for it in git-svn commit messages
383
384   if ($treeish =~ m{^\d{1,4}$}) {
385     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
386     chomp $gitlog;
387     if ($gitlog =~ /^[a-f0-9]{40}$/) {
388       $commit = $gitlog;
389       Log (undef, "Using commit $commit for script_version $treeish");
390     }
391   }
392
393   # If that didn't work, try asking git to look it up as a tree-ish.
394
395   if (!defined $commit) {
396
397     my $cooked_treeish = $treeish;
398     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
399       # Looks like a git branch name -- make sure git knows it's
400       # relative to the remote repo
401       $cooked_treeish = "origin/$treeish";
402     }
403
404     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
405     chomp $found;
406     if ($found =~ /^[0-9a-f]{40}$/s) {
407       $commit = $found;
408       if ($commit ne $treeish) {
409         # Make sure we record the real commit id in the database,
410         # frozentokey, logs, etc. -- instead of an abbreviation or a
411         # branch name which can become ambiguous or point to a
412         # different commit in the future.
413         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
414         Log (undef, "Using commit $commit for tree-ish $treeish");
415         if ($commit ne $treeish) {
416           $Job->{'script_version'} = $commit;
417           !$job_has_uuid or $Job->save() or croak("Error while updating job");
418         }
419       }
420     }
421   }
422
423   if (defined $commit) {
424     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
425     @execargs = ("sh", "-c",
426                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
427     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
428   }
429   else {
430     croak ("could not figure out commit id for $treeish");
431   }
432
433   my $installpid = fork();
434   if ($installpid == 0)
435   {
436     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
437     exit (1);
438   }
439   while (1)
440   {
441     last if $installpid == waitpid (-1, WNOHANG);
442     freeze_if_want_freeze ($installpid);
443     select (undef, undef, undef, 0.1);
444   }
445   Log (undef, "Install exited $?");
446 }
447
448
449
450 foreach (qw (script script_version script_parameters resource_limits))
451 {
452   Log (undef,
453        "$_ " .
454        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
455 }
456 foreach (split (/\n/, $Job->{knobs}))
457 {
458   Log (undef, "knob " . $_);
459 }
460
461
462
463 my $success;
464
465
466
467 ONELEVEL:
468
469 my $thisround_succeeded = 0;
470 my $thisround_failed = 0;
471 my $thisround_failed_multiple = 0;
472
473 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
474                        or $a <=> $b } @jobstep_todo;
475 my $level = $jobstep[$jobstep_todo[0]]->{level};
476 Log (undef, "start level $level");
477
478
479
480 my %proc;
481 my @freeslot = (0..$#slot);
482 my @holdslot;
483 my %reader;
484 my $progress_is_dirty = 1;
485 my $progress_stats_updated = 0;
486
487 update_progress_stats();
488
489
490
491 THISROUND:
492 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
493 {
494   $main::please_continue = 0;
495
496   my $id = $jobstep_todo[$todo_ptr];
497   my $Jobstep = $jobstep[$id];
498   if ($Jobstep->{level} != $level)
499   {
500     next;
501   }
502   if ($Jobstep->{attempts} > 9)
503   {
504     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
505     $success = 0;
506     last THISROUND;
507   }
508
509   pipe $reader{$id}, "writer" or croak ($!);
510   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
511   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
512
513   my $childslot = $freeslot[0];
514   my $childnode = $slot[$childslot]->{node};
515   my $childslotname = join (".",
516                             $slot[$childslot]->{node}->{name},
517                             $slot[$childslot]->{cpu});
518   my $childpid = fork();
519   if ($childpid == 0)
520   {
521     $SIG{'INT'} = 'DEFAULT';
522     $SIG{'QUIT'} = 'DEFAULT';
523     $SIG{'TERM'} = 'DEFAULT';
524
525     foreach (values (%reader))
526     {
527       close($_);
528     }
529     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
530     open(STDOUT,">&writer");
531     open(STDERR,">&writer");
532
533     undef $dbh;
534     undef $sth;
535
536     delete $ENV{"GNUPGHOME"};
537     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
538     $ENV{"TASK_QSEQUENCE"} = $id;
539     $ENV{"TASK_SEQUENCE"} = $level;
540     $ENV{"JOB_SCRIPT"} = $Job->{script};
541     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
542       $param =~ tr/a-z/A-Z/;
543       $ENV{"JOB_PARAMETER_$param"} = $value;
544     }
545     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
546     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
547     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
548     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
549
550     $ENV{"GZIP"} = "-n";
551
552     my @srunargs = (
553       "srun",
554       "--nodelist=".$childnode->{name},
555       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556       "--job-name=$job_id.$id.$$",
557         );
558     my @execargs = qw(sh);
559     my $build_script_to_send = "";
560     my $command =
561         "mkdir -p $ENV{CRUNCH_WORK} $ENV{CRUNCH_TMP} "
562         ."&& cd $ENV{CRUNCH_TMP} ";
563     if ($build_script)
564     {
565       $build_script_to_send = $build_script;
566       $command .=
567           "&& perl -";
568     }
569     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
570     $command .=
571         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
572     my @execargs = ('bash', '-c', $command);
573     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
574     exit (1);
575   }
576   close("writer");
577   if (!defined $childpid)
578   {
579     close $reader{$id};
580     delete $reader{$id};
581     next;
582   }
583   shift @freeslot;
584   $proc{$childpid} = { jobstep => $id,
585                        time => time,
586                        slot => $childslot,
587                        jobstepname => "$job_id.$id.$childpid",
588                      };
589   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
590   $slot[$childslot]->{pid} = $childpid;
591
592   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
593   Log ($id, "child $childpid started on $childslotname");
594   $Jobstep->{attempts} ++;
595   $Jobstep->{starttime} = time;
596   $Jobstep->{node} = $childnode->{name};
597   $Jobstep->{slotindex} = $childslot;
598   delete $Jobstep->{stderr};
599   delete $Jobstep->{finishtime};
600
601   splice @jobstep_todo, $todo_ptr, 1;
602   --$todo_ptr;
603
604   $progress_is_dirty = 1;
605
606   while (!@freeslot
607          ||
608          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
609   {
610     last THISROUND if $main::please_freeze;
611     if ($main::please_info)
612     {
613       $main::please_info = 0;
614       freeze();
615       collate_output();
616       save_meta(1);
617       update_progress_stats();
618     }
619     my $gotsome
620         = readfrompipes ()
621         + reapchildren ();
622     if (!$gotsome)
623     {
624       check_squeue();
625       update_progress_stats();
626       select (undef, undef, undef, 0.1);
627     }
628     elsif (time - $progress_stats_updated >= 30)
629     {
630       update_progress_stats();
631     }
632     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
633         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
634     {
635       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
636           .($thisround_failed+$thisround_succeeded)
637           .") -- giving up on this round";
638       Log (undef, $message);
639       last THISROUND;
640     }
641
642     # move slots from freeslot to holdslot (or back to freeslot) if necessary
643     for (my $i=$#freeslot; $i>=0; $i--) {
644       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
645         push @holdslot, (splice @freeslot, $i, 1);
646       }
647     }
648     for (my $i=$#holdslot; $i>=0; $i--) {
649       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
650         push @freeslot, (splice @holdslot, $i, 1);
651       }
652     }
653
654     # give up if no nodes are succeeding
655     if (!grep { $_->{node}->{losing_streak} == 0 &&
656                     $_->{node}->{hold_count} < 4 } @slot) {
657       my $message = "Every node has failed -- giving up on this round";
658       Log (undef, $message);
659       last THISROUND;
660     }
661   }
662 }
663
664
665 push @freeslot, splice @holdslot;
666 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
667
668
669 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
670 while (%proc)
671 {
672   goto THISROUND if $main::please_continue;
673   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
674   readfrompipes ();
675   if (!reapchildren())
676   {
677     check_squeue();
678     update_progress_stats();
679     select (undef, undef, undef, 0.1);
680     killem (keys %proc) if $main::please_freeze;
681   }
682 }
683
684 update_progress_stats();
685 freeze_if_want_freeze();
686
687
688 if (!defined $success)
689 {
690   if (@jobstep_todo &&
691       $thisround_succeeded == 0 &&
692       ($thisround_failed == 0 || $thisround_failed > 4))
693   {
694     my $message = "stop because $thisround_failed tasks failed and none succeeded";
695     Log (undef, $message);
696     $success = 0;
697   }
698   if (!@jobstep_todo)
699   {
700     $success = 1;
701   }
702 }
703
704 goto ONELEVEL if !defined $success;
705
706
707 release_allocation();
708 freeze();
709 $Job->reload;
710 $Job->{'output'} = &collate_output();
711 $Job->{'running'} = 0;
712 $Job->{'success'} = $Job->{'output'} && $success;
713 $Job->{'finished_at'} = gmtime;
714 $Job->save if $job_has_uuid;
715
716 if ($Job->{'output'})
717 {
718   eval {
719     my $manifest_text = capturex("whget", $Job->{'output'});
720     $arv->{'collections'}->{'create'}->execute('collection' => {
721       'uuid' => $Job->{'output'},
722       'manifest_text' => $manifest_text,
723     });
724   };
725   if ($@) {
726     Log (undef, "Failed to register output manifest: $@");
727   }
728 }
729
730 Log (undef, "finish");
731
732 save_meta();
733 exit 0;
734
735
736
737 sub update_progress_stats
738 {
739   $progress_stats_updated = time;
740   return if !$progress_is_dirty;
741   my ($todo, $done, $running) = (scalar @jobstep_todo,
742                                  scalar @jobstep_done,
743                                  scalar @slot - scalar @freeslot - scalar @holdslot);
744   $Job->{'tasks_summary'} ||= {};
745   $Job->{'tasks_summary'}->{'todo'} = $todo;
746   $Job->{'tasks_summary'}->{'done'} = $done;
747   $Job->{'tasks_summary'}->{'running'} = $running;
748   $Job->save if $job_has_uuid;
749   Log (undef, "status: $done done, $running running, $todo todo");
750   $progress_is_dirty = 0;
751 }
752
753
754
755 sub reapchildren
756 {
757   my $pid = waitpid (-1, WNOHANG);
758   return 0 if $pid <= 0;
759
760   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
761                   . "."
762                   . $slot[$proc{$pid}->{slot}]->{cpu});
763   my $jobstepid = $proc{$pid}->{jobstep};
764   my $elapsed = time - $proc{$pid}->{time};
765   my $Jobstep = $jobstep[$jobstepid];
766
767   my $exitcode = $?;
768   my $exitinfo = "exit $exitcode";
769   $Jobstep->{'arvados_task'}->reload;
770   my $success = $Jobstep->{'arvados_task'}->{success};
771
772   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
773
774   if (!defined $success) {
775     # task did not indicate one way or the other --> fail
776     $Jobstep->{'arvados_task'}->{success} = 0;
777     $Jobstep->{'arvados_task'}->save;
778     $success = 0;
779   }
780
781   if (!$success)
782   {
783     my $no_incr_attempts;
784     $no_incr_attempts = 1 if $Jobstep->{node_fail};
785
786     ++$thisround_failed;
787     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
788
789     # Check for signs of a failed or misconfigured node
790     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
791         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
792       # Don't count this against jobstep failure thresholds if this
793       # node is already suspected faulty and srun exited quickly
794       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
795           $elapsed < 5 &&
796           $Jobstep->{attempts} > 1) {
797         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
798         $no_incr_attempts = 1;
799         --$Jobstep->{attempts};
800       }
801       ban_node_by_slot($proc{$pid}->{slot});
802     }
803
804     push @jobstep_todo, $jobstepid;
805     Log ($jobstepid, "failure in $elapsed seconds");
806
807     --$Jobstep->{attempts} if $no_incr_attempts;
808     $Job->{'tasks_summary'}->{'failed'}++;
809   }
810   else
811   {
812     ++$thisround_succeeded;
813     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
814     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
815     push @jobstep_done, $jobstepid;
816     Log ($jobstepid, "success in $elapsed seconds");
817   }
818   $Jobstep->{exitcode} = $exitcode;
819   $Jobstep->{finishtime} = time;
820   process_stderr ($jobstepid, $success);
821   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
822
823   close $reader{$jobstepid};
824   delete $reader{$jobstepid};
825   delete $slot[$proc{$pid}->{slot}]->{pid};
826   push @freeslot, $proc{$pid}->{slot};
827   delete $proc{$pid};
828
829   # Load new tasks
830   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
831     'where' => {
832       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
833     },
834     'order' => 'qsequence'
835   );
836   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
837     my $jobstep = {
838       'level' => $arvados_task->{'sequence'},
839       'attempts' => 0,
840       'arvados_task' => $arvados_task
841     };
842     push @jobstep, $jobstep;
843     push @jobstep_todo, $#jobstep;
844   }
845
846   $progress_is_dirty = 1;
847   1;
848 }
849
850
851 sub check_squeue
852 {
853   # return if the kill list was checked <4 seconds ago
854   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
855   {
856     return;
857   }
858   $squeue_kill_checked = time;
859
860   # use killem() on procs whose killtime is reached
861   for (keys %proc)
862   {
863     if (exists $proc{$_}->{killtime}
864         && $proc{$_}->{killtime} <= time)
865     {
866       killem ($_);
867     }
868   }
869
870   # return if the squeue was checked <60 seconds ago
871   if (defined $squeue_checked && $squeue_checked > time - 60)
872   {
873     return;
874   }
875   $squeue_checked = time;
876
877   if (!$have_slurm)
878   {
879     # here is an opportunity to check for mysterious problems with local procs
880     return;
881   }
882
883   # get a list of steps still running
884   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
885   chop @squeue;
886   if ($squeue[-1] ne "ok")
887   {
888     return;
889   }
890   pop @squeue;
891
892   # which of my jobsteps are running, according to squeue?
893   my %ok;
894   foreach (@squeue)
895   {
896     if (/^(\d+)\.(\d+) (\S+)/)
897     {
898       if ($1 eq $ENV{SLURM_JOBID})
899       {
900         $ok{$3} = 1;
901       }
902     }
903   }
904
905   # which of my active child procs (>60s old) were not mentioned by squeue?
906   foreach (keys %proc)
907   {
908     if ($proc{$_}->{time} < time - 60
909         && !exists $ok{$proc{$_}->{jobstepname}}
910         && !exists $proc{$_}->{killtime})
911     {
912       # kill this proc if it hasn't exited in 30 seconds
913       $proc{$_}->{killtime} = time + 30;
914     }
915   }
916 }
917
918
919 sub release_allocation
920 {
921   if ($have_slurm)
922   {
923     Log (undef, "release job allocation");
924     system "scancel $ENV{SLURM_JOBID}";
925   }
926 }
927
928
929 sub readfrompipes
930 {
931   my $gotsome = 0;
932   foreach my $job (keys %reader)
933   {
934     my $buf;
935     while (0 < sysread ($reader{$job}, $buf, 8192))
936     {
937       print STDERR $buf if $ENV{CRUNCH_DEBUG};
938       $jobstep[$job]->{stderr} .= $buf;
939       preprocess_stderr ($job);
940       if (length ($jobstep[$job]->{stderr}) > 16384)
941       {
942         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
943       }
944       $gotsome = 1;
945     }
946   }
947   return $gotsome;
948 }
949
950
951 sub preprocess_stderr
952 {
953   my $job = shift;
954
955   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
956     my $line = $1;
957     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
958     Log ($job, "stderr $line");
959     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
960       # whoa.
961       $main::please_freeze = 1;
962     }
963     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
964       $jobstep[$job]->{node_fail} = 1;
965       ban_node_by_slot($jobstep[$job]->{slotindex});
966     }
967   }
968 }
969
970
971 sub process_stderr
972 {
973   my $job = shift;
974   my $success = shift;
975   preprocess_stderr ($job);
976
977   map {
978     Log ($job, "stderr $_");
979   } split ("\n", $jobstep[$job]->{stderr});
980 }
981
982
983 sub collate_output
984 {
985   my $whc = Warehouse->new;
986   Log (undef, "collate");
987   $whc->write_start (1);
988   my $joboutput;
989   for (@jobstep)
990   {
991     next if (!exists $_->{'arvados_task'}->{output} ||
992              !$_->{'arvados_task'}->{'success'} ||
993              $_->{'exitcode'} != 0);
994     my $output = $_->{'arvados_task'}->{output};
995     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
996     {
997       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
998       $whc->write_data ($output);
999     }
1000     elsif (@jobstep == 1)
1001     {
1002       $joboutput = $output;
1003       $whc->write_finish;
1004     }
1005     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1006     {
1007       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1008       $whc->write_data ($outblock);
1009     }
1010     else
1011     {
1012       my $errstr = $whc->errstr;
1013       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1014       $success = 0;
1015     }
1016   }
1017   $joboutput = $whc->write_finish if !defined $joboutput;
1018   if ($joboutput)
1019   {
1020     Log (undef, "output $joboutput");
1021     $Job->{'output'} = $joboutput;
1022     $Job->save if $job_has_uuid;
1023   }
1024   else
1025   {
1026     Log (undef, "output undef");
1027   }
1028   return $joboutput;
1029 }
1030
1031
1032 sub killem
1033 {
1034   foreach (@_)
1035   {
1036     my $sig = 2;                # SIGINT first
1037     if (exists $proc{$_}->{"sent_$sig"} &&
1038         time - $proc{$_}->{"sent_$sig"} > 4)
1039     {
1040       $sig = 15;                # SIGTERM if SIGINT doesn't work
1041     }
1042     if (exists $proc{$_}->{"sent_$sig"} &&
1043         time - $proc{$_}->{"sent_$sig"} > 4)
1044     {
1045       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1046     }
1047     if (!exists $proc{$_}->{"sent_$sig"})
1048     {
1049       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1050       kill $sig, $_;
1051       select (undef, undef, undef, 0.1);
1052       if ($sig == 2)
1053       {
1054         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1055       }
1056       $proc{$_}->{"sent_$sig"} = time;
1057       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1058     }
1059   }
1060 }
1061
1062
1063 sub fhbits
1064 {
1065   my($bits);
1066   for (@_) {
1067     vec($bits,fileno($_),1) = 1;
1068   }
1069   $bits;
1070 }
1071
1072
1073 sub Log                         # ($jobstep_id, $logmessage)
1074 {
1075   if ($_[1] =~ /\n/) {
1076     for my $line (split (/\n/, $_[1])) {
1077       Log ($_[0], $line);
1078     }
1079     return;
1080   }
1081   my $fh = select STDERR; $|=1; select $fh;
1082   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1083   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1084   $message .= "\n";
1085   my $datetime;
1086   if ($metastream || -t STDERR) {
1087     my @gmtime = gmtime;
1088     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1089                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1090   }
1091   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1092
1093   return if !$metastream;
1094   $metastream->write_data ($datetime . " " . $message);
1095 }
1096
1097
1098 sub croak
1099 {
1100   my ($package, $file, $line) = caller;
1101   my $message = "@_ at $file line $line\n";
1102   Log (undef, $message);
1103   freeze() if @jobstep_todo;
1104   collate_output() if @jobstep_todo;
1105   cleanup();
1106   save_meta() if $metastream;
1107   die;
1108 }
1109
1110
1111 sub cleanup
1112 {
1113   return if !$job_has_uuid;
1114   $Job->reload;
1115   $Job->{'running'} = 0;
1116   $Job->{'success'} = 0;
1117   $Job->{'finished_at'} = gmtime;
1118   $Job->save;
1119 }
1120
1121
1122 sub save_meta
1123 {
1124   my $justcheckpoint = shift; # false if this will be the last meta saved
1125   my $m = $metastream;
1126   $m = $m->copy if $justcheckpoint;
1127   $m->write_finish;
1128   my $loglocator = $m->as_key;
1129   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1130   Log (undef, "meta key is $loglocator");
1131   $Job->{'log'} = $loglocator;
1132   $Job->save if $job_has_uuid;
1133 }
1134
1135
1136 sub freeze_if_want_freeze
1137 {
1138   if ($main::please_freeze)
1139   {
1140     release_allocation();
1141     if (@_)
1142     {
1143       # kill some srun procs before freeze+stop
1144       map { $proc{$_} = {} } @_;
1145       while (%proc)
1146       {
1147         killem (keys %proc);
1148         select (undef, undef, undef, 0.1);
1149         my $died;
1150         while (($died = waitpid (-1, WNOHANG)) > 0)
1151         {
1152           delete $proc{$died};
1153         }
1154       }
1155     }
1156     freeze();
1157     collate_output();
1158     cleanup();
1159     save_meta();
1160     exit 0;
1161   }
1162 }
1163
1164
1165 sub freeze
1166 {
1167   Log (undef, "Freeze not implemented");
1168   return;
1169 }
1170
1171
1172 sub thaw
1173 {
1174   croak ("Thaw not implemented");
1175
1176   my $whc;
1177   my $key = shift;
1178   Log (undef, "thaw from $key");
1179
1180   @jobstep = ();
1181   @jobstep_done = ();
1182   @jobstep_todo = ();
1183   @jobstep_tomerge = ();
1184   $jobstep_tomerge_level = 0;
1185   my $frozenjob = {};
1186
1187   my $stream = new Warehouse::Stream ( whc => $whc,
1188                                        hash => [split (",", $key)] );
1189   $stream->rewind;
1190   while (my $dataref = $stream->read_until (undef, "\n\n"))
1191   {
1192     if ($$dataref =~ /^job /)
1193     {
1194       foreach (split ("\n", $$dataref))
1195       {
1196         my ($k, $v) = split ("=", $_, 2);
1197         $frozenjob->{$k} = freezeunquote ($v);
1198       }
1199       next;
1200     }
1201
1202     if ($$dataref =~ /^merge (\d+) (.*)/)
1203     {
1204       $jobstep_tomerge_level = $1;
1205       @jobstep_tomerge
1206           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1207       next;
1208     }
1209
1210     my $Jobstep = { };
1211     foreach (split ("\n", $$dataref))
1212     {
1213       my ($k, $v) = split ("=", $_, 2);
1214       $Jobstep->{$k} = freezeunquote ($v) if $k;
1215     }
1216     $Jobstep->{attempts} = 0;
1217     push @jobstep, $Jobstep;
1218
1219     if ($Jobstep->{exitcode} eq "0")
1220     {
1221       push @jobstep_done, $#jobstep;
1222     }
1223     else
1224     {
1225       push @jobstep_todo, $#jobstep;
1226     }
1227   }
1228
1229   foreach (qw (script script_version script_parameters))
1230   {
1231     $Job->{$_} = $frozenjob->{$_};
1232   }
1233   $Job->save if $job_has_uuid;
1234 }
1235
1236
1237 sub freezequote
1238 {
1239   my $s = shift;
1240   $s =~ s/\\/\\\\/g;
1241   $s =~ s/\n/\\n/g;
1242   return $s;
1243 }
1244
1245
1246 sub freezeunquote
1247 {
1248   my $s = shift;
1249   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1250   return $s;
1251 }
1252
1253
1254 sub srun
1255 {
1256   my $srunargs = shift;
1257   my $execargs = shift;
1258   my $opts = shift || {};
1259   my $stdin = shift;
1260   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1261   print STDERR (join (" ",
1262                       map { / / ? "'$_'" : $_ }
1263                       (@$args)),
1264                 "\n")
1265       if $ENV{CRUNCH_DEBUG};
1266
1267   if (defined $stdin) {
1268     my $child = open STDIN, "-|";
1269     defined $child or die "no fork: $!";
1270     if ($child == 0) {
1271       print $stdin or die $!;
1272       close STDOUT or die $!;
1273       exit 0;
1274     }
1275   }
1276
1277   return system (@$args) if $opts->{fork};
1278
1279   exec @$args;
1280   warn "ENV size is ".length(join(" ",%ENV));
1281   die "exec failed: $!: @$args";
1282 }
1283
1284
1285 sub ban_node_by_slot {
1286   # Don't start any new jobsteps on this node for 60 seconds
1287   my $slotid = shift;
1288   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1289   $slot[$slotid]->{node}->{hold_count}++;
1290   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1291 }
1292
1293 __DATA__
1294 #!/usr/bin/perl
1295
1296 # checkout-and-build
1297
1298 use Fcntl ':flock';
1299
1300 my $destdir = $ENV{"CRUNCH_SRC"};
1301 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1302 my $repo = $ENV{"CRUNCH_SRC_URL"};
1303
1304 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1305 flock L, LOCK_EX;
1306 if (readlink ("$destdir.commit") eq $commit) {
1307     exit 0;
1308 }
1309
1310 unlink "$destdir.commit";
1311 open STDOUT, ">", "$destdir.log";
1312 open STDERR, ">&STDOUT";
1313
1314 mkdir $destdir;
1315 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1316 print TARX <DATA>;
1317 if(!close(TARX)) {
1318   die "'tar -C $destdir -xf -' exited $?: $!";
1319 }
1320
1321 my $pwd;
1322 chomp ($pwd = `pwd`);
1323 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1324 mkdir $install_dir;
1325 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1326     # Old version
1327     shell_or_die ("./tests/autotests.sh", $install_dir);
1328 } elsif (-e "./install.sh") {
1329     shell_or_die ("./install.sh", $install_dir);
1330 }
1331
1332 if ($commit) {
1333     unlink "$destdir.commit.new";
1334     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1335     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1336 }
1337
1338 close L;
1339
1340 exit 0;
1341
1342 sub shell_or_die
1343 {
1344   if ($ENV{"DEBUG"}) {
1345     print STDERR "@_\n";
1346   }
1347   system (@_) == 0
1348       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1349 }
1350
1351 __DATA__