20b65af998297037e14ff45f5943921d6d865fc5
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 unless (defined $ENV{"CRUNCH_TMP"}) {
79   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
80   if ($ENV{"USER"} ne "crunch" && $< != 0) {
81     # use a tmp dir unique for my uid
82     $ENV{"CRUNCH_TMP"} .= "-$<";
83   }
84 }
85 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
86 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
87 mkdir ($ENV{"JOB_WORK"});
88
89 my $force_unlock;
90 my $git_dir;
91 my $jobspec;
92 my $job_api_token;
93 my $resume_stash;
94 GetOptions('force-unlock' => \$force_unlock,
95            'git-dir=s' => \$git_dir,
96            'job=s' => \$jobspec,
97            'job-api-token=s' => \$job_api_token,
98            'resume-stash=s' => \$resume_stash,
99     );
100
101 if (defined $job_api_token) {
102   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
103 }
104
105 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
106 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
107 my $local_job = !$job_has_uuid;
108
109
110 $SIG{'HUP'} = sub
111 {
112   1;
113 };
114 $SIG{'USR1'} = sub
115 {
116   $main::ENV{CRUNCH_DEBUG} = 1;
117 };
118 $SIG{'USR2'} = sub
119 {
120   $main::ENV{CRUNCH_DEBUG} = 0;
121 };
122
123
124
125 my $arv = Arvados->new;
126 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
127 $metastream->clear;
128 $metastream->write_start('log.txt');
129
130 my $User = $arv->{'users'}->{'current'}->execute;
131
132 my $Job = {};
133 my $job_id;
134 my $dbh;
135 my $sth;
136 if ($job_has_uuid)
137 {
138   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
139   if (!$force_unlock) {
140     if ($Job->{'is_locked_by_uuid'}) {
141       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
142     }
143     if ($Job->{'success'} ne undef) {
144       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
145     }
146     if ($Job->{'running'}) {
147       croak("Job 'running' flag is already set");
148     }
149     if ($Job->{'started_at'}) {
150       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
151     }
152   }
153 }
154 else
155 {
156   $Job = JSON::decode_json($jobspec);
157
158   if (!$resume_stash)
159   {
160     map { croak ("No $_ specified") unless $Job->{$_} }
161     qw(script script_version script_parameters);
162   }
163
164   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
165   $Job->{'started_at'} = gmtime;
166
167   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
168
169   $job_has_uuid = 1;
170 }
171 $job_id = $Job->{'uuid'};
172
173
174
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
178
179
180 Log (undef, "check slurm allocation");
181 my @slot;
182 my @node;
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my @sinfo;
185 if (!$have_slurm)
186 {
187   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188   push @sinfo, "$localcpus localhost";
189 }
190 if (exists $ENV{SLURM_NODELIST})
191 {
192   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 }
194 foreach (@sinfo)
195 {
196   my ($ncpus, $slurm_nodelist) = split;
197   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
198
199   my @nodelist;
200   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201   {
202     my $nodelist = $1;
203     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204     {
205       my $ranges = $1;
206       foreach (split (",", $ranges))
207       {
208         my ($a, $b);
209         if (/(\d+)-(\d+)/)
210         {
211           $a = $1;
212           $b = $2;
213         }
214         else
215         {
216           $a = $_;
217           $b = $_;
218         }
219         push @nodelist, map {
220           my $n = $nodelist;
221           $n =~ s/\[[-,\d]+\]/$_/;
222           $n;
223         } ($a..$b);
224       }
225     }
226     else
227     {
228       push @nodelist, $nodelist;
229     }
230   }
231   foreach my $nodename (@nodelist)
232   {
233     Log (undef, "node $nodename - $ncpus slots");
234     my $node = { name => $nodename,
235                  ncpus => $ncpus,
236                  losing_streak => 0,
237                  hold_until => 0 };
238     foreach my $cpu (1..$ncpus)
239     {
240       push @slot, { node => $node,
241                     cpu => $cpu };
242     }
243   }
244   push @node, @nodelist;
245 }
246
247
248
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
251
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
253
254
255
256 my $jobmanager_id;
257 if ($job_has_uuid)
258 {
259   # Claim this job, and make sure nobody else does
260
261   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
262   $Job->{'started_at'} = gmtime;
263   $Job->{'running'} = 1;
264   $Job->{'success'} = undef;
265   $Job->{'tasks_summary'} = { 'failed' => 0,
266                               'todo' => 1,
267                               'running' => 0,
268                               'done' => 0 };
269   if ($job_has_uuid) {
270     unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
271       croak("Error while updating / locking job");
272     }
273   }
274 }
275
276
277 Log (undef, "start");
278 $SIG{'INT'} = sub { $main::please_freeze = 1; };
279 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
280 $SIG{'TERM'} = \&croak;
281 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
282 $SIG{'ALRM'} = sub { $main::please_info = 1; };
283 $SIG{'CONT'} = sub { $main::please_continue = 1; };
284 $main::please_freeze = 0;
285 $main::please_info = 0;
286 $main::please_continue = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302
303
304
305 if (defined $Job->{thawedfromkey})
306 {
307   thaw ($Job->{thawedfromkey});
308 }
309 else
310 {
311   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
312     'job_uuid' => $Job->{'uuid'},
313     'sequence' => 0,
314     'qsequence' => 0,
315     'parameters' => {},
316                                                           });
317   push @jobstep, { 'level' => 0,
318                    'attempts' => 0,
319                    'arvados_task' => $first_task,
320                  };
321   push @jobstep_todo, 0;
322 }
323
324
325 my $build_script;
326
327
328 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
329
330 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
331 if ($skip_install)
332 {
333   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
334 }
335 else
336 {
337   do {
338     local $/ = undef;
339     $build_script = <DATA>;
340   };
341   Log (undef, "Install revision ".$Job->{script_version});
342   my $nodelist = join(",", @node);
343
344   # Clean out crunch_tmp/work and crunch_tmp/opt
345
346   my $cleanpid = fork();
347   if ($cleanpid == 0)
348   {
349     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
350           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
351     exit (1);
352   }
353   while (1)
354   {
355     last if $cleanpid == waitpid (-1, WNOHANG);
356     freeze_if_want_freeze ($cleanpid);
357     select (undef, undef, undef, 0.1);
358   }
359   Log (undef, "Clean-work-dir exited $?");
360
361   # Install requested code version
362
363   my @execargs;
364   my @srunargs = ("srun",
365                   "--nodelist=$nodelist",
366                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
367
368   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
369   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
370   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
371
372   my $commit;
373   my $git_archive;
374   my $treeish = $Job->{'script_version'};
375   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
376   # Todo: let script_version specify repository instead of expecting
377   # parent process to figure it out.
378   $ENV{"CRUNCH_SRC_URL"} = $repo;
379
380   # Create/update our clone of the remote git repo
381
382   if (!-d $ENV{"CRUNCH_SRC"}) {
383     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
384         or croak ("git clone $repo failed: exit ".($?>>8));
385     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
386   }
387   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
388
389   # If this looks like a subversion r#, look for it in git-svn commit messages
390
391   if ($treeish =~ m{^\d{1,4}$}) {
392     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
393     chomp $gitlog;
394     if ($gitlog =~ /^[a-f0-9]{40}$/) {
395       $commit = $gitlog;
396       Log (undef, "Using commit $commit for script_version $treeish");
397     }
398   }
399
400   # If that didn't work, try asking git to look it up as a tree-ish.
401
402   if (!defined $commit) {
403
404     my $cooked_treeish = $treeish;
405     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
406       # Looks like a git branch name -- make sure git knows it's
407       # relative to the remote repo
408       $cooked_treeish = "origin/$treeish";
409     }
410
411     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
412     chomp $found;
413     if ($found =~ /^[0-9a-f]{40}$/s) {
414       $commit = $found;
415       if ($commit ne $treeish) {
416         # Make sure we record the real commit id in the database,
417         # frozentokey, logs, etc. -- instead of an abbreviation or a
418         # branch name which can become ambiguous or point to a
419         # different commit in the future.
420         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421         Log (undef, "Using commit $commit for tree-ish $treeish");
422         if ($commit ne $treeish) {
423           $Job->{'script_version'} = $commit;
424           !$job_has_uuid or $Job->save() or croak("Error while updating job");
425         }
426       }
427     }
428   }
429
430   if (defined $commit) {
431     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432     @execargs = ("sh", "-c",
433                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
435   }
436   else {
437     croak ("could not figure out commit id for $treeish");
438   }
439
440   my $installpid = fork();
441   if ($installpid == 0)
442   {
443     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
444     exit (1);
445   }
446   while (1)
447   {
448     last if $installpid == waitpid (-1, WNOHANG);
449     freeze_if_want_freeze ($installpid);
450     select (undef, undef, undef, 0.1);
451   }
452   Log (undef, "Install exited $?");
453 }
454
455
456
457 foreach (qw (script script_version script_parameters runtime_constraints))
458 {
459   Log (undef,
460        "$_ " .
461        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
462 }
463 foreach (split (/\n/, $Job->{knobs}))
464 {
465   Log (undef, "knob " . $_);
466 }
467
468
469
470 my $success;
471
472
473
474 ONELEVEL:
475
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
479
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481                        or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
484
485
486
487 my %proc;
488 my @freeslot = (0..$#slot);
489 my @holdslot;
490 my %reader;
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
493
494 update_progress_stats();
495
496
497
498 THISROUND:
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
500 {
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507   if ($Jobstep->{attempts} > 2)
508   {
509     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
510     $success = 0;
511     last THISROUND;
512   }
513
514   pipe $reader{$id}, "writer" or croak ($!);
515   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
517
518   my $childslot = $freeslot[0];
519   my $childnode = $slot[$childslot]->{node};
520   my $childslotname = join (".",
521                             $slot[$childslot]->{node}->{name},
522                             $slot[$childslot]->{cpu});
523   my $childpid = fork();
524   if ($childpid == 0)
525   {
526     $SIG{'INT'} = 'DEFAULT';
527     $SIG{'QUIT'} = 'DEFAULT';
528     $SIG{'TERM'} = 'DEFAULT';
529
530     foreach (values (%reader))
531     {
532       close($_);
533     }
534     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535     open(STDOUT,">&writer");
536     open(STDERR,">&writer");
537
538     undef $dbh;
539     undef $sth;
540
541     delete $ENV{"GNUPGHOME"};
542     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543     $ENV{"TASK_QSEQUENCE"} = $id;
544     $ENV{"TASK_SEQUENCE"} = $level;
545     $ENV{"JOB_SCRIPT"} = $Job->{script};
546     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547       $param =~ tr/a-z/A-Z/;
548       $ENV{"JOB_PARAMETER_$param"} = $value;
549     }
550     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
552     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
555
556     $ENV{"GZIP"} = "-n";
557
558     my @srunargs = (
559       "srun",
560       "--nodelist=".$childnode->{name},
561       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562       "--job-name=$job_id.$id.$$",
563         );
564     my @execargs = qw(sh);
565     my $build_script_to_send = "";
566     my $command =
567         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568         ."&& cd $ENV{CRUNCH_TMP} ";
569     if ($build_script)
570     {
571       $build_script_to_send = $build_script;
572       $command .=
573           "&& perl -";
574     }
575     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
576     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
577     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
578     $command .=
579         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
580     my @execargs = ('bash', '-c', $command);
581     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
582     exit (1);
583   }
584   close("writer");
585   if (!defined $childpid)
586   {
587     close $reader{$id};
588     delete $reader{$id};
589     next;
590   }
591   shift @freeslot;
592   $proc{$childpid} = { jobstep => $id,
593                        time => time,
594                        slot => $childslot,
595                        jobstepname => "$job_id.$id.$childpid",
596                      };
597   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
598   $slot[$childslot]->{pid} = $childpid;
599
600   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
601   Log ($id, "child $childpid started on $childslotname");
602   $Jobstep->{attempts} ++;
603   $Jobstep->{starttime} = time;
604   $Jobstep->{node} = $childnode->{name};
605   $Jobstep->{slotindex} = $childslot;
606   delete $Jobstep->{stderr};
607   delete $Jobstep->{finishtime};
608
609   splice @jobstep_todo, $todo_ptr, 1;
610   --$todo_ptr;
611
612   $progress_is_dirty = 1;
613
614   while (!@freeslot
615          ||
616          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
617   {
618     last THISROUND if $main::please_freeze;
619     if ($main::please_info)
620     {
621       $main::please_info = 0;
622       freeze();
623       collate_output();
624       save_meta(1);
625       update_progress_stats();
626     }
627     my $gotsome
628         = readfrompipes ()
629         + reapchildren ();
630     if (!$gotsome)
631     {
632       check_squeue();
633       update_progress_stats();
634       select (undef, undef, undef, 0.1);
635     }
636     elsif (time - $progress_stats_updated >= 30)
637     {
638       update_progress_stats();
639     }
640     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
641         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
642     {
643       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
644           .($thisround_failed+$thisround_succeeded)
645           .") -- giving up on this round";
646       Log (undef, $message);
647       last THISROUND;
648     }
649
650     # move slots from freeslot to holdslot (or back to freeslot) if necessary
651     for (my $i=$#freeslot; $i>=0; $i--) {
652       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
653         push @holdslot, (splice @freeslot, $i, 1);
654       }
655     }
656     for (my $i=$#holdslot; $i>=0; $i--) {
657       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
658         push @freeslot, (splice @holdslot, $i, 1);
659       }
660     }
661
662     # give up if no nodes are succeeding
663     if (!grep { $_->{node}->{losing_streak} == 0 &&
664                     $_->{node}->{hold_count} < 4 } @slot) {
665       my $message = "Every node has failed -- giving up on this round";
666       Log (undef, $message);
667       last THISROUND;
668     }
669   }
670 }
671
672
673 push @freeslot, splice @holdslot;
674 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
675
676
677 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
678 while (%proc)
679 {
680   if ($main::please_continue) {
681     $main::please_continue = 0;
682     goto THISROUND;
683   }
684   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
685   readfrompipes ();
686   if (!reapchildren())
687   {
688     check_squeue();
689     update_progress_stats();
690     select (undef, undef, undef, 0.1);
691     killem (keys %proc) if $main::please_freeze;
692   }
693 }
694
695 update_progress_stats();
696 freeze_if_want_freeze();
697
698
699 if (!defined $success)
700 {
701   if (@jobstep_todo &&
702       $thisround_succeeded == 0 &&
703       ($thisround_failed == 0 || $thisround_failed > 4))
704   {
705     my $message = "stop because $thisround_failed tasks failed and none succeeded";
706     Log (undef, $message);
707     $success = 0;
708   }
709   if (!@jobstep_todo)
710   {
711     $success = 1;
712   }
713 }
714
715 goto ONELEVEL if !defined $success;
716
717
718 release_allocation();
719 freeze();
720 $Job->reload;
721 $Job->{'output'} = &collate_output();
722 $Job->{'running'} = 0;
723 $Job->{'success'} = $Job->{'output'} && $success;
724 $Job->{'finished_at'} = gmtime;
725 $Job->save if $job_has_uuid;
726
727 if ($Job->{'output'})
728 {
729   eval {
730     my $manifest_text = capturex("whget", $Job->{'output'});
731     $arv->{'collections'}->{'create'}->execute('collection' => {
732       'uuid' => $Job->{'output'},
733       'manifest_text' => $manifest_text,
734     });
735   };
736   if ($@) {
737     Log (undef, "Failed to register output manifest: $@");
738   }
739 }
740
741 Log (undef, "finish");
742
743 save_meta();
744 exit 0;
745
746
747
748 sub update_progress_stats
749 {
750   $progress_stats_updated = time;
751   return if !$progress_is_dirty;
752   my ($todo, $done, $running) = (scalar @jobstep_todo,
753                                  scalar @jobstep_done,
754                                  scalar @slot - scalar @freeslot - scalar @holdslot);
755   $Job->{'tasks_summary'} ||= {};
756   $Job->{'tasks_summary'}->{'todo'} = $todo;
757   $Job->{'tasks_summary'}->{'done'} = $done;
758   $Job->{'tasks_summary'}->{'running'} = $running;
759   $Job->save if $job_has_uuid;
760   Log (undef, "status: $done done, $running running, $todo todo");
761   $progress_is_dirty = 0;
762 }
763
764
765
766 sub reapchildren
767 {
768   my $pid = waitpid (-1, WNOHANG);
769   return 0 if $pid <= 0;
770
771   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
772                   . "."
773                   . $slot[$proc{$pid}->{slot}]->{cpu});
774   my $jobstepid = $proc{$pid}->{jobstep};
775   my $elapsed = time - $proc{$pid}->{time};
776   my $Jobstep = $jobstep[$jobstepid];
777
778   my $exitcode = $?;
779   my $exitinfo = "exit $exitcode";
780   $Jobstep->{'arvados_task'}->reload;
781   my $success = $Jobstep->{'arvados_task'}->{success};
782
783   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
784
785   if (!defined $success) {
786     # task did not indicate one way or the other --> fail
787     $Jobstep->{'arvados_task'}->{success} = 0;
788     $Jobstep->{'arvados_task'}->save;
789     $success = 0;
790   }
791
792   if (!$success)
793   {
794     my $no_incr_attempts;
795     $no_incr_attempts = 1 if $Jobstep->{node_fail};
796
797     ++$thisround_failed;
798     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
799
800     # Check for signs of a failed or misconfigured node
801     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
802         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
803       # Don't count this against jobstep failure thresholds if this
804       # node is already suspected faulty and srun exited quickly
805       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
806           $elapsed < 5 &&
807           $Jobstep->{attempts} > 1) {
808         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
809         $no_incr_attempts = 1;
810       }
811       ban_node_by_slot($proc{$pid}->{slot});
812     }
813
814     push @jobstep_todo, $jobstepid;
815     Log ($jobstepid, "failure in $elapsed seconds");
816
817     --$Jobstep->{attempts} if $no_incr_attempts;
818     $Job->{'tasks_summary'}->{'failed'}++;
819   }
820   else
821   {
822     ++$thisround_succeeded;
823     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
824     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
825     push @jobstep_done, $jobstepid;
826     Log ($jobstepid, "success in $elapsed seconds");
827   }
828   $Jobstep->{exitcode} = $exitcode;
829   $Jobstep->{finishtime} = time;
830   process_stderr ($jobstepid, $success);
831   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
832
833   close $reader{$jobstepid};
834   delete $reader{$jobstepid};
835   delete $slot[$proc{$pid}->{slot}]->{pid};
836   push @freeslot, $proc{$pid}->{slot};
837   delete $proc{$pid};
838
839   # Load new tasks
840   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
841     'where' => {
842       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
843     },
844     'order' => 'qsequence'
845   );
846   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
847     my $jobstep = {
848       'level' => $arvados_task->{'sequence'},
849       'attempts' => 0,
850       'arvados_task' => $arvados_task
851     };
852     push @jobstep, $jobstep;
853     push @jobstep_todo, $#jobstep;
854   }
855
856   $progress_is_dirty = 1;
857   1;
858 }
859
860
861 sub check_squeue
862 {
863   # return if the kill list was checked <4 seconds ago
864   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
865   {
866     return;
867   }
868   $squeue_kill_checked = time;
869
870   # use killem() on procs whose killtime is reached
871   for (keys %proc)
872   {
873     if (exists $proc{$_}->{killtime}
874         && $proc{$_}->{killtime} <= time)
875     {
876       killem ($_);
877     }
878   }
879
880   # return if the squeue was checked <60 seconds ago
881   if (defined $squeue_checked && $squeue_checked > time - 60)
882   {
883     return;
884   }
885   $squeue_checked = time;
886
887   if (!$have_slurm)
888   {
889     # here is an opportunity to check for mysterious problems with local procs
890     return;
891   }
892
893   # get a list of steps still running
894   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
895   chop @squeue;
896   if ($squeue[-1] ne "ok")
897   {
898     return;
899   }
900   pop @squeue;
901
902   # which of my jobsteps are running, according to squeue?
903   my %ok;
904   foreach (@squeue)
905   {
906     if (/^(\d+)\.(\d+) (\S+)/)
907     {
908       if ($1 eq $ENV{SLURM_JOBID})
909       {
910         $ok{$3} = 1;
911       }
912     }
913   }
914
915   # which of my active child procs (>60s old) were not mentioned by squeue?
916   foreach (keys %proc)
917   {
918     if ($proc{$_}->{time} < time - 60
919         && !exists $ok{$proc{$_}->{jobstepname}}
920         && !exists $proc{$_}->{killtime})
921     {
922       # kill this proc if it hasn't exited in 30 seconds
923       $proc{$_}->{killtime} = time + 30;
924     }
925   }
926 }
927
928
929 sub release_allocation
930 {
931   if ($have_slurm)
932   {
933     Log (undef, "release job allocation");
934     system "scancel $ENV{SLURM_JOBID}";
935   }
936 }
937
938
939 sub readfrompipes
940 {
941   my $gotsome = 0;
942   foreach my $job (keys %reader)
943   {
944     my $buf;
945     while (0 < sysread ($reader{$job}, $buf, 8192))
946     {
947       print STDERR $buf if $ENV{CRUNCH_DEBUG};
948       $jobstep[$job]->{stderr} .= $buf;
949       preprocess_stderr ($job);
950       if (length ($jobstep[$job]->{stderr}) > 16384)
951       {
952         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
953       }
954       $gotsome = 1;
955     }
956   }
957   return $gotsome;
958 }
959
960
961 sub preprocess_stderr
962 {
963   my $job = shift;
964
965   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
966     my $line = $1;
967     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
968     Log ($job, "stderr $line");
969     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
970       # whoa.
971       $main::please_freeze = 1;
972     }
973     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
974       $jobstep[$job]->{node_fail} = 1;
975       ban_node_by_slot($jobstep[$job]->{slotindex});
976     }
977   }
978 }
979
980
981 sub process_stderr
982 {
983   my $job = shift;
984   my $success = shift;
985   preprocess_stderr ($job);
986
987   map {
988     Log ($job, "stderr $_");
989   } split ("\n", $jobstep[$job]->{stderr});
990 }
991
992
993 sub collate_output
994 {
995   my $whc = Warehouse->new;
996   Log (undef, "collate");
997   $whc->write_start (1);
998   my $joboutput;
999   for (@jobstep)
1000   {
1001     next if (!exists $_->{'arvados_task'}->{output} ||
1002              !$_->{'arvados_task'}->{'success'} ||
1003              $_->{'exitcode'} != 0);
1004     my $output = $_->{'arvados_task'}->{output};
1005     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1006     {
1007       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1008       $whc->write_data ($output);
1009     }
1010     elsif (@jobstep == 1)
1011     {
1012       $joboutput = $output;
1013       $whc->write_finish;
1014     }
1015     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1016     {
1017       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1018       $whc->write_data ($outblock);
1019     }
1020     else
1021     {
1022       my $errstr = $whc->errstr;
1023       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1024       $success = 0;
1025     }
1026   }
1027   $joboutput = $whc->write_finish if !defined $joboutput;
1028   if ($joboutput)
1029   {
1030     Log (undef, "output $joboutput");
1031     $Job->{'output'} = $joboutput;
1032     $Job->save if $job_has_uuid;
1033   }
1034   else
1035   {
1036     Log (undef, "output undef");
1037   }
1038   return $joboutput;
1039 }
1040
1041
1042 sub killem
1043 {
1044   foreach (@_)
1045   {
1046     my $sig = 2;                # SIGINT first
1047     if (exists $proc{$_}->{"sent_$sig"} &&
1048         time - $proc{$_}->{"sent_$sig"} > 4)
1049     {
1050       $sig = 15;                # SIGTERM if SIGINT doesn't work
1051     }
1052     if (exists $proc{$_}->{"sent_$sig"} &&
1053         time - $proc{$_}->{"sent_$sig"} > 4)
1054     {
1055       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1056     }
1057     if (!exists $proc{$_}->{"sent_$sig"})
1058     {
1059       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1060       kill $sig, $_;
1061       select (undef, undef, undef, 0.1);
1062       if ($sig == 2)
1063       {
1064         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1065       }
1066       $proc{$_}->{"sent_$sig"} = time;
1067       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1068     }
1069   }
1070 }
1071
1072
1073 sub fhbits
1074 {
1075   my($bits);
1076   for (@_) {
1077     vec($bits,fileno($_),1) = 1;
1078   }
1079   $bits;
1080 }
1081
1082
1083 sub Log                         # ($jobstep_id, $logmessage)
1084 {
1085   if ($_[1] =~ /\n/) {
1086     for my $line (split (/\n/, $_[1])) {
1087       Log ($_[0], $line);
1088     }
1089     return;
1090   }
1091   my $fh = select STDERR; $|=1; select $fh;
1092   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1093   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1094   $message .= "\n";
1095   my $datetime;
1096   if ($metastream || -t STDERR) {
1097     my @gmtime = gmtime;
1098     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1099                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1100   }
1101   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1102
1103   return if !$metastream;
1104   $metastream->write_data ($datetime . " " . $message);
1105 }
1106
1107
1108 sub croak
1109 {
1110   my ($package, $file, $line) = caller;
1111   my $message = "@_ at $file line $line\n";
1112   Log (undef, $message);
1113   freeze() if @jobstep_todo;
1114   collate_output() if @jobstep_todo;
1115   cleanup();
1116   save_meta() if $metastream;
1117   die;
1118 }
1119
1120
1121 sub cleanup
1122 {
1123   return if !$job_has_uuid;
1124   $Job->reload;
1125   $Job->{'running'} = 0;
1126   $Job->{'success'} = 0;
1127   $Job->{'finished_at'} = gmtime;
1128   $Job->save;
1129 }
1130
1131
1132 sub save_meta
1133 {
1134   my $justcheckpoint = shift; # false if this will be the last meta saved
1135   my $m = $metastream;
1136   $m = $m->copy if $justcheckpoint;
1137   $m->write_finish;
1138   my $loglocator = $m->as_key;
1139   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1140   Log (undef, "meta key is $loglocator");
1141   $Job->{'log'} = $loglocator;
1142   $Job->save if $job_has_uuid;
1143 }
1144
1145
1146 sub freeze_if_want_freeze
1147 {
1148   if ($main::please_freeze)
1149   {
1150     release_allocation();
1151     if (@_)
1152     {
1153       # kill some srun procs before freeze+stop
1154       map { $proc{$_} = {} } @_;
1155       while (%proc)
1156       {
1157         killem (keys %proc);
1158         select (undef, undef, undef, 0.1);
1159         my $died;
1160         while (($died = waitpid (-1, WNOHANG)) > 0)
1161         {
1162           delete $proc{$died};
1163         }
1164       }
1165     }
1166     freeze();
1167     collate_output();
1168     cleanup();
1169     save_meta();
1170     exit 0;
1171   }
1172 }
1173
1174
1175 sub freeze
1176 {
1177   Log (undef, "Freeze not implemented");
1178   return;
1179 }
1180
1181
1182 sub thaw
1183 {
1184   croak ("Thaw not implemented");
1185
1186   my $whc;
1187   my $key = shift;
1188   Log (undef, "thaw from $key");
1189
1190   @jobstep = ();
1191   @jobstep_done = ();
1192   @jobstep_todo = ();
1193   @jobstep_tomerge = ();
1194   $jobstep_tomerge_level = 0;
1195   my $frozenjob = {};
1196
1197   my $stream = new Warehouse::Stream ( whc => $whc,
1198                                        hash => [split (",", $key)] );
1199   $stream->rewind;
1200   while (my $dataref = $stream->read_until (undef, "\n\n"))
1201   {
1202     if ($$dataref =~ /^job /)
1203     {
1204       foreach (split ("\n", $$dataref))
1205       {
1206         my ($k, $v) = split ("=", $_, 2);
1207         $frozenjob->{$k} = freezeunquote ($v);
1208       }
1209       next;
1210     }
1211
1212     if ($$dataref =~ /^merge (\d+) (.*)/)
1213     {
1214       $jobstep_tomerge_level = $1;
1215       @jobstep_tomerge
1216           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1217       next;
1218     }
1219
1220     my $Jobstep = { };
1221     foreach (split ("\n", $$dataref))
1222     {
1223       my ($k, $v) = split ("=", $_, 2);
1224       $Jobstep->{$k} = freezeunquote ($v) if $k;
1225     }
1226     $Jobstep->{attempts} = 0;
1227     push @jobstep, $Jobstep;
1228
1229     if ($Jobstep->{exitcode} eq "0")
1230     {
1231       push @jobstep_done, $#jobstep;
1232     }
1233     else
1234     {
1235       push @jobstep_todo, $#jobstep;
1236     }
1237   }
1238
1239   foreach (qw (script script_version script_parameters))
1240   {
1241     $Job->{$_} = $frozenjob->{$_};
1242   }
1243   $Job->save if $job_has_uuid;
1244 }
1245
1246
1247 sub freezequote
1248 {
1249   my $s = shift;
1250   $s =~ s/\\/\\\\/g;
1251   $s =~ s/\n/\\n/g;
1252   return $s;
1253 }
1254
1255
1256 sub freezeunquote
1257 {
1258   my $s = shift;
1259   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1260   return $s;
1261 }
1262
1263
1264 sub srun
1265 {
1266   my $srunargs = shift;
1267   my $execargs = shift;
1268   my $opts = shift || {};
1269   my $stdin = shift;
1270   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1271   print STDERR (join (" ",
1272                       map { / / ? "'$_'" : $_ }
1273                       (@$args)),
1274                 "\n")
1275       if $ENV{CRUNCH_DEBUG};
1276
1277   if (defined $stdin) {
1278     my $child = open STDIN, "-|";
1279     defined $child or die "no fork: $!";
1280     if ($child == 0) {
1281       print $stdin or die $!;
1282       close STDOUT or die $!;
1283       exit 0;
1284     }
1285   }
1286
1287   return system (@$args) if $opts->{fork};
1288
1289   exec @$args;
1290   warn "ENV size is ".length(join(" ",%ENV));
1291   die "exec failed: $!: @$args";
1292 }
1293
1294
1295 sub ban_node_by_slot {
1296   # Don't start any new jobsteps on this node for 60 seconds
1297   my $slotid = shift;
1298   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1299   $slot[$slotid]->{node}->{hold_count}++;
1300   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1301 }
1302
1303 __DATA__
1304 #!/usr/bin/perl
1305
1306 # checkout-and-build
1307
1308 use Fcntl ':flock';
1309
1310 my $destdir = $ENV{"CRUNCH_SRC"};
1311 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1312 my $repo = $ENV{"CRUNCH_SRC_URL"};
1313
1314 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1315 flock L, LOCK_EX;
1316 if (readlink ("$destdir.commit") eq $commit) {
1317     exit 0;
1318 }
1319
1320 unlink "$destdir.commit";
1321 open STDOUT, ">", "$destdir.log";
1322 open STDERR, ">&STDOUT";
1323
1324 mkdir $destdir;
1325 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1326 print TARX <DATA>;
1327 if(!close(TARX)) {
1328   die "'tar -C $destdir -xf -' exited $?: $!";
1329 }
1330
1331 my $pwd;
1332 chomp ($pwd = `pwd`);
1333 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1334 mkdir $install_dir;
1335 if (-e "$destdir/crunch_scripts/install") {
1336     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1337 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1338     # Old version
1339     shell_or_die ("./tests/autotests.sh", $install_dir);
1340 } elsif (-e "./install.sh") {
1341     shell_or_die ("./install.sh", $install_dir);
1342 }
1343
1344 if ($commit) {
1345     unlink "$destdir.commit.new";
1346     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1347     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1348 }
1349
1350 close L;
1351
1352 exit 0;
1353
1354 sub shell_or_die
1355 {
1356   if ($ENV{"DEBUG"}) {
1357     print STDERR "@_\n";
1358   }
1359   system (@_) == 0
1360       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1361 }
1362
1363 __DATA__