710fc0b0c8caa404eaa19dc865270f6ef6bd3e74
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100 my $local_job = !$job_has_uuid;
101
102
103 $SIG{'HUP'} = sub
104 {
105   1;
106 };
107 $SIG{'USR1'} = sub
108 {
109   $main::ENV{CRUNCH_DEBUG} = 1;
110 };
111 $SIG{'USR2'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 0;
114 };
115
116
117
118 my $arv = Arvados->new;
119 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
120 $metastream->clear;
121 $metastream->write_start('log.txt');
122
123 my $User = $arv->{'users'}->{'current'}->execute;
124
125 my $Job = {};
126 my $job_id;
127 my $dbh;
128 my $sth;
129 if ($job_has_uuid)
130 {
131   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
132   if (!$force_unlock) {
133     if ($Job->{'is_locked_by'}) {
134       croak("Job is locked: " . $Job->{'is_locked_by'});
135     }
136     if ($Job->{'success'} ne undef) {
137       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
138     }
139     if ($Job->{'running'}) {
140       croak("Job 'running' flag is already set");
141     }
142     if ($Job->{'started_at'}) {
143       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
144     }
145   }
146 }
147 else
148 {
149   $Job = JSON::decode_json($jobspec);
150
151   if (!$resume_stash)
152   {
153     map { croak ("No $_ specified") unless $Job->{$_} }
154     qw(script script_version script_parameters);
155   }
156
157   $Job->{'is_locked_by'} = $User->{'uuid'};
158   $Job->{'started_at'} = gmtime;
159
160   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
161
162   $job_has_uuid = 1;
163 }
164 $job_id = $Job->{'uuid'};
165
166
167
168 $Job->{'resource_limits'} ||= {};
169 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
170 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
171
172
173 Log (undef, "check slurm allocation");
174 my @slot;
175 my @node;
176 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
177 my @sinfo;
178 if (!$have_slurm)
179 {
180   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
181   push @sinfo, "$localcpus localhost";
182 }
183 if (exists $ENV{SLURM_NODELIST})
184 {
185   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
186 }
187 foreach (@sinfo)
188 {
189   my ($ncpus, $slurm_nodelist) = split;
190   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
191
192   my @nodelist;
193   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
194   {
195     my $nodelist = $1;
196     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
197     {
198       my $ranges = $1;
199       foreach (split (",", $ranges))
200       {
201         my ($a, $b);
202         if (/(\d+)-(\d+)/)
203         {
204           $a = $1;
205           $b = $2;
206         }
207         else
208         {
209           $a = $_;
210           $b = $_;
211         }
212         push @nodelist, map {
213           my $n = $nodelist;
214           $n =~ s/\[[-,\d]+\]/$_/;
215           $n;
216         } ($a..$b);
217       }
218     }
219     else
220     {
221       push @nodelist, $nodelist;
222     }
223   }
224   foreach my $nodename (@nodelist)
225   {
226     Log (undef, "node $nodename - $ncpus slots");
227     my $node = { name => $nodename,
228                  ncpus => $ncpus,
229                  losing_streak => 0,
230                  hold_until => 0 };
231     foreach my $cpu (1..$ncpus)
232     {
233       push @slot, { node => $node,
234                     cpu => $cpu };
235     }
236   }
237   push @node, @nodelist;
238 }
239
240
241
242 # Ensure that we get one jobstep running on each allocated node before
243 # we start overloading nodes with concurrent steps
244
245 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
246
247
248
249 my $jobmanager_id;
250 if ($job_has_uuid)
251 {
252   # Claim this job, and make sure nobody else does
253
254   $Job->{'is_locked_by'} = $User->{'uuid'};
255   $Job->{'started_at'} = gmtime;
256   $Job->{'running'} = 1;
257   $Job->{'success'} = undef;
258   $Job->{'tasks_summary'} = { 'failed' => 0,
259                               'todo' => 1,
260                               'running' => 0,
261                               'done' => 0 };
262   if ($job_has_uuid) {
263     unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
264       croak("Error while updating / locking job");
265     }
266   }
267 }
268
269
270 Log (undef, "start");
271 $SIG{'INT'} = sub { $main::please_freeze = 1; };
272 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
273 $SIG{'TERM'} = \&croak;
274 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
275 $SIG{'ALRM'} = sub { $main::please_info = 1; };
276 $SIG{'CONT'} = sub { $main::please_continue = 1; };
277 $main::please_freeze = 0;
278 $main::please_info = 0;
279 $main::please_continue = 0;
280 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
281
282 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
283 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
284 $ENV{"JOB_UUID"} = $job_id;
285
286
287 my @jobstep;
288 my @jobstep_todo = ();
289 my @jobstep_done = ();
290 my @jobstep_tomerge = ();
291 my $jobstep_tomerge_level = 0;
292 my $squeue_checked;
293 my $squeue_kill_checked;
294 my $output_in_keep = 0;
295
296
297
298 if (defined $Job->{thawedfromkey})
299 {
300   thaw ($Job->{thawedfromkey});
301 }
302 else
303 {
304   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
305     'job_uuid' => $Job->{'uuid'},
306     'sequence' => 0,
307     'qsequence' => 0,
308     'parameters' => {},
309                                                           });
310   push @jobstep, { 'level' => 0,
311                    'attempts' => 0,
312                    'arvados_task' => $first_task,
313                  };
314   push @jobstep_todo, 0;
315 }
316
317
318 my $build_script;
319
320
321 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
322
323 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
324 if ($skip_install)
325 {
326   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
327 }
328 else
329 {
330   do {
331     local $/ = undef;
332     $build_script = <DATA>;
333   };
334   Log (undef, "Install revision ".$Job->{script_version});
335   my $nodelist = join(",", @node);
336
337   # Clean out crunch_tmp/work and crunch_tmp/opt
338
339   my $cleanpid = fork();
340   if ($cleanpid == 0)
341   {
342     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
343           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
344     exit (1);
345   }
346   while (1)
347   {
348     last if $cleanpid == waitpid (-1, WNOHANG);
349     freeze_if_want_freeze ($cleanpid);
350     select (undef, undef, undef, 0.1);
351   }
352   Log (undef, "Clean-work-dir exited $?");
353
354   # Install requested code version
355
356   my @execargs;
357   my @srunargs = ("srun",
358                   "--nodelist=$nodelist",
359                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
360
361   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
362   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
363   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
364
365   my $commit;
366   my $git_archive;
367   my $treeish = $Job->{'script_version'};
368   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
369   # Todo: let script_version specify repository instead of expecting
370   # parent process to figure it out.
371   $ENV{"CRUNCH_SRC_URL"} = $repo;
372
373   # Create/update our clone of the remote git repo
374
375   if (!-d $ENV{"CRUNCH_SRC"}) {
376     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
377         or croak ("git clone $repo failed: exit ".($?>>8));
378     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
379   }
380   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
381
382   # If this looks like a subversion r#, look for it in git-svn commit messages
383
384   if ($treeish =~ m{^\d{1,4}$}) {
385     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
386     chomp $gitlog;
387     if ($gitlog =~ /^[a-f0-9]{40}$/) {
388       $commit = $gitlog;
389       Log (undef, "Using commit $commit for script_version $treeish");
390     }
391   }
392
393   # If that didn't work, try asking git to look it up as a tree-ish.
394
395   if (!defined $commit) {
396
397     my $cooked_treeish = $treeish;
398     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
399       # Looks like a git branch name -- make sure git knows it's
400       # relative to the remote repo
401       $cooked_treeish = "origin/$treeish";
402     }
403
404     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
405     chomp $found;
406     if ($found =~ /^[0-9a-f]{40}$/s) {
407       $commit = $found;
408       if ($commit ne $treeish) {
409         # Make sure we record the real commit id in the database,
410         # frozentokey, logs, etc. -- instead of an abbreviation or a
411         # branch name which can become ambiguous or point to a
412         # different commit in the future.
413         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
414         Log (undef, "Using commit $commit for tree-ish $treeish");
415         if ($commit ne $treeish) {
416           $Job->{'script_version'} = $commit;
417           !$job_has_uuid or $Job->save() or croak("Error while updating job");
418         }
419       }
420     }
421   }
422
423   if (defined $commit) {
424     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
425     @execargs = ("sh", "-c",
426                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
427     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
428   }
429   else {
430     croak ("could not figure out commit id for $treeish");
431   }
432
433   my $installpid = fork();
434   if ($installpid == 0)
435   {
436     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
437     exit (1);
438   }
439   while (1)
440   {
441     last if $installpid == waitpid (-1, WNOHANG);
442     freeze_if_want_freeze ($installpid);
443     select (undef, undef, undef, 0.1);
444   }
445   Log (undef, "Install exited $?");
446 }
447
448
449
450 foreach (qw (script script_version script_parameters resource_limits))
451 {
452   Log (undef,
453        "$_ " .
454        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
455 }
456 foreach (split (/\n/, $Job->{knobs}))
457 {
458   Log (undef, "knob " . $_);
459 }
460
461
462
463 my $success;
464
465
466
467 ONELEVEL:
468
469 my $thisround_succeeded = 0;
470 my $thisround_failed = 0;
471 my $thisround_failed_multiple = 0;
472
473 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
474                        or $a <=> $b } @jobstep_todo;
475 my $level = $jobstep[$jobstep_todo[0]]->{level};
476 Log (undef, "start level $level");
477
478
479
480 my %proc;
481 my @freeslot = (0..$#slot);
482 my @holdslot;
483 my %reader;
484 my $progress_is_dirty = 1;
485 my $progress_stats_updated = 0;
486
487 update_progress_stats();
488
489
490
491 THISROUND:
492 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
493 {
494   $main::please_continue = 0;
495
496   my $id = $jobstep_todo[$todo_ptr];
497   my $Jobstep = $jobstep[$id];
498   if ($Jobstep->{level} != $level)
499   {
500     next;
501   }
502   if ($Jobstep->{attempts} > 9)
503   {
504     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
505     $success = 0;
506     last THISROUND;
507   }
508
509   pipe $reader{$id}, "writer" or croak ($!);
510   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
511   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
512
513   my $childslot = $freeslot[0];
514   my $childnode = $slot[$childslot]->{node};
515   my $childslotname = join (".",
516                             $slot[$childslot]->{node}->{name},
517                             $slot[$childslot]->{cpu});
518   my $childpid = fork();
519   if ($childpid == 0)
520   {
521     $SIG{'INT'} = 'DEFAULT';
522     $SIG{'QUIT'} = 'DEFAULT';
523     $SIG{'TERM'} = 'DEFAULT';
524
525     foreach (values (%reader))
526     {
527       close($_);
528     }
529     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
530     open(STDOUT,">&writer");
531     open(STDERR,">&writer");
532
533     undef $dbh;
534     undef $sth;
535
536     delete $ENV{"GNUPGHOME"};
537     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
538     $ENV{"TASK_QSEQUENCE"} = $id;
539     $ENV{"TASK_SEQUENCE"} = $level;
540     $ENV{"JOB_SCRIPT"} = $Job->{script};
541     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
542       $param =~ tr/a-z/A-Z/;
543       $ENV{"JOB_PARAMETER_$param"} = $value;
544     }
545     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
546     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
547     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
548     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
549
550     $ENV{"GZIP"} = "-n";
551
552     my @srunargs = (
553       "srun",
554       "--nodelist=".$childnode->{name},
555       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556       "--job-name=$job_id.$id.$$",
557         );
558     my @execargs = qw(sh);
559     my $build_script_to_send = "";
560     my $command =
561         "mkdir -p $ENV{CRUNCH_WORK} $ENV{CRUNCH_TMP} "
562         ."&& cd $ENV{CRUNCH_TMP} ";
563     if ($build_script)
564     {
565       $build_script_to_send = $build_script;
566       $command .=
567           "&& perl -";
568     }
569     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
570     $command .=
571         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
572     my @execargs = ('bash', '-c', $command);
573     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
574     exit (1);
575   }
576   close("writer");
577   if (!defined $childpid)
578   {
579     close $reader{$id};
580     delete $reader{$id};
581     next;
582   }
583   shift @freeslot;
584   $proc{$childpid} = { jobstep => $id,
585                        time => time,
586                        slot => $childslot,
587                        jobstepname => "$job_id.$id.$childpid",
588                      };
589   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
590   $slot[$childslot]->{pid} = $childpid;
591
592   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
593   Log ($id, "child $childpid started on $childslotname");
594   $Jobstep->{attempts} ++;
595   $Jobstep->{starttime} = time;
596   $Jobstep->{node} = $childnode->{name};
597   $Jobstep->{slotindex} = $childslot;
598   delete $Jobstep->{stderr};
599   delete $Jobstep->{finishtime};
600
601   splice @jobstep_todo, $todo_ptr, 1;
602   --$todo_ptr;
603
604   $progress_is_dirty = 1;
605
606   while (!@freeslot
607          ||
608          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
609   {
610     last THISROUND if $main::please_freeze;
611     if ($main::please_info)
612     {
613       $main::please_info = 0;
614       freeze();
615       collate_output();
616       save_meta(1);
617       update_progress_stats();
618     }
619     my $gotsome
620         = readfrompipes ()
621         + reapchildren ();
622     if (!$gotsome)
623     {
624       check_squeue();
625       update_progress_stats();
626       select (undef, undef, undef, 0.1);
627     }
628     elsif (time - $progress_stats_updated >= 30)
629     {
630       update_progress_stats();
631     }
632     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
633         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
634     {
635       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
636           .($thisround_failed+$thisround_succeeded)
637           .") -- giving up on this round";
638       Log (undef, $message);
639       last THISROUND;
640     }
641
642     # move slots from freeslot to holdslot (or back to freeslot) if necessary
643     for (my $i=$#freeslot; $i>=0; $i--) {
644       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
645         push @holdslot, (splice @freeslot, $i, 1);
646       }
647     }
648     for (my $i=$#holdslot; $i>=0; $i--) {
649       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
650         push @freeslot, (splice @holdslot, $i, 1);
651       }
652     }
653
654     # give up if no nodes are succeeding
655     if (!grep { $_->{node}->{losing_streak} == 0 &&
656                     $_->{node}->{hold_count} < 4 } @slot) {
657       my $message = "Every node has failed -- giving up on this round";
658       Log (undef, $message);
659       last THISROUND;
660     }
661   }
662 }
663
664
665 push @freeslot, splice @holdslot;
666 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
667
668
669 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
670 while (%proc)
671 {
672   goto THISROUND if $main::please_continue;
673   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
674   readfrompipes ();
675   if (!reapchildren())
676   {
677     check_squeue();
678     update_progress_stats();
679     select (undef, undef, undef, 0.1);
680     killem (keys %proc) if $main::please_freeze;
681   }
682 }
683
684 update_progress_stats();
685 freeze_if_want_freeze();
686
687
688 if (!defined $success)
689 {
690   if (@jobstep_todo &&
691       $thisround_succeeded == 0 &&
692       ($thisround_failed == 0 || $thisround_failed > 4))
693   {
694     my $message = "stop because $thisround_failed tasks failed and none succeeded";
695     Log (undef, $message);
696     $success = 0;
697   }
698   if (!@jobstep_todo)
699   {
700     $success = 1;
701   }
702 }
703
704 goto ONELEVEL if !defined $success;
705
706
707 release_allocation();
708 freeze();
709 $Job->{'output'} = &collate_output();
710 $Job->{'success'} = $Job->{'output'} && $success;
711 $Job->save if $job_has_uuid;
712
713 if ($Job->{'output'})
714 {
715   eval {
716     my $manifest_text = capturex("whget", $Job->{'output'});
717     $arv->{'collections'}->{'create'}->execute('collection' => {
718       'uuid' => $Job->{'output'},
719       'manifest_text' => $manifest_text,
720     });
721   };
722   if ($@) {
723     Log (undef, "Failed to register output manifest: $@");
724   }
725 }
726
727 Log (undef, "finish");
728
729 save_meta();
730 exit 0;
731
732
733
734 sub update_progress_stats
735 {
736   $progress_stats_updated = time;
737   return if !$progress_is_dirty;
738   my ($todo, $done, $running) = (scalar @jobstep_todo,
739                                  scalar @jobstep_done,
740                                  scalar @slot - scalar @freeslot - scalar @holdslot);
741   $Job->{'tasks_summary'} ||= {};
742   $Job->{'tasks_summary'}->{'todo'} = $todo;
743   $Job->{'tasks_summary'}->{'done'} = $done;
744   $Job->{'tasks_summary'}->{'running'} = $running;
745   $Job->save if $job_has_uuid;
746   Log (undef, "status: $done done, $running running, $todo todo");
747   $progress_is_dirty = 0;
748 }
749
750
751
752 sub reapchildren
753 {
754   my $pid = waitpid (-1, WNOHANG);
755   return 0 if $pid <= 0;
756
757   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
758                   . "."
759                   . $slot[$proc{$pid}->{slot}]->{cpu});
760   my $jobstepid = $proc{$pid}->{jobstep};
761   my $elapsed = time - $proc{$pid}->{time};
762   my $Jobstep = $jobstep[$jobstepid];
763
764   my $exitcode = $?;
765   my $exitinfo = "exit $exitcode";
766   $Jobstep->{'arvados_task'}->reload;
767   my $success = $Jobstep->{'arvados_task'}->{success};
768
769   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
770
771   if (!defined $success) {
772     # task did not indicate one way or the other --> fail
773     $Jobstep->{'arvados_task'}->{success} = 0;
774     $Jobstep->{'arvados_task'}->save;
775     $success = 0;
776   }
777
778   if (!$success)
779   {
780     my $no_incr_attempts;
781     $no_incr_attempts = 1 if $Jobstep->{node_fail};
782
783     ++$thisround_failed;
784     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
785
786     # Check for signs of a failed or misconfigured node
787     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
788         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
789       # Don't count this against jobstep failure thresholds if this
790       # node is already suspected faulty and srun exited quickly
791       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
792           $elapsed < 5 &&
793           $Jobstep->{attempts} > 1) {
794         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
795         $no_incr_attempts = 1;
796         --$Jobstep->{attempts};
797       }
798       ban_node_by_slot($proc{$pid}->{slot});
799     }
800
801     push @jobstep_todo, $jobstepid;
802     Log ($jobstepid, "failure in $elapsed seconds");
803
804     --$Jobstep->{attempts} if $no_incr_attempts;
805     $Job->{'tasks_summary'}->{'failed'}++;
806   }
807   else
808   {
809     ++$thisround_succeeded;
810     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
811     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
812     push @jobstep_done, $jobstepid;
813     Log ($jobstepid, "success in $elapsed seconds");
814   }
815   $Jobstep->{exitcode} = $exitcode;
816   $Jobstep->{finishtime} = time;
817   process_stderr ($jobstepid, $success);
818   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
819
820   close $reader{$jobstepid};
821   delete $reader{$jobstepid};
822   delete $slot[$proc{$pid}->{slot}]->{pid};
823   push @freeslot, $proc{$pid}->{slot};
824   delete $proc{$pid};
825
826   # Load new tasks
827   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
828     'where' => {
829       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
830     },
831     'order' => 'qsequence'
832   );
833   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
834     my $jobstep = {
835       'level' => $arvados_task->{'sequence'},
836       'attempts' => 0,
837       'arvados_task' => $arvados_task
838     };
839     push @jobstep, $jobstep;
840     push @jobstep_todo, $#jobstep;
841   }
842
843   $progress_is_dirty = 1;
844   1;
845 }
846
847
848 sub check_squeue
849 {
850   # return if the kill list was checked <4 seconds ago
851   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
852   {
853     return;
854   }
855   $squeue_kill_checked = time;
856
857   # use killem() on procs whose killtime is reached
858   for (keys %proc)
859   {
860     if (exists $proc{$_}->{killtime}
861         && $proc{$_}->{killtime} <= time)
862     {
863       killem ($_);
864     }
865   }
866
867   # return if the squeue was checked <60 seconds ago
868   if (defined $squeue_checked && $squeue_checked > time - 60)
869   {
870     return;
871   }
872   $squeue_checked = time;
873
874   if (!$have_slurm)
875   {
876     # here is an opportunity to check for mysterious problems with local procs
877     return;
878   }
879
880   # get a list of steps still running
881   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
882   chop @squeue;
883   if ($squeue[-1] ne "ok")
884   {
885     return;
886   }
887   pop @squeue;
888
889   # which of my jobsteps are running, according to squeue?
890   my %ok;
891   foreach (@squeue)
892   {
893     if (/^(\d+)\.(\d+) (\S+)/)
894     {
895       if ($1 eq $ENV{SLURM_JOBID})
896       {
897         $ok{$3} = 1;
898       }
899     }
900   }
901
902   # which of my active child procs (>60s old) were not mentioned by squeue?
903   foreach (keys %proc)
904   {
905     if ($proc{$_}->{time} < time - 60
906         && !exists $ok{$proc{$_}->{jobstepname}}
907         && !exists $proc{$_}->{killtime})
908     {
909       # kill this proc if it hasn't exited in 30 seconds
910       $proc{$_}->{killtime} = time + 30;
911     }
912   }
913 }
914
915
916 sub release_allocation
917 {
918   if ($have_slurm)
919   {
920     Log (undef, "release job allocation");
921     system "scancel $ENV{SLURM_JOBID}";
922   }
923 }
924
925
926 sub readfrompipes
927 {
928   my $gotsome = 0;
929   foreach my $job (keys %reader)
930   {
931     my $buf;
932     while (0 < sysread ($reader{$job}, $buf, 8192))
933     {
934       print STDERR $buf if $ENV{CRUNCH_DEBUG};
935       $jobstep[$job]->{stderr} .= $buf;
936       preprocess_stderr ($job);
937       if (length ($jobstep[$job]->{stderr}) > 16384)
938       {
939         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
940       }
941       $gotsome = 1;
942     }
943   }
944   return $gotsome;
945 }
946
947
948 sub preprocess_stderr
949 {
950   my $job = shift;
951
952   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
953     my $line = $1;
954     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
955     Log ($job, "stderr $line");
956     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
957       # whoa.
958       $main::please_freeze = 1;
959     }
960     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
961       $jobstep[$job]->{node_fail} = 1;
962       ban_node_by_slot($jobstep[$job]->{slotindex});
963     }
964   }
965 }
966
967
968 sub process_stderr
969 {
970   my $job = shift;
971   my $success = shift;
972   preprocess_stderr ($job);
973
974   map {
975     Log ($job, "stderr $_");
976   } split ("\n", $jobstep[$job]->{stderr});
977 }
978
979
980 sub collate_output
981 {
982   my $whc = Warehouse->new;
983   Log (undef, "collate");
984   $whc->write_start (1);
985   my $joboutput;
986   for (@jobstep)
987   {
988     next if (!exists $_->{'arvados_task'}->{output} ||
989              !$_->{'arvados_task'}->{'success'} ||
990              $_->{'exitcode'} != 0);
991     my $output = $_->{'arvados_task'}->{output};
992     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
993     {
994       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
995       $whc->write_data ($output);
996     }
997     elsif (@jobstep == 1)
998     {
999       $joboutput = $output;
1000       $whc->write_finish;
1001     }
1002     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1003     {
1004       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1005       $whc->write_data ($outblock);
1006     }
1007     else
1008     {
1009       my $errstr = $whc->errstr;
1010       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1011       $success = 0;
1012     }
1013   }
1014   $joboutput = $whc->write_finish if !defined $joboutput;
1015   if ($joboutput)
1016   {
1017     Log (undef, "output $joboutput");
1018     $Job->{'output'} = $joboutput;
1019     $Job->save if $job_has_uuid;
1020   }
1021   else
1022   {
1023     Log (undef, "output undef");
1024   }
1025   return $joboutput;
1026 }
1027
1028
1029 sub killem
1030 {
1031   foreach (@_)
1032   {
1033     my $sig = 2;                # SIGINT first
1034     if (exists $proc{$_}->{"sent_$sig"} &&
1035         time - $proc{$_}->{"sent_$sig"} > 4)
1036     {
1037       $sig = 15;                # SIGTERM if SIGINT doesn't work
1038     }
1039     if (exists $proc{$_}->{"sent_$sig"} &&
1040         time - $proc{$_}->{"sent_$sig"} > 4)
1041     {
1042       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1043     }
1044     if (!exists $proc{$_}->{"sent_$sig"})
1045     {
1046       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1047       kill $sig, $_;
1048       select (undef, undef, undef, 0.1);
1049       if ($sig == 2)
1050       {
1051         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1052       }
1053       $proc{$_}->{"sent_$sig"} = time;
1054       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1055     }
1056   }
1057 }
1058
1059
1060 sub fhbits
1061 {
1062   my($bits);
1063   for (@_) {
1064     vec($bits,fileno($_),1) = 1;
1065   }
1066   $bits;
1067 }
1068
1069
1070 sub Log                         # ($jobstep_id, $logmessage)
1071 {
1072   if ($_[1] =~ /\n/) {
1073     for my $line (split (/\n/, $_[1])) {
1074       Log ($_[0], $line);
1075     }
1076     return;
1077   }
1078   my $fh = select STDERR; $|=1; select $fh;
1079   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1080   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1081   $message .= "\n";
1082   my $datetime;
1083   if ($metastream || -t STDERR) {
1084     my @gmtime = gmtime;
1085     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1086                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1087   }
1088   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1089
1090   return if !$metastream;
1091   $metastream->write_data ($datetime . " " . $message);
1092 }
1093
1094
1095 sub croak
1096 {
1097   my ($package, $file, $line) = caller;
1098   my $message = "@_ at $file line $line\n";
1099   Log (undef, $message);
1100   freeze() if @jobstep_todo;
1101   collate_output() if @jobstep_todo;
1102   cleanup();
1103   save_meta() if $metastream;
1104   die;
1105 }
1106
1107
1108 sub cleanup
1109 {
1110   return if !$job_has_uuid;
1111   $Job->reload if $job_has_uuid;
1112   $Job->{'running'} = 0;
1113   $Job->{'success'} = 0;
1114   $Job->{'finished_at'} = gmtime;
1115   $Job->save if $job_has_uuid;
1116 }
1117
1118
1119 sub save_meta
1120 {
1121   my $justcheckpoint = shift; # false if this will be the last meta saved
1122   my $m = $metastream;
1123   $m = $m->copy if $justcheckpoint;
1124   $m->write_finish;
1125   my $loglocator = $m->as_key;
1126   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1127   Log (undef, "meta key is $loglocator");
1128   $Job->{'log'} = $loglocator;
1129   $Job->save if $job_has_uuid;
1130 }
1131
1132
1133 sub freeze_if_want_freeze
1134 {
1135   if ($main::please_freeze)
1136   {
1137     release_allocation();
1138     if (@_)
1139     {
1140       # kill some srun procs before freeze+stop
1141       map { $proc{$_} = {} } @_;
1142       while (%proc)
1143       {
1144         killem (keys %proc);
1145         select (undef, undef, undef, 0.1);
1146         my $died;
1147         while (($died = waitpid (-1, WNOHANG)) > 0)
1148         {
1149           delete $proc{$died};
1150         }
1151       }
1152     }
1153     freeze();
1154     collate_output();
1155     cleanup();
1156     save_meta();
1157     exit 0;
1158   }
1159 }
1160
1161
1162 sub freeze
1163 {
1164   Log (undef, "Freeze not implemented");
1165   return;
1166 }
1167
1168
1169 sub thaw
1170 {
1171   croak ("Thaw not implemented");
1172
1173   my $whc;
1174   my $key = shift;
1175   Log (undef, "thaw from $key");
1176
1177   @jobstep = ();
1178   @jobstep_done = ();
1179   @jobstep_todo = ();
1180   @jobstep_tomerge = ();
1181   $jobstep_tomerge_level = 0;
1182   my $frozenjob = {};
1183
1184   my $stream = new Warehouse::Stream ( whc => $whc,
1185                                        hash => [split (",", $key)] );
1186   $stream->rewind;
1187   while (my $dataref = $stream->read_until (undef, "\n\n"))
1188   {
1189     if ($$dataref =~ /^job /)
1190     {
1191       foreach (split ("\n", $$dataref))
1192       {
1193         my ($k, $v) = split ("=", $_, 2);
1194         $frozenjob->{$k} = freezeunquote ($v);
1195       }
1196       next;
1197     }
1198
1199     if ($$dataref =~ /^merge (\d+) (.*)/)
1200     {
1201       $jobstep_tomerge_level = $1;
1202       @jobstep_tomerge
1203           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1204       next;
1205     }
1206
1207     my $Jobstep = { };
1208     foreach (split ("\n", $$dataref))
1209     {
1210       my ($k, $v) = split ("=", $_, 2);
1211       $Jobstep->{$k} = freezeunquote ($v) if $k;
1212     }
1213     $Jobstep->{attempts} = 0;
1214     push @jobstep, $Jobstep;
1215
1216     if ($Jobstep->{exitcode} eq "0")
1217     {
1218       push @jobstep_done, $#jobstep;
1219     }
1220     else
1221     {
1222       push @jobstep_todo, $#jobstep;
1223     }
1224   }
1225
1226   foreach (qw (script script_version script_parameters))
1227   {
1228     $Job->{$_} = $frozenjob->{$_};
1229   }
1230   $Job->save if $job_has_uuid;
1231 }
1232
1233
1234 sub freezequote
1235 {
1236   my $s = shift;
1237   $s =~ s/\\/\\\\/g;
1238   $s =~ s/\n/\\n/g;
1239   return $s;
1240 }
1241
1242
1243 sub freezeunquote
1244 {
1245   my $s = shift;
1246   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1247   return $s;
1248 }
1249
1250
1251 sub srun
1252 {
1253   my $srunargs = shift;
1254   my $execargs = shift;
1255   my $opts = shift || {};
1256   my $stdin = shift;
1257   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1258   print STDERR (join (" ",
1259                       map { / / ? "'$_'" : $_ }
1260                       (@$args)),
1261                 "\n")
1262       if $ENV{CRUNCH_DEBUG};
1263
1264   if (defined $stdin) {
1265     my $child = open STDIN, "-|";
1266     defined $child or die "no fork: $!";
1267     if ($child == 0) {
1268       print $stdin or die $!;
1269       close STDOUT or die $!;
1270       exit 0;
1271     }
1272   }
1273
1274   return system (@$args) if $opts->{fork};
1275
1276   exec @$args;
1277   warn "ENV size is ".length(join(" ",%ENV));
1278   die "exec failed: $!: @$args";
1279 }
1280
1281
1282 sub ban_node_by_slot {
1283   # Don't start any new jobsteps on this node for 60 seconds
1284   my $slotid = shift;
1285   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1286   $slot[$slotid]->{node}->{hold_count}++;
1287   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1288 }
1289
1290 __DATA__
1291 #!/usr/bin/perl
1292
1293 # checkout-and-build
1294
1295 use Fcntl ':flock';
1296
1297 my $destdir = $ENV{"CRUNCH_SRC"};
1298 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1299 my $repo = $ENV{"CRUNCH_SRC_URL"};
1300
1301 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1302 flock L, LOCK_EX;
1303 if (readlink ("$destdir.commit") eq $commit) {
1304     exit 0;
1305 }
1306
1307 unlink "$destdir.commit";
1308 open STDOUT, ">", "$destdir.log";
1309 open STDERR, ">&STDOUT";
1310
1311 mkdir $destdir;
1312 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1313 print TARX <DATA>;
1314 if(!close(TARX)) {
1315   die "'tar -C $destdir -xf -' exited $?: $!";
1316 }
1317
1318 my $pwd;
1319 chomp ($pwd = `pwd`);
1320 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1321 mkdir $install_dir;
1322 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1323     # Old version
1324     shell_or_die ("./tests/autotests.sh", $install_dir);
1325 } elsif (-e "./install.sh") {
1326     shell_or_die ("./install.sh", $install_dir);
1327 }
1328
1329 if ($commit) {
1330     unlink "$destdir.commit.new";
1331     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1332     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1333 }
1334
1335 close L;
1336
1337 exit 0;
1338
1339 sub shell_or_die
1340 {
1341   if ($ENV{"DEBUG"}) {
1342     print STDERR "@_\n";
1343   }
1344   system (@_) == 0
1345       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1346 }
1347
1348 __DATA__