move crunch-job to arv-crunch-job and add to arvados-cli gem
[arvados.git] / sdk / cli / bin / arv-crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 if ($ENV{"USER"} ne "crunch" && $< != 0) {
80   # use a tmp dir unique for my uid
81   $ENV{"CRUNCH_TMP"} .= "-$<";
82 }
83 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
84 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
85 mkdir ($ENV{"JOB_WORK"});
86
87 my $force_unlock;
88 my $git_dir;
89 my $jobspec;
90 my $job_api_token;
91 my $resume_stash;
92 GetOptions('force-unlock' => \$force_unlock,
93            'git-dir=s' => \$git_dir,
94            'job=s' => \$jobspec,
95            'job-api-token=s' => \$job_api_token,
96            'resume-stash=s' => \$resume_stash,
97     );
98
99 if (defined $job_api_token) {
100   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
101 }
102
103 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
104 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
105 my $local_job = !$job_has_uuid;
106
107
108 $SIG{'HUP'} = sub
109 {
110   1;
111 };
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new;
124 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
125 $metastream->clear;
126 $metastream->write_start('log.txt');
127
128 my $User = $arv->{'users'}->{'current'}->execute;
129
130 my $Job = {};
131 my $job_id;
132 my $dbh;
133 my $sth;
134 if ($job_has_uuid)
135 {
136   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
137   if (!$force_unlock) {
138     if ($Job->{'is_locked_by_uuid'}) {
139       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
140     }
141     if ($Job->{'success'} ne undef) {
142       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
143     }
144     if ($Job->{'running'}) {
145       croak("Job 'running' flag is already set");
146     }
147     if ($Job->{'started_at'}) {
148       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
149     }
150   }
151 }
152 else
153 {
154   $Job = JSON::decode_json($jobspec);
155
156   if (!$resume_stash)
157   {
158     map { croak ("No $_ specified") unless $Job->{$_} }
159     qw(script script_version script_parameters);
160   }
161
162   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
163   $Job->{'started_at'} = gmtime;
164
165   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
166
167   $job_has_uuid = 1;
168 }
169 $job_id = $Job->{'uuid'};
170
171
172
173 $Job->{'resource_limits'} ||= {};
174 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
175 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
176
177
178 Log (undef, "check slurm allocation");
179 my @slot;
180 my @node;
181 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
182 my @sinfo;
183 if (!$have_slurm)
184 {
185   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
186   push @sinfo, "$localcpus localhost";
187 }
188 if (exists $ENV{SLURM_NODELIST})
189 {
190   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
191 }
192 foreach (@sinfo)
193 {
194   my ($ncpus, $slurm_nodelist) = split;
195   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
196
197   my @nodelist;
198   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
199   {
200     my $nodelist = $1;
201     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
202     {
203       my $ranges = $1;
204       foreach (split (",", $ranges))
205       {
206         my ($a, $b);
207         if (/(\d+)-(\d+)/)
208         {
209           $a = $1;
210           $b = $2;
211         }
212         else
213         {
214           $a = $_;
215           $b = $_;
216         }
217         push @nodelist, map {
218           my $n = $nodelist;
219           $n =~ s/\[[-,\d]+\]/$_/;
220           $n;
221         } ($a..$b);
222       }
223     }
224     else
225     {
226       push @nodelist, $nodelist;
227     }
228   }
229   foreach my $nodename (@nodelist)
230   {
231     Log (undef, "node $nodename - $ncpus slots");
232     my $node = { name => $nodename,
233                  ncpus => $ncpus,
234                  losing_streak => 0,
235                  hold_until => 0 };
236     foreach my $cpu (1..$ncpus)
237     {
238       push @slot, { node => $node,
239                     cpu => $cpu };
240     }
241   }
242   push @node, @nodelist;
243 }
244
245
246
247 # Ensure that we get one jobstep running on each allocated node before
248 # we start overloading nodes with concurrent steps
249
250 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
251
252
253
254 my $jobmanager_id;
255 if ($job_has_uuid)
256 {
257   # Claim this job, and make sure nobody else does
258
259   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
260   $Job->{'started_at'} = gmtime;
261   $Job->{'running'} = 1;
262   $Job->{'success'} = undef;
263   $Job->{'tasks_summary'} = { 'failed' => 0,
264                               'todo' => 1,
265                               'running' => 0,
266                               'done' => 0 };
267   if ($job_has_uuid) {
268     unless ($Job->save() && $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
269       croak("Error while updating / locking job");
270     }
271   }
272 }
273
274
275 Log (undef, "start");
276 $SIG{'INT'} = sub { $main::please_freeze = 1; };
277 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
278 $SIG{'TERM'} = \&croak;
279 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
280 $SIG{'ALRM'} = sub { $main::please_info = 1; };
281 $SIG{'CONT'} = sub { $main::please_continue = 1; };
282 $main::please_freeze = 0;
283 $main::please_info = 0;
284 $main::please_continue = 0;
285 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
286
287 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
288 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
289 $ENV{"JOB_UUID"} = $job_id;
290
291
292 my @jobstep;
293 my @jobstep_todo = ();
294 my @jobstep_done = ();
295 my @jobstep_tomerge = ();
296 my $jobstep_tomerge_level = 0;
297 my $squeue_checked;
298 my $squeue_kill_checked;
299 my $output_in_keep = 0;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'attempts' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 }
333 else
334 {
335   do {
336     local $/ = undef;
337     $build_script = <DATA>;
338   };
339   Log (undef, "Install revision ".$Job->{script_version});
340   my $nodelist = join(",", @node);
341
342   # Clean out crunch_tmp/work and crunch_tmp/opt
343
344   my $cleanpid = fork();
345   if ($cleanpid == 0)
346   {
347     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
349     exit (1);
350   }
351   while (1)
352   {
353     last if $cleanpid == waitpid (-1, WNOHANG);
354     freeze_if_want_freeze ($cleanpid);
355     select (undef, undef, undef, 0.1);
356   }
357   Log (undef, "Clean-work-dir exited $?");
358
359   # Install requested code version
360
361   my @execargs;
362   my @srunargs = ("srun",
363                   "--nodelist=$nodelist",
364                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
365
366   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
369
370   my $commit;
371   my $git_archive;
372   my $treeish = $Job->{'script_version'};
373   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374   # Todo: let script_version specify repository instead of expecting
375   # parent process to figure it out.
376   $ENV{"CRUNCH_SRC_URL"} = $repo;
377
378   # Create/update our clone of the remote git repo
379
380   if (!-d $ENV{"CRUNCH_SRC"}) {
381     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382         or croak ("git clone $repo failed: exit ".($?>>8));
383     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
384   }
385   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
386
387   # If this looks like a subversion r#, look for it in git-svn commit messages
388
389   if ($treeish =~ m{^\d{1,4}$}) {
390     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
391     chomp $gitlog;
392     if ($gitlog =~ /^[a-f0-9]{40}$/) {
393       $commit = $gitlog;
394       Log (undef, "Using commit $commit for script_version $treeish");
395     }
396   }
397
398   # If that didn't work, try asking git to look it up as a tree-ish.
399
400   if (!defined $commit) {
401
402     my $cooked_treeish = $treeish;
403     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404       # Looks like a git branch name -- make sure git knows it's
405       # relative to the remote repo
406       $cooked_treeish = "origin/$treeish";
407     }
408
409     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
410     chomp $found;
411     if ($found =~ /^[0-9a-f]{40}$/s) {
412       $commit = $found;
413       if ($commit ne $treeish) {
414         # Make sure we record the real commit id in the database,
415         # frozentokey, logs, etc. -- instead of an abbreviation or a
416         # branch name which can become ambiguous or point to a
417         # different commit in the future.
418         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419         Log (undef, "Using commit $commit for tree-ish $treeish");
420         if ($commit ne $treeish) {
421           $Job->{'script_version'} = $commit;
422           !$job_has_uuid or $Job->save() or croak("Error while updating job");
423         }
424       }
425     }
426   }
427
428   if (defined $commit) {
429     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430     @execargs = ("sh", "-c",
431                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
432     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
433   }
434   else {
435     croak ("could not figure out commit id for $treeish");
436   }
437
438   my $installpid = fork();
439   if ($installpid == 0)
440   {
441     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
442     exit (1);
443   }
444   while (1)
445   {
446     last if $installpid == waitpid (-1, WNOHANG);
447     freeze_if_want_freeze ($installpid);
448     select (undef, undef, undef, 0.1);
449   }
450   Log (undef, "Install exited $?");
451 }
452
453
454
455 foreach (qw (script script_version script_parameters resource_limits))
456 {
457   Log (undef,
458        "$_ " .
459        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
460 }
461 foreach (split (/\n/, $Job->{knobs}))
462 {
463   Log (undef, "knob " . $_);
464 }
465
466
467
468 my $success;
469
470
471
472 ONELEVEL:
473
474 my $thisround_succeeded = 0;
475 my $thisround_failed = 0;
476 my $thisround_failed_multiple = 0;
477
478 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
479                        or $a <=> $b } @jobstep_todo;
480 my $level = $jobstep[$jobstep_todo[0]]->{level};
481 Log (undef, "start level $level");
482
483
484
485 my %proc;
486 my @freeslot = (0..$#slot);
487 my @holdslot;
488 my %reader;
489 my $progress_is_dirty = 1;
490 my $progress_stats_updated = 0;
491
492 update_progress_stats();
493
494
495
496 THISROUND:
497 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
498 {
499   $main::please_continue = 0;
500
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507   if ($Jobstep->{attempts} > 9)
508   {
509     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
510     $success = 0;
511     last THISROUND;
512   }
513
514   pipe $reader{$id}, "writer" or croak ($!);
515   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
516   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
517
518   my $childslot = $freeslot[0];
519   my $childnode = $slot[$childslot]->{node};
520   my $childslotname = join (".",
521                             $slot[$childslot]->{node}->{name},
522                             $slot[$childslot]->{cpu});
523   my $childpid = fork();
524   if ($childpid == 0)
525   {
526     $SIG{'INT'} = 'DEFAULT';
527     $SIG{'QUIT'} = 'DEFAULT';
528     $SIG{'TERM'} = 'DEFAULT';
529
530     foreach (values (%reader))
531     {
532       close($_);
533     }
534     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
535     open(STDOUT,">&writer");
536     open(STDERR,">&writer");
537
538     undef $dbh;
539     undef $sth;
540
541     delete $ENV{"GNUPGHOME"};
542     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
543     $ENV{"TASK_QSEQUENCE"} = $id;
544     $ENV{"TASK_SEQUENCE"} = $level;
545     $ENV{"JOB_SCRIPT"} = $Job->{script};
546     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
547       $param =~ tr/a-z/A-Z/;
548       $ENV{"JOB_PARAMETER_$param"} = $value;
549     }
550     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
551     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
552     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
553     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
554     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
555
556     $ENV{"GZIP"} = "-n";
557
558     my @srunargs = (
559       "srun",
560       "--nodelist=".$childnode->{name},
561       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
562       "--job-name=$job_id.$id.$$",
563         );
564     my @execargs = qw(sh);
565     my $build_script_to_send = "";
566     my $command =
567         "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
568         ."&& cd $ENV{CRUNCH_TMP} ";
569     if ($build_script)
570     {
571       $build_script_to_send = $build_script;
572       $command .=
573           "&& perl -";
574     }
575     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
576     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
577     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
578     $command .=
579         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
580     my @execargs = ('bash', '-c', $command);
581     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
582     exit (1);
583   }
584   close("writer");
585   if (!defined $childpid)
586   {
587     close $reader{$id};
588     delete $reader{$id};
589     next;
590   }
591   shift @freeslot;
592   $proc{$childpid} = { jobstep => $id,
593                        time => time,
594                        slot => $childslot,
595                        jobstepname => "$job_id.$id.$childpid",
596                      };
597   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
598   $slot[$childslot]->{pid} = $childpid;
599
600   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
601   Log ($id, "child $childpid started on $childslotname");
602   $Jobstep->{attempts} ++;
603   $Jobstep->{starttime} = time;
604   $Jobstep->{node} = $childnode->{name};
605   $Jobstep->{slotindex} = $childslot;
606   delete $Jobstep->{stderr};
607   delete $Jobstep->{finishtime};
608
609   splice @jobstep_todo, $todo_ptr, 1;
610   --$todo_ptr;
611
612   $progress_is_dirty = 1;
613
614   while (!@freeslot
615          ||
616          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
617   {
618     last THISROUND if $main::please_freeze;
619     if ($main::please_info)
620     {
621       $main::please_info = 0;
622       freeze();
623       collate_output();
624       save_meta(1);
625       update_progress_stats();
626     }
627     my $gotsome
628         = readfrompipes ()
629         + reapchildren ();
630     if (!$gotsome)
631     {
632       check_squeue();
633       update_progress_stats();
634       select (undef, undef, undef, 0.1);
635     }
636     elsif (time - $progress_stats_updated >= 30)
637     {
638       update_progress_stats();
639     }
640     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
641         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
642     {
643       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
644           .($thisround_failed+$thisround_succeeded)
645           .") -- giving up on this round";
646       Log (undef, $message);
647       last THISROUND;
648     }
649
650     # move slots from freeslot to holdslot (or back to freeslot) if necessary
651     for (my $i=$#freeslot; $i>=0; $i--) {
652       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
653         push @holdslot, (splice @freeslot, $i, 1);
654       }
655     }
656     for (my $i=$#holdslot; $i>=0; $i--) {
657       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
658         push @freeslot, (splice @holdslot, $i, 1);
659       }
660     }
661
662     # give up if no nodes are succeeding
663     if (!grep { $_->{node}->{losing_streak} == 0 &&
664                     $_->{node}->{hold_count} < 4 } @slot) {
665       my $message = "Every node has failed -- giving up on this round";
666       Log (undef, $message);
667       last THISROUND;
668     }
669   }
670 }
671
672
673 push @freeslot, splice @holdslot;
674 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
675
676
677 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
678 while (%proc)
679 {
680   goto THISROUND if $main::please_continue;
681   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
682   readfrompipes ();
683   if (!reapchildren())
684   {
685     check_squeue();
686     update_progress_stats();
687     select (undef, undef, undef, 0.1);
688     killem (keys %proc) if $main::please_freeze;
689   }
690 }
691
692 update_progress_stats();
693 freeze_if_want_freeze();
694
695
696 if (!defined $success)
697 {
698   if (@jobstep_todo &&
699       $thisround_succeeded == 0 &&
700       ($thisround_failed == 0 || $thisround_failed > 4))
701   {
702     my $message = "stop because $thisround_failed tasks failed and none succeeded";
703     Log (undef, $message);
704     $success = 0;
705   }
706   if (!@jobstep_todo)
707   {
708     $success = 1;
709   }
710 }
711
712 goto ONELEVEL if !defined $success;
713
714
715 release_allocation();
716 freeze();
717 $Job->reload;
718 $Job->{'output'} = &collate_output();
719 $Job->{'running'} = 0;
720 $Job->{'success'} = $Job->{'output'} && $success;
721 $Job->{'finished_at'} = gmtime;
722 $Job->save if $job_has_uuid;
723
724 if ($Job->{'output'})
725 {
726   eval {
727     my $manifest_text = capturex("whget", $Job->{'output'});
728     $arv->{'collections'}->{'create'}->execute('collection' => {
729       'uuid' => $Job->{'output'},
730       'manifest_text' => $manifest_text,
731     });
732   };
733   if ($@) {
734     Log (undef, "Failed to register output manifest: $@");
735   }
736 }
737
738 Log (undef, "finish");
739
740 save_meta();
741 exit 0;
742
743
744
745 sub update_progress_stats
746 {
747   $progress_stats_updated = time;
748   return if !$progress_is_dirty;
749   my ($todo, $done, $running) = (scalar @jobstep_todo,
750                                  scalar @jobstep_done,
751                                  scalar @slot - scalar @freeslot - scalar @holdslot);
752   $Job->{'tasks_summary'} ||= {};
753   $Job->{'tasks_summary'}->{'todo'} = $todo;
754   $Job->{'tasks_summary'}->{'done'} = $done;
755   $Job->{'tasks_summary'}->{'running'} = $running;
756   $Job->save if $job_has_uuid;
757   Log (undef, "status: $done done, $running running, $todo todo");
758   $progress_is_dirty = 0;
759 }
760
761
762
763 sub reapchildren
764 {
765   my $pid = waitpid (-1, WNOHANG);
766   return 0 if $pid <= 0;
767
768   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
769                   . "."
770                   . $slot[$proc{$pid}->{slot}]->{cpu});
771   my $jobstepid = $proc{$pid}->{jobstep};
772   my $elapsed = time - $proc{$pid}->{time};
773   my $Jobstep = $jobstep[$jobstepid];
774
775   my $exitcode = $?;
776   my $exitinfo = "exit $exitcode";
777   $Jobstep->{'arvados_task'}->reload;
778   my $success = $Jobstep->{'arvados_task'}->{success};
779
780   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
781
782   if (!defined $success) {
783     # task did not indicate one way or the other --> fail
784     $Jobstep->{'arvados_task'}->{success} = 0;
785     $Jobstep->{'arvados_task'}->save;
786     $success = 0;
787   }
788
789   if (!$success)
790   {
791     my $no_incr_attempts;
792     $no_incr_attempts = 1 if $Jobstep->{node_fail};
793
794     ++$thisround_failed;
795     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
796
797     # Check for signs of a failed or misconfigured node
798     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
799         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
800       # Don't count this against jobstep failure thresholds if this
801       # node is already suspected faulty and srun exited quickly
802       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
803           $elapsed < 5 &&
804           $Jobstep->{attempts} > 1) {
805         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
806         $no_incr_attempts = 1;
807         --$Jobstep->{attempts};
808       }
809       ban_node_by_slot($proc{$pid}->{slot});
810     }
811
812     push @jobstep_todo, $jobstepid;
813     Log ($jobstepid, "failure in $elapsed seconds");
814
815     --$Jobstep->{attempts} if $no_incr_attempts;
816     $Job->{'tasks_summary'}->{'failed'}++;
817   }
818   else
819   {
820     ++$thisround_succeeded;
821     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
822     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
823     push @jobstep_done, $jobstepid;
824     Log ($jobstepid, "success in $elapsed seconds");
825   }
826   $Jobstep->{exitcode} = $exitcode;
827   $Jobstep->{finishtime} = time;
828   process_stderr ($jobstepid, $success);
829   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
830
831   close $reader{$jobstepid};
832   delete $reader{$jobstepid};
833   delete $slot[$proc{$pid}->{slot}]->{pid};
834   push @freeslot, $proc{$pid}->{slot};
835   delete $proc{$pid};
836
837   # Load new tasks
838   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
839     'where' => {
840       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
841     },
842     'order' => 'qsequence'
843   );
844   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
845     my $jobstep = {
846       'level' => $arvados_task->{'sequence'},
847       'attempts' => 0,
848       'arvados_task' => $arvados_task
849     };
850     push @jobstep, $jobstep;
851     push @jobstep_todo, $#jobstep;
852   }
853
854   $progress_is_dirty = 1;
855   1;
856 }
857
858
859 sub check_squeue
860 {
861   # return if the kill list was checked <4 seconds ago
862   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
863   {
864     return;
865   }
866   $squeue_kill_checked = time;
867
868   # use killem() on procs whose killtime is reached
869   for (keys %proc)
870   {
871     if (exists $proc{$_}->{killtime}
872         && $proc{$_}->{killtime} <= time)
873     {
874       killem ($_);
875     }
876   }
877
878   # return if the squeue was checked <60 seconds ago
879   if (defined $squeue_checked && $squeue_checked > time - 60)
880   {
881     return;
882   }
883   $squeue_checked = time;
884
885   if (!$have_slurm)
886   {
887     # here is an opportunity to check for mysterious problems with local procs
888     return;
889   }
890
891   # get a list of steps still running
892   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
893   chop @squeue;
894   if ($squeue[-1] ne "ok")
895   {
896     return;
897   }
898   pop @squeue;
899
900   # which of my jobsteps are running, according to squeue?
901   my %ok;
902   foreach (@squeue)
903   {
904     if (/^(\d+)\.(\d+) (\S+)/)
905     {
906       if ($1 eq $ENV{SLURM_JOBID})
907       {
908         $ok{$3} = 1;
909       }
910     }
911   }
912
913   # which of my active child procs (>60s old) were not mentioned by squeue?
914   foreach (keys %proc)
915   {
916     if ($proc{$_}->{time} < time - 60
917         && !exists $ok{$proc{$_}->{jobstepname}}
918         && !exists $proc{$_}->{killtime})
919     {
920       # kill this proc if it hasn't exited in 30 seconds
921       $proc{$_}->{killtime} = time + 30;
922     }
923   }
924 }
925
926
927 sub release_allocation
928 {
929   if ($have_slurm)
930   {
931     Log (undef, "release job allocation");
932     system "scancel $ENV{SLURM_JOBID}";
933   }
934 }
935
936
937 sub readfrompipes
938 {
939   my $gotsome = 0;
940   foreach my $job (keys %reader)
941   {
942     my $buf;
943     while (0 < sysread ($reader{$job}, $buf, 8192))
944     {
945       print STDERR $buf if $ENV{CRUNCH_DEBUG};
946       $jobstep[$job]->{stderr} .= $buf;
947       preprocess_stderr ($job);
948       if (length ($jobstep[$job]->{stderr}) > 16384)
949       {
950         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
951       }
952       $gotsome = 1;
953     }
954   }
955   return $gotsome;
956 }
957
958
959 sub preprocess_stderr
960 {
961   my $job = shift;
962
963   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
964     my $line = $1;
965     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
966     Log ($job, "stderr $line");
967     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
968       # whoa.
969       $main::please_freeze = 1;
970     }
971     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
972       $jobstep[$job]->{node_fail} = 1;
973       ban_node_by_slot($jobstep[$job]->{slotindex});
974     }
975   }
976 }
977
978
979 sub process_stderr
980 {
981   my $job = shift;
982   my $success = shift;
983   preprocess_stderr ($job);
984
985   map {
986     Log ($job, "stderr $_");
987   } split ("\n", $jobstep[$job]->{stderr});
988 }
989
990
991 sub collate_output
992 {
993   my $whc = Warehouse->new;
994   Log (undef, "collate");
995   $whc->write_start (1);
996   my $joboutput;
997   for (@jobstep)
998   {
999     next if (!exists $_->{'arvados_task'}->{output} ||
1000              !$_->{'arvados_task'}->{'success'} ||
1001              $_->{'exitcode'} != 0);
1002     my $output = $_->{'arvados_task'}->{output};
1003     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1004     {
1005       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1006       $whc->write_data ($output);
1007     }
1008     elsif (@jobstep == 1)
1009     {
1010       $joboutput = $output;
1011       $whc->write_finish;
1012     }
1013     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1014     {
1015       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1016       $whc->write_data ($outblock);
1017     }
1018     else
1019     {
1020       my $errstr = $whc->errstr;
1021       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1022       $success = 0;
1023     }
1024   }
1025   $joboutput = $whc->write_finish if !defined $joboutput;
1026   if ($joboutput)
1027   {
1028     Log (undef, "output $joboutput");
1029     $Job->{'output'} = $joboutput;
1030     $Job->save if $job_has_uuid;
1031   }
1032   else
1033   {
1034     Log (undef, "output undef");
1035   }
1036   return $joboutput;
1037 }
1038
1039
1040 sub killem
1041 {
1042   foreach (@_)
1043   {
1044     my $sig = 2;                # SIGINT first
1045     if (exists $proc{$_}->{"sent_$sig"} &&
1046         time - $proc{$_}->{"sent_$sig"} > 4)
1047     {
1048       $sig = 15;                # SIGTERM if SIGINT doesn't work
1049     }
1050     if (exists $proc{$_}->{"sent_$sig"} &&
1051         time - $proc{$_}->{"sent_$sig"} > 4)
1052     {
1053       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1054     }
1055     if (!exists $proc{$_}->{"sent_$sig"})
1056     {
1057       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1058       kill $sig, $_;
1059       select (undef, undef, undef, 0.1);
1060       if ($sig == 2)
1061       {
1062         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1063       }
1064       $proc{$_}->{"sent_$sig"} = time;
1065       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1066     }
1067   }
1068 }
1069
1070
1071 sub fhbits
1072 {
1073   my($bits);
1074   for (@_) {
1075     vec($bits,fileno($_),1) = 1;
1076   }
1077   $bits;
1078 }
1079
1080
1081 sub Log                         # ($jobstep_id, $logmessage)
1082 {
1083   if ($_[1] =~ /\n/) {
1084     for my $line (split (/\n/, $_[1])) {
1085       Log ($_[0], $line);
1086     }
1087     return;
1088   }
1089   my $fh = select STDERR; $|=1; select $fh;
1090   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1091   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1092   $message .= "\n";
1093   my $datetime;
1094   if ($metastream || -t STDERR) {
1095     my @gmtime = gmtime;
1096     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1097                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1098   }
1099   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1100
1101   return if !$metastream;
1102   $metastream->write_data ($datetime . " " . $message);
1103 }
1104
1105
1106 sub croak
1107 {
1108   my ($package, $file, $line) = caller;
1109   my $message = "@_ at $file line $line\n";
1110   Log (undef, $message);
1111   freeze() if @jobstep_todo;
1112   collate_output() if @jobstep_todo;
1113   cleanup();
1114   save_meta() if $metastream;
1115   die;
1116 }
1117
1118
1119 sub cleanup
1120 {
1121   return if !$job_has_uuid;
1122   $Job->reload;
1123   $Job->{'running'} = 0;
1124   $Job->{'success'} = 0;
1125   $Job->{'finished_at'} = gmtime;
1126   $Job->save;
1127 }
1128
1129
1130 sub save_meta
1131 {
1132   my $justcheckpoint = shift; # false if this will be the last meta saved
1133   my $m = $metastream;
1134   $m = $m->copy if $justcheckpoint;
1135   $m->write_finish;
1136   my $loglocator = $m->as_key;
1137   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1138   Log (undef, "meta key is $loglocator");
1139   $Job->{'log'} = $loglocator;
1140   $Job->save if $job_has_uuid;
1141 }
1142
1143
1144 sub freeze_if_want_freeze
1145 {
1146   if ($main::please_freeze)
1147   {
1148     release_allocation();
1149     if (@_)
1150     {
1151       # kill some srun procs before freeze+stop
1152       map { $proc{$_} = {} } @_;
1153       while (%proc)
1154       {
1155         killem (keys %proc);
1156         select (undef, undef, undef, 0.1);
1157         my $died;
1158         while (($died = waitpid (-1, WNOHANG)) > 0)
1159         {
1160           delete $proc{$died};
1161         }
1162       }
1163     }
1164     freeze();
1165     collate_output();
1166     cleanup();
1167     save_meta();
1168     exit 0;
1169   }
1170 }
1171
1172
1173 sub freeze
1174 {
1175   Log (undef, "Freeze not implemented");
1176   return;
1177 }
1178
1179
1180 sub thaw
1181 {
1182   croak ("Thaw not implemented");
1183
1184   my $whc;
1185   my $key = shift;
1186   Log (undef, "thaw from $key");
1187
1188   @jobstep = ();
1189   @jobstep_done = ();
1190   @jobstep_todo = ();
1191   @jobstep_tomerge = ();
1192   $jobstep_tomerge_level = 0;
1193   my $frozenjob = {};
1194
1195   my $stream = new Warehouse::Stream ( whc => $whc,
1196                                        hash => [split (",", $key)] );
1197   $stream->rewind;
1198   while (my $dataref = $stream->read_until (undef, "\n\n"))
1199   {
1200     if ($$dataref =~ /^job /)
1201     {
1202       foreach (split ("\n", $$dataref))
1203       {
1204         my ($k, $v) = split ("=", $_, 2);
1205         $frozenjob->{$k} = freezeunquote ($v);
1206       }
1207       next;
1208     }
1209
1210     if ($$dataref =~ /^merge (\d+) (.*)/)
1211     {
1212       $jobstep_tomerge_level = $1;
1213       @jobstep_tomerge
1214           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1215       next;
1216     }
1217
1218     my $Jobstep = { };
1219     foreach (split ("\n", $$dataref))
1220     {
1221       my ($k, $v) = split ("=", $_, 2);
1222       $Jobstep->{$k} = freezeunquote ($v) if $k;
1223     }
1224     $Jobstep->{attempts} = 0;
1225     push @jobstep, $Jobstep;
1226
1227     if ($Jobstep->{exitcode} eq "0")
1228     {
1229       push @jobstep_done, $#jobstep;
1230     }
1231     else
1232     {
1233       push @jobstep_todo, $#jobstep;
1234     }
1235   }
1236
1237   foreach (qw (script script_version script_parameters))
1238   {
1239     $Job->{$_} = $frozenjob->{$_};
1240   }
1241   $Job->save if $job_has_uuid;
1242 }
1243
1244
1245 sub freezequote
1246 {
1247   my $s = shift;
1248   $s =~ s/\\/\\\\/g;
1249   $s =~ s/\n/\\n/g;
1250   return $s;
1251 }
1252
1253
1254 sub freezeunquote
1255 {
1256   my $s = shift;
1257   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1258   return $s;
1259 }
1260
1261
1262 sub srun
1263 {
1264   my $srunargs = shift;
1265   my $execargs = shift;
1266   my $opts = shift || {};
1267   my $stdin = shift;
1268   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1269   print STDERR (join (" ",
1270                       map { / / ? "'$_'" : $_ }
1271                       (@$args)),
1272                 "\n")
1273       if $ENV{CRUNCH_DEBUG};
1274
1275   if (defined $stdin) {
1276     my $child = open STDIN, "-|";
1277     defined $child or die "no fork: $!";
1278     if ($child == 0) {
1279       print $stdin or die $!;
1280       close STDOUT or die $!;
1281       exit 0;
1282     }
1283   }
1284
1285   return system (@$args) if $opts->{fork};
1286
1287   exec @$args;
1288   warn "ENV size is ".length(join(" ",%ENV));
1289   die "exec failed: $!: @$args";
1290 }
1291
1292
1293 sub ban_node_by_slot {
1294   # Don't start any new jobsteps on this node for 60 seconds
1295   my $slotid = shift;
1296   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1297   $slot[$slotid]->{node}->{hold_count}++;
1298   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1299 }
1300
1301 __DATA__
1302 #!/usr/bin/perl
1303
1304 # checkout-and-build
1305
1306 use Fcntl ':flock';
1307
1308 my $destdir = $ENV{"CRUNCH_SRC"};
1309 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1310 my $repo = $ENV{"CRUNCH_SRC_URL"};
1311
1312 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1313 flock L, LOCK_EX;
1314 if (readlink ("$destdir.commit") eq $commit) {
1315     exit 0;
1316 }
1317
1318 unlink "$destdir.commit";
1319 open STDOUT, ">", "$destdir.log";
1320 open STDERR, ">&STDOUT";
1321
1322 mkdir $destdir;
1323 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1324 print TARX <DATA>;
1325 if(!close(TARX)) {
1326   die "'tar -C $destdir -xf -' exited $?: $!";
1327 }
1328
1329 my $pwd;
1330 chomp ($pwd = `pwd`);
1331 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1332 mkdir $install_dir;
1333 if (-e "$destdir/crunch_scripts/install") {
1334     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1335 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1336     # Old version
1337     shell_or_die ("./tests/autotests.sh", $install_dir);
1338 } elsif (-e "./install.sh") {
1339     shell_or_die ("./install.sh", $install_dir);
1340 }
1341
1342 if ($commit) {
1343     unlink "$destdir.commit.new";
1344     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1345     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1346 }
1347
1348 close L;
1349
1350 exit 0;
1351
1352 sub shell_or_die
1353 {
1354   if ($ENV{"DEBUG"}) {
1355     print STDERR "@_\n";
1356   }
1357   system (@_) == 0
1358       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1359 }
1360
1361 __DATA__