Clear TASK_WORK before starting a task.
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
88 mkdir ($ENV{"JOB_WORK"});
89
90 my $force_unlock;
91 my $git_dir;
92 my $jobspec;
93 my $job_api_token;
94 my $resume_stash;
95 GetOptions('force-unlock' => \$force_unlock,
96            'git-dir=s' => \$git_dir,
97            'job=s' => \$jobspec,
98            'job-api-token=s' => \$job_api_token,
99            'resume-stash=s' => \$resume_stash,
100     );
101
102 if (defined $job_api_token) {
103   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
104 }
105
106 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
107 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
108 my $local_job = !$job_has_uuid;
109
110
111 $SIG{'USR1'} = sub
112 {
113   $main::ENV{CRUNCH_DEBUG} = 1;
114 };
115 $SIG{'USR2'} = sub
116 {
117   $main::ENV{CRUNCH_DEBUG} = 0;
118 };
119
120
121
122 my $arv = Arvados->new;
123 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
124 $metastream->clear;
125 $metastream->write_start('log.txt');
126
127 my $User = $arv->{'users'}->{'current'}->execute;
128
129 my $Job = {};
130 my $job_id;
131 my $dbh;
132 my $sth;
133 if ($job_has_uuid)
134 {
135   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
136   if (!$force_unlock) {
137     if ($Job->{'is_locked_by_uuid'}) {
138       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
139     }
140     if ($Job->{'success'} ne undef) {
141       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
142     }
143     if ($Job->{'running'}) {
144       croak("Job 'running' flag is already set");
145     }
146     if ($Job->{'started_at'}) {
147       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
148     }
149   }
150 }
151 else
152 {
153   $Job = JSON::decode_json($jobspec);
154
155   if (!$resume_stash)
156   {
157     map { croak ("No $_ specified") unless $Job->{$_} }
158     qw(script script_version script_parameters);
159   }
160
161   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
162   $Job->{'started_at'} = gmtime;
163
164   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
165
166   $job_has_uuid = 1;
167 }
168 $job_id = $Job->{'uuid'};
169
170
171
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
175
176
177 Log (undef, "check slurm allocation");
178 my @slot;
179 my @node;
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
181 my @sinfo;
182 if (!$have_slurm)
183 {
184   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185   push @sinfo, "$localcpus localhost";
186 }
187 if (exists $ENV{SLURM_NODELIST})
188 {
189   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
190 }
191 foreach (@sinfo)
192 {
193   my ($ncpus, $slurm_nodelist) = split;
194   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
195
196   my @nodelist;
197   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
198   {
199     my $nodelist = $1;
200     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
201     {
202       my $ranges = $1;
203       foreach (split (",", $ranges))
204       {
205         my ($a, $b);
206         if (/(\d+)-(\d+)/)
207         {
208           $a = $1;
209           $b = $2;
210         }
211         else
212         {
213           $a = $_;
214           $b = $_;
215         }
216         push @nodelist, map {
217           my $n = $nodelist;
218           $n =~ s/\[[-,\d]+\]/$_/;
219           $n;
220         } ($a..$b);
221       }
222     }
223     else
224     {
225       push @nodelist, $nodelist;
226     }
227   }
228   foreach my $nodename (@nodelist)
229   {
230     Log (undef, "node $nodename - $ncpus slots");
231     my $node = { name => $nodename,
232                  ncpus => $ncpus,
233                  losing_streak => 0,
234                  hold_until => 0 };
235     foreach my $cpu (1..$ncpus)
236     {
237       push @slot, { node => $node,
238                     cpu => $cpu };
239     }
240   }
241   push @node, @nodelist;
242 }
243
244
245
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
248
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250
251
252
253 my $jobmanager_id;
254 if ($job_has_uuid)
255 {
256   # Claim this job, and make sure nobody else does
257   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261   $Job->update_attributes('started_at' => scalar gmtime,
262                           'running' => 1,
263                           'success' => undef,
264                           'tasks_summary' => { 'failed' => 0,
265                                                'todo' => 1,
266                                                'running' => 0,
267                                                'done' => 0 });
268 }
269
270
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
279
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
285
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
289
290
291 my @jobstep;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
296 my $squeue_checked;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299 my $latest_refresh = scalar time;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'failures' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332 }
333 else
334 {
335   do {
336     local $/ = undef;
337     $build_script = <DATA>;
338   };
339   Log (undef, "Install revision ".$Job->{script_version});
340   my $nodelist = join(",", @node);
341
342   # Clean out crunch_tmp/work and crunch_tmp/opt
343
344   my $cleanpid = fork();
345   if ($cleanpid == 0)
346   {
347     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
348           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']);
349     exit (1);
350   }
351   while (1)
352   {
353     last if $cleanpid == waitpid (-1, WNOHANG);
354     freeze_if_want_freeze ($cleanpid);
355     select (undef, undef, undef, 0.1);
356   }
357   Log (undef, "Clean-work-dir exited $?");
358
359   # Install requested code version
360
361   my @execargs;
362   my @srunargs = ("srun",
363                   "--nodelist=$nodelist",
364                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
365
366   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
367   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
368   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
369
370   my $commit;
371   my $git_archive;
372   my $treeish = $Job->{'script_version'};
373   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
374   # Todo: let script_version specify repository instead of expecting
375   # parent process to figure it out.
376   $ENV{"CRUNCH_SRC_URL"} = $repo;
377
378   # Create/update our clone of the remote git repo
379
380   if (!-d $ENV{"CRUNCH_SRC"}) {
381     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
382         or croak ("git clone $repo failed: exit ".($?>>8));
383     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
384   }
385   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
386
387   # If this looks like a subversion r#, look for it in git-svn commit messages
388
389   if ($treeish =~ m{^\d{1,4}$}) {
390     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
391     chomp $gitlog;
392     if ($gitlog =~ /^[a-f0-9]{40}$/) {
393       $commit = $gitlog;
394       Log (undef, "Using commit $commit for script_version $treeish");
395     }
396   }
397
398   # If that didn't work, try asking git to look it up as a tree-ish.
399
400   if (!defined $commit) {
401
402     my $cooked_treeish = $treeish;
403     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
404       # Looks like a git branch name -- make sure git knows it's
405       # relative to the remote repo
406       $cooked_treeish = "origin/$treeish";
407     }
408
409     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
410     chomp $found;
411     if ($found =~ /^[0-9a-f]{40}$/s) {
412       $commit = $found;
413       if ($commit ne $treeish) {
414         # Make sure we record the real commit id in the database,
415         # frozentokey, logs, etc. -- instead of an abbreviation or a
416         # branch name which can become ambiguous or point to a
417         # different commit in the future.
418         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
419         Log (undef, "Using commit $commit for tree-ish $treeish");
420         if ($commit ne $treeish) {
421           $Job->{'script_version'} = $commit;
422           !$job_has_uuid or
423               $Job->update_attributes('script_version' => $commit) or
424               croak("Error while updating job");
425         }
426       }
427     }
428   }
429
430   if (defined $commit) {
431     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
432     @execargs = ("sh", "-c",
433                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
434     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
435   }
436   else {
437     croak ("could not figure out commit id for $treeish");
438   }
439
440   my $installpid = fork();
441   if ($installpid == 0)
442   {
443     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
444     exit (1);
445   }
446   while (1)
447   {
448     last if $installpid == waitpid (-1, WNOHANG);
449     freeze_if_want_freeze ($installpid);
450     select (undef, undef, undef, 0.1);
451   }
452   Log (undef, "Install exited $?");
453 }
454
455
456
457 foreach (qw (script script_version script_parameters runtime_constraints))
458 {
459   Log (undef,
460        "$_ " .
461        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
462 }
463 foreach (split (/\n/, $Job->{knobs}))
464 {
465   Log (undef, "knob " . $_);
466 }
467
468
469
470 $main::success = undef;
471
472
473
474 ONELEVEL:
475
476 my $thisround_succeeded = 0;
477 my $thisround_failed = 0;
478 my $thisround_failed_multiple = 0;
479
480 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
481                        or $a <=> $b } @jobstep_todo;
482 my $level = $jobstep[$jobstep_todo[0]]->{level};
483 Log (undef, "start level $level");
484
485
486
487 my %proc;
488 my @freeslot = (0..$#slot);
489 my @holdslot;
490 my %reader;
491 my $progress_is_dirty = 1;
492 my $progress_stats_updated = 0;
493
494 update_progress_stats();
495
496
497
498 THISROUND:
499 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
500 {
501   my $id = $jobstep_todo[$todo_ptr];
502   my $Jobstep = $jobstep[$id];
503   if ($Jobstep->{level} != $level)
504   {
505     next;
506   }
507
508   pipe $reader{$id}, "writer" or croak ($!);
509   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
510   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
511
512   my $childslot = $freeslot[0];
513   my $childnode = $slot[$childslot]->{node};
514   my $childslotname = join (".",
515                             $slot[$childslot]->{node}->{name},
516                             $slot[$childslot]->{cpu});
517   my $childpid = fork();
518   if ($childpid == 0)
519   {
520     $SIG{'INT'} = 'DEFAULT';
521     $SIG{'QUIT'} = 'DEFAULT';
522     $SIG{'TERM'} = 'DEFAULT';
523
524     foreach (values (%reader))
525     {
526       close($_);
527     }
528     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
529     open(STDOUT,">&writer");
530     open(STDERR,">&writer");
531
532     undef $dbh;
533     undef $sth;
534
535     delete $ENV{"GNUPGHOME"};
536     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
537     $ENV{"TASK_QSEQUENCE"} = $id;
538     $ENV{"TASK_SEQUENCE"} = $level;
539     $ENV{"JOB_SCRIPT"} = $Job->{script};
540     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
541       $param =~ tr/a-z/A-Z/;
542       $ENV{"JOB_PARAMETER_$param"} = $value;
543     }
544     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
545     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
546     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
547     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
548     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
549
550     $ENV{"GZIP"} = "-n";
551
552     my @srunargs = (
553       "srun",
554       "--nodelist=".$childnode->{name},
555       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
556       "--job-name=$job_id.$id.$$",
557         );
558     my @execargs = qw(sh);
559     my $build_script_to_send = "";
560     my $command =
561         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
562         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} "
563         ."&& cd $ENV{CRUNCH_TMP} ";
564     if ($build_script)
565     {
566       $build_script_to_send = $build_script;
567       $command .=
568           "&& perl -";
569     }
570     $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"};
571     $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack
572     $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack
573     $command .=
574         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
575     my @execargs = ('bash', '-c', $command);
576     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
577     exit (111);
578   }
579   close("writer");
580   if (!defined $childpid)
581   {
582     close $reader{$id};
583     delete $reader{$id};
584     next;
585   }
586   shift @freeslot;
587   $proc{$childpid} = { jobstep => $id,
588                        time => time,
589                        slot => $childslot,
590                        jobstepname => "$job_id.$id.$childpid",
591                      };
592   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
593   $slot[$childslot]->{pid} = $childpid;
594
595   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
596   Log ($id, "child $childpid started on $childslotname");
597   $Jobstep->{starttime} = time;
598   $Jobstep->{node} = $childnode->{name};
599   $Jobstep->{slotindex} = $childslot;
600   delete $Jobstep->{stderr};
601   delete $Jobstep->{finishtime};
602
603   splice @jobstep_todo, $todo_ptr, 1;
604   --$todo_ptr;
605
606   $progress_is_dirty = 1;
607
608   while (!@freeslot
609          ||
610          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
611   {
612     last THISROUND if $main::please_freeze;
613     if ($main::please_info)
614     {
615       $main::please_info = 0;
616       freeze();
617       collate_output();
618       save_meta(1);
619       update_progress_stats();
620     }
621     my $gotsome
622         = readfrompipes ()
623         + reapchildren ();
624     if (!$gotsome)
625     {
626       check_refresh_wanted();
627       check_squeue();
628       update_progress_stats();
629       select (undef, undef, undef, 0.1);
630     }
631     elsif (time - $progress_stats_updated >= 30)
632     {
633       update_progress_stats();
634     }
635     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
636         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
637     {
638       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
639           .($thisround_failed+$thisround_succeeded)
640           .") -- giving up on this round";
641       Log (undef, $message);
642       last THISROUND;
643     }
644
645     # move slots from freeslot to holdslot (or back to freeslot) if necessary
646     for (my $i=$#freeslot; $i>=0; $i--) {
647       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
648         push @holdslot, (splice @freeslot, $i, 1);
649       }
650     }
651     for (my $i=$#holdslot; $i>=0; $i--) {
652       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
653         push @freeslot, (splice @holdslot, $i, 1);
654       }
655     }
656
657     # give up if no nodes are succeeding
658     if (!grep { $_->{node}->{losing_streak} == 0 &&
659                     $_->{node}->{hold_count} < 4 } @slot) {
660       my $message = "Every node has failed -- giving up on this round";
661       Log (undef, $message);
662       last THISROUND;
663     }
664   }
665 }
666
667
668 push @freeslot, splice @holdslot;
669 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
670
671
672 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
673 while (%proc)
674 {
675   if ($main::please_continue) {
676     $main::please_continue = 0;
677     goto THISROUND;
678   }
679   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
680   readfrompipes ();
681   if (!reapchildren())
682   {
683     check_refresh_wanted();
684     check_squeue();
685     update_progress_stats();
686     select (undef, undef, undef, 0.1);
687     killem (keys %proc) if $main::please_freeze;
688   }
689 }
690
691 update_progress_stats();
692 freeze_if_want_freeze();
693
694
695 if (!defined $main::success)
696 {
697   if (@jobstep_todo &&
698       $thisround_succeeded == 0 &&
699       ($thisround_failed == 0 || $thisround_failed > 4))
700   {
701     my $message = "stop because $thisround_failed tasks failed and none succeeded";
702     Log (undef, $message);
703     $main::success = 0;
704   }
705   if (!@jobstep_todo)
706   {
707     $main::success = 1;
708   }
709 }
710
711 goto ONELEVEL if !defined $main::success;
712
713
714 release_allocation();
715 freeze();
716 if ($job_has_uuid) {
717   $Job->update_attributes('output' => &collate_output(),
718                           'running' => 0,
719                           'success' => $Job->{'output'} && $main::success,
720                           'finished_at' => scalar gmtime)
721 }
722
723 if ($Job->{'output'})
724 {
725   eval {
726     my $manifest_text = capturex("whget", $Job->{'output'});
727     $arv->{'collections'}->{'create'}->execute('collection' => {
728       'uuid' => $Job->{'output'},
729       'manifest_text' => $manifest_text,
730     });
731   };
732   if ($@) {
733     Log (undef, "Failed to register output manifest: $@");
734   }
735 }
736
737 Log (undef, "finish");
738
739 save_meta();
740 exit 0;
741
742
743
744 sub update_progress_stats
745 {
746   $progress_stats_updated = time;
747   return if !$progress_is_dirty;
748   my ($todo, $done, $running) = (scalar @jobstep_todo,
749                                  scalar @jobstep_done,
750                                  scalar @slot - scalar @freeslot - scalar @holdslot);
751   $Job->{'tasks_summary'} ||= {};
752   $Job->{'tasks_summary'}->{'todo'} = $todo;
753   $Job->{'tasks_summary'}->{'done'} = $done;
754   $Job->{'tasks_summary'}->{'running'} = $running;
755   if ($job_has_uuid) {
756     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
757   }
758   Log (undef, "status: $done done, $running running, $todo todo");
759   $progress_is_dirty = 0;
760 }
761
762
763
764 sub reapchildren
765 {
766   my $pid = waitpid (-1, WNOHANG);
767   return 0 if $pid <= 0;
768
769   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
770                   . "."
771                   . $slot[$proc{$pid}->{slot}]->{cpu});
772   my $jobstepid = $proc{$pid}->{jobstep};
773   my $elapsed = time - $proc{$pid}->{time};
774   my $Jobstep = $jobstep[$jobstepid];
775
776   my $childstatus = $?;
777   my $exitvalue = $childstatus >> 8;
778   my $exitinfo = sprintf("exit %d signal %d%s",
779                          $exitvalue,
780                          $childstatus & 127,
781                          ($childstatus & 128 ? ' core dump' : ''));
782   $Jobstep->{'arvados_task'}->reload;
783   my $task_success = $Jobstep->{'arvados_task'}->{success};
784
785   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
786
787   if (!defined $task_success) {
788     # task did not indicate one way or the other --> fail
789     $Jobstep->{'arvados_task'}->{success} = 0;
790     $Jobstep->{'arvados_task'}->save;
791     $task_success = 0;
792   }
793
794   if (!$task_success)
795   {
796     my $temporary_fail;
797     $temporary_fail ||= $Jobstep->{node_fail};
798     $temporary_fail ||= ($exitvalue == 111);
799
800     ++$thisround_failed;
801     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
802
803     # Check for signs of a failed or misconfigured node
804     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
805         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
806       # Don't count this against jobstep failure thresholds if this
807       # node is already suspected faulty and srun exited quickly
808       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
809           $elapsed < 5) {
810         Log ($jobstepid, "blaming failure on suspect node " .
811              $slot[$proc{$pid}->{slot}]->{node}->{name});
812         $temporary_fail ||= 1;
813       }
814       ban_node_by_slot($proc{$pid}->{slot});
815     }
816
817     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
818                              ++$Jobstep->{'failures'},
819                              $temporary_fail ? 'temporary ' : 'permanent',
820                              $elapsed));
821
822     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
823       # Give up on this task, and the whole job
824       $main::success = 0;
825       $main::please_freeze = 1;
826     }
827     else {
828       # Put this task back on the todo queue
829       push @jobstep_todo, $jobstepid;
830     }
831     $Job->{'tasks_summary'}->{'failed'}++;
832   }
833   else
834   {
835     ++$thisround_succeeded;
836     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
837     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
838     push @jobstep_done, $jobstepid;
839     Log ($jobstepid, "success in $elapsed seconds");
840   }
841   $Jobstep->{exitcode} = $childstatus;
842   $Jobstep->{finishtime} = time;
843   process_stderr ($jobstepid, $task_success);
844   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
845
846   close $reader{$jobstepid};
847   delete $reader{$jobstepid};
848   delete $slot[$proc{$pid}->{slot}]->{pid};
849   push @freeslot, $proc{$pid}->{slot};
850   delete $proc{$pid};
851
852   # Load new tasks
853   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
854     'where' => {
855       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
856     },
857     'order' => 'qsequence'
858   );
859   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
860     my $jobstep = {
861       'level' => $arvados_task->{'sequence'},
862       'failures' => 0,
863       'arvados_task' => $arvados_task
864     };
865     push @jobstep, $jobstep;
866     push @jobstep_todo, $#jobstep;
867   }
868
869   $progress_is_dirty = 1;
870   1;
871 }
872
873 sub check_refresh_wanted
874 {
875   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
876   if (@stat && $stat[9] > $latest_refresh) {
877     $latest_refresh = scalar time;
878     if ($job_has_uuid) {
879       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
880       for my $attr ('cancelled_at',
881                     'cancelled_by_user_uuid',
882                     'cancelled_by_client_uuid') {
883         $Job->{$attr} = $Job2->{$attr};
884       }
885       if ($Job->{'cancelled_at'}) {
886         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
887              " by user " . $Job->{cancelled_by_user_uuid});
888         $main::success = 0;
889         $main::please_freeze = 1;
890       }
891     }
892   }
893 }
894
895 sub check_squeue
896 {
897   # return if the kill list was checked <4 seconds ago
898   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
899   {
900     return;
901   }
902   $squeue_kill_checked = time;
903
904   # use killem() on procs whose killtime is reached
905   for (keys %proc)
906   {
907     if (exists $proc{$_}->{killtime}
908         && $proc{$_}->{killtime} <= time)
909     {
910       killem ($_);
911     }
912   }
913
914   # return if the squeue was checked <60 seconds ago
915   if (defined $squeue_checked && $squeue_checked > time - 60)
916   {
917     return;
918   }
919   $squeue_checked = time;
920
921   if (!$have_slurm)
922   {
923     # here is an opportunity to check for mysterious problems with local procs
924     return;
925   }
926
927   # get a list of steps still running
928   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
929   chop @squeue;
930   if ($squeue[-1] ne "ok")
931   {
932     return;
933   }
934   pop @squeue;
935
936   # which of my jobsteps are running, according to squeue?
937   my %ok;
938   foreach (@squeue)
939   {
940     if (/^(\d+)\.(\d+) (\S+)/)
941     {
942       if ($1 eq $ENV{SLURM_JOBID})
943       {
944         $ok{$3} = 1;
945       }
946     }
947   }
948
949   # which of my active child procs (>60s old) were not mentioned by squeue?
950   foreach (keys %proc)
951   {
952     if ($proc{$_}->{time} < time - 60
953         && !exists $ok{$proc{$_}->{jobstepname}}
954         && !exists $proc{$_}->{killtime})
955     {
956       # kill this proc if it hasn't exited in 30 seconds
957       $proc{$_}->{killtime} = time + 30;
958     }
959   }
960 }
961
962
963 sub release_allocation
964 {
965   if ($have_slurm)
966   {
967     Log (undef, "release job allocation");
968     system "scancel $ENV{SLURM_JOBID}";
969   }
970 }
971
972
973 sub readfrompipes
974 {
975   my $gotsome = 0;
976   foreach my $job (keys %reader)
977   {
978     my $buf;
979     while (0 < sysread ($reader{$job}, $buf, 8192))
980     {
981       print STDERR $buf if $ENV{CRUNCH_DEBUG};
982       $jobstep[$job]->{stderr} .= $buf;
983       preprocess_stderr ($job);
984       if (length ($jobstep[$job]->{stderr}) > 16384)
985       {
986         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
987       }
988       $gotsome = 1;
989     }
990   }
991   return $gotsome;
992 }
993
994
995 sub preprocess_stderr
996 {
997   my $job = shift;
998
999   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1000     my $line = $1;
1001     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1002     Log ($job, "stderr $line");
1003     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1004       # whoa.
1005       $main::please_freeze = 1;
1006     }
1007     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1008       $jobstep[$job]->{node_fail} = 1;
1009       ban_node_by_slot($jobstep[$job]->{slotindex});
1010     }
1011   }
1012 }
1013
1014
1015 sub process_stderr
1016 {
1017   my $job = shift;
1018   my $task_success = shift;
1019   preprocess_stderr ($job);
1020
1021   map {
1022     Log ($job, "stderr $_");
1023   } split ("\n", $jobstep[$job]->{stderr});
1024 }
1025
1026
1027 sub collate_output
1028 {
1029   my $whc = Warehouse->new;
1030   Log (undef, "collate");
1031   $whc->write_start (1);
1032   my $joboutput;
1033   for (@jobstep)
1034   {
1035     next if (!exists $_->{'arvados_task'}->{output} ||
1036              !$_->{'arvados_task'}->{'success'} ||
1037              $_->{'exitcode'} != 0);
1038     my $output = $_->{'arvados_task'}->{output};
1039     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1040     {
1041       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1042       $whc->write_data ($output);
1043     }
1044     elsif (@jobstep == 1)
1045     {
1046       $joboutput = $output;
1047       $whc->write_finish;
1048     }
1049     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1050     {
1051       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1052       $whc->write_data ($outblock);
1053     }
1054     else
1055     {
1056       my $errstr = $whc->errstr;
1057       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1058       $main::success = 0;
1059     }
1060   }
1061   $joboutput = $whc->write_finish if !defined $joboutput;
1062   if ($joboutput)
1063   {
1064     Log (undef, "output $joboutput");
1065     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1066   }
1067   else
1068   {
1069     Log (undef, "output undef");
1070   }
1071   return $joboutput;
1072 }
1073
1074
1075 sub killem
1076 {
1077   foreach (@_)
1078   {
1079     my $sig = 2;                # SIGINT first
1080     if (exists $proc{$_}->{"sent_$sig"} &&
1081         time - $proc{$_}->{"sent_$sig"} > 4)
1082     {
1083       $sig = 15;                # SIGTERM if SIGINT doesn't work
1084     }
1085     if (exists $proc{$_}->{"sent_$sig"} &&
1086         time - $proc{$_}->{"sent_$sig"} > 4)
1087     {
1088       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1089     }
1090     if (!exists $proc{$_}->{"sent_$sig"})
1091     {
1092       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1093       kill $sig, $_;
1094       select (undef, undef, undef, 0.1);
1095       if ($sig == 2)
1096       {
1097         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1098       }
1099       $proc{$_}->{"sent_$sig"} = time;
1100       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1101     }
1102   }
1103 }
1104
1105
1106 sub fhbits
1107 {
1108   my($bits);
1109   for (@_) {
1110     vec($bits,fileno($_),1) = 1;
1111   }
1112   $bits;
1113 }
1114
1115
1116 sub Log                         # ($jobstep_id, $logmessage)
1117 {
1118   if ($_[1] =~ /\n/) {
1119     for my $line (split (/\n/, $_[1])) {
1120       Log ($_[0], $line);
1121     }
1122     return;
1123   }
1124   my $fh = select STDERR; $|=1; select $fh;
1125   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1126   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1127   $message .= "\n";
1128   my $datetime;
1129   if ($metastream || -t STDERR) {
1130     my @gmtime = gmtime;
1131     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1132                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1133   }
1134   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1135
1136   return if !$metastream;
1137   $metastream->write_data ($datetime . " " . $message);
1138 }
1139
1140
1141 sub croak
1142 {
1143   my ($package, $file, $line) = caller;
1144   my $message = "@_ at $file line $line\n";
1145   Log (undef, $message);
1146   freeze() if @jobstep_todo;
1147   collate_output() if @jobstep_todo;
1148   cleanup();
1149   save_meta() if $metastream;
1150   die;
1151 }
1152
1153
1154 sub cleanup
1155 {
1156   return if !$job_has_uuid;
1157   $Job->update_attributes('running' => 0,
1158                           'success' => 0,
1159                           'finished_at' => scalar gmtime);
1160 }
1161
1162
1163 sub save_meta
1164 {
1165   my $justcheckpoint = shift; # false if this will be the last meta saved
1166   my $m = $metastream;
1167   $m = $m->copy if $justcheckpoint;
1168   $m->write_finish;
1169   my $loglocator = $m->as_key;
1170   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1171   Log (undef, "meta key is $loglocator");
1172   $Job->{'log'} = $loglocator;
1173   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1174 }
1175
1176
1177 sub freeze_if_want_freeze
1178 {
1179   if ($main::please_freeze)
1180   {
1181     release_allocation();
1182     if (@_)
1183     {
1184       # kill some srun procs before freeze+stop
1185       map { $proc{$_} = {} } @_;
1186       while (%proc)
1187       {
1188         killem (keys %proc);
1189         select (undef, undef, undef, 0.1);
1190         my $died;
1191         while (($died = waitpid (-1, WNOHANG)) > 0)
1192         {
1193           delete $proc{$died};
1194         }
1195       }
1196     }
1197     freeze();
1198     collate_output();
1199     cleanup();
1200     save_meta();
1201     exit 0;
1202   }
1203 }
1204
1205
1206 sub freeze
1207 {
1208   Log (undef, "Freeze not implemented");
1209   return;
1210 }
1211
1212
1213 sub thaw
1214 {
1215   croak ("Thaw not implemented");
1216
1217   my $whc;
1218   my $key = shift;
1219   Log (undef, "thaw from $key");
1220
1221   @jobstep = ();
1222   @jobstep_done = ();
1223   @jobstep_todo = ();
1224   @jobstep_tomerge = ();
1225   $jobstep_tomerge_level = 0;
1226   my $frozenjob = {};
1227
1228   my $stream = new Warehouse::Stream ( whc => $whc,
1229                                        hash => [split (",", $key)] );
1230   $stream->rewind;
1231   while (my $dataref = $stream->read_until (undef, "\n\n"))
1232   {
1233     if ($$dataref =~ /^job /)
1234     {
1235       foreach (split ("\n", $$dataref))
1236       {
1237         my ($k, $v) = split ("=", $_, 2);
1238         $frozenjob->{$k} = freezeunquote ($v);
1239       }
1240       next;
1241     }
1242
1243     if ($$dataref =~ /^merge (\d+) (.*)/)
1244     {
1245       $jobstep_tomerge_level = $1;
1246       @jobstep_tomerge
1247           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1248       next;
1249     }
1250
1251     my $Jobstep = { };
1252     foreach (split ("\n", $$dataref))
1253     {
1254       my ($k, $v) = split ("=", $_, 2);
1255       $Jobstep->{$k} = freezeunquote ($v) if $k;
1256     }
1257     $Jobstep->{'failures'} = 0;
1258     push @jobstep, $Jobstep;
1259
1260     if ($Jobstep->{exitcode} eq "0")
1261     {
1262       push @jobstep_done, $#jobstep;
1263     }
1264     else
1265     {
1266       push @jobstep_todo, $#jobstep;
1267     }
1268   }
1269
1270   foreach (qw (script script_version script_parameters))
1271   {
1272     $Job->{$_} = $frozenjob->{$_};
1273   }
1274   $Job->save if $job_has_uuid;
1275 }
1276
1277
1278 sub freezequote
1279 {
1280   my $s = shift;
1281   $s =~ s/\\/\\\\/g;
1282   $s =~ s/\n/\\n/g;
1283   return $s;
1284 }
1285
1286
1287 sub freezeunquote
1288 {
1289   my $s = shift;
1290   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1291   return $s;
1292 }
1293
1294
1295 sub srun
1296 {
1297   my $srunargs = shift;
1298   my $execargs = shift;
1299   my $opts = shift || {};
1300   my $stdin = shift;
1301   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1302   print STDERR (join (" ",
1303                       map { / / ? "'$_'" : $_ }
1304                       (@$args)),
1305                 "\n")
1306       if $ENV{CRUNCH_DEBUG};
1307
1308   if (defined $stdin) {
1309     my $child = open STDIN, "-|";
1310     defined $child or die "no fork: $!";
1311     if ($child == 0) {
1312       print $stdin or die $!;
1313       close STDOUT or die $!;
1314       exit 0;
1315     }
1316   }
1317
1318   return system (@$args) if $opts->{fork};
1319
1320   exec @$args;
1321   warn "ENV size is ".length(join(" ",%ENV));
1322   die "exec failed: $!: @$args";
1323 }
1324
1325
1326 sub ban_node_by_slot {
1327   # Don't start any new jobsteps on this node for 60 seconds
1328   my $slotid = shift;
1329   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1330   $slot[$slotid]->{node}->{hold_count}++;
1331   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1332 }
1333
1334 __DATA__
1335 #!/usr/bin/perl
1336
1337 # checkout-and-build
1338
1339 use Fcntl ':flock';
1340
1341 my $destdir = $ENV{"CRUNCH_SRC"};
1342 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1343 my $repo = $ENV{"CRUNCH_SRC_URL"};
1344
1345 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1346 flock L, LOCK_EX;
1347 if (readlink ("$destdir.commit") eq $commit) {
1348     exit 0;
1349 }
1350
1351 unlink "$destdir.commit";
1352 open STDOUT, ">", "$destdir.log";
1353 open STDERR, ">&STDOUT";
1354
1355 mkdir $destdir;
1356 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1357 print TARX <DATA>;
1358 if(!close(TARX)) {
1359   die "'tar -C $destdir -xf -' exited $?: $!";
1360 }
1361
1362 my $pwd;
1363 chomp ($pwd = `pwd`);
1364 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1365 mkdir $install_dir;
1366 if (-e "$destdir/crunch_scripts/install") {
1367     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1368 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1369     # Old version
1370     shell_or_die ("./tests/autotests.sh", $install_dir);
1371 } elsif (-e "./install.sh") {
1372     shell_or_die ("./install.sh", $install_dir);
1373 }
1374
1375 if ($commit) {
1376     unlink "$destdir.commit.new";
1377     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1378     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1379 }
1380
1381 close L;
1382
1383 exit 0;
1384
1385 sub shell_or_die
1386 {
1387   if ($ENV{"DEBUG"}) {
1388     print STDERR "@_\n";
1389   }
1390   system (@_) == 0
1391       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1392 }
1393
1394 __DATA__