fix dev job id
[arvados.git] / services / crunch / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated). Currently this is a no-op.
62
63 =back
64
65 =cut
66
67
68 use strict;
69 use POSIX ':sys_wait_h';
70 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
71 use Arvados;
72 use Getopt::Long;
73 use Warehouse;
74 use Warehouse::Stream;
75 use IPC::System::Simple qw(capturex);
76
77 $ENV{"TMPDIR"} ||= "/tmp";
78 $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
79 $ENV{"CRUNCH_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
80 mkdir ($ENV{"CRUNCH_TMP"});
81
82 my $force_unlock;
83 my $git_dir;
84 my $jobspec;
85 my $job_api_token;
86 my $resume_stash;
87 GetOptions('force-unlock' => \$force_unlock,
88            'git-dir=s' => \$git_dir,
89            'job=s' => \$jobspec,
90            'job-api-token=s' => \$job_api_token,
91            'resume-stash=s' => \$resume_stash,
92     );
93
94 if (defined $job_api_token) {
95   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
96 }
97
98 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
99 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
100
101
102 $SIG{'HUP'} = sub
103 {
104   1;
105 };
106 $SIG{'USR1'} = sub
107 {
108   $main::ENV{CRUNCH_DEBUG} = 1;
109 };
110 $SIG{'USR2'} = sub
111 {
112   $main::ENV{CRUNCH_DEBUG} = 0;
113 };
114
115
116
117 my $arv = Arvados->new;
118 my $metastream = Warehouse::Stream->new(whc => new Warehouse);
119 $metastream->clear;
120 $metastream->write_start('log.txt');
121
122 my $User = {};
123 my $Job = {};
124 my $job_id;
125 my $dbh;
126 my $sth;
127 if ($job_has_uuid)
128 {
129   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
130   $User = $arv->{'users'}->{'current'}->execute;
131   if (!$force_unlock) {
132     if ($Job->{'is_locked_by'}) {
133       croak("Job is locked: " . $Job->{'is_locked_by'});
134     }
135     if ($Job->{'success'} ne undef) {
136       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
137     }
138     if ($Job->{'running'}) {
139       croak("Job 'running' flag is already set");
140     }
141     if ($Job->{'started_at'}) {
142       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
143     }
144   }
145 }
146 else
147 {
148   $Job = JSON::decode_json($jobspec);
149
150   if (!$resume_stash)
151   {
152     map { croak ("No $_ specified") unless $Job->{$_} }
153     qw(script script_version script_parameters);
154   }
155
156   if (!defined $Job->{'uuid'}) {
157     my $hostname = `hostname -s`;
158     chomp $hostname;
159     $Job->{'uuid'} = sprintf ("%s-t%d-p%d", $hostname, time, $$);
160   }
161 }
162 $job_id = $Job->{'uuid'};
163
164
165
166 $Job->{'resource_limits'} ||= {};
167 $Job->{'resource_limits'}->{'max_tasks_per_node'} ||= 0;
168 my $max_ncpus = $Job->{'resource_limits'}->{'max_tasks_per_node'};
169
170
171 Log (undef, "check slurm allocation");
172 my @slot;
173 my @node;
174 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
175 my @sinfo;
176 if (!$have_slurm)
177 {
178   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
179   push @sinfo, "$localcpus localhost";
180 }
181 if (exists $ENV{SLURM_NODELIST})
182 {
183   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
184 }
185 foreach (@sinfo)
186 {
187   my ($ncpus, $slurm_nodelist) = split;
188   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
189
190   my @nodelist;
191   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
192   {
193     my $nodelist = $1;
194     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
195     {
196       my $ranges = $1;
197       foreach (split (",", $ranges))
198       {
199         my ($a, $b);
200         if (/(\d+)-(\d+)/)
201         {
202           $a = $1;
203           $b = $2;
204         }
205         else
206         {
207           $a = $_;
208           $b = $_;
209         }
210         push @nodelist, map {
211           my $n = $nodelist;
212           $n =~ s/\[[-,\d]+\]/$_/;
213           $n;
214         } ($a..$b);
215       }
216     }
217     else
218     {
219       push @nodelist, $nodelist;
220     }
221   }
222   foreach my $nodename (@nodelist)
223   {
224     Log (undef, "node $nodename - $ncpus slots");
225     my $node = { name => $nodename,
226                  ncpus => $ncpus,
227                  losing_streak => 0,
228                  hold_until => 0 };
229     foreach my $cpu (1..$ncpus)
230     {
231       push @slot, { node => $node,
232                     cpu => $cpu };
233     }
234   }
235   push @node, @nodelist;
236 }
237
238
239
240 # Ensure that we get one jobstep running on each allocated node before
241 # we start overloading nodes with concurrent steps
242
243 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
244
245
246
247 my $jobmanager_id;
248 if ($job_has_uuid)
249 {
250   # Claim this job, and make sure nobody else does
251
252   $Job->{'is_locked_by'} = $User->{'uuid'};
253   $Job->{'started_at'} = gmtime;
254   $Job->{'running'} = 1;
255   $Job->{'success'} = undef;
256   $Job->{'tasks_summary'} = { 'failed' => 0,
257                               'todo' => 1,
258                               'running' => 0,
259                               'done' => 0 };
260   unless ($Job->save() && $Job->{'is_locked_by'} == $User->{'uuid'}) {
261     croak("Error while updating / locking job");
262   }
263 }
264
265
266 Log (undef, "start");
267 $SIG{'INT'} = sub { $main::please_freeze = 1; };
268 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
269 $SIG{'TERM'} = \&croak;
270 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
271 $SIG{'ALRM'} = sub { $main::please_info = 1; };
272 $SIG{'CONT'} = sub { $main::please_continue = 1; };
273 $main::please_freeze = 0;
274 $main::please_info = 0;
275 $main::please_continue = 0;
276 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
277
278 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
279 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
280 $ENV{"JOB_UUID"} = $job_id;
281
282
283 my @jobstep;
284 my @jobstep_todo = ();
285 my @jobstep_done = ();
286 my @jobstep_tomerge = ();
287 my $jobstep_tomerge_level = 0;
288 my $squeue_checked;
289 my $squeue_kill_checked;
290 my $output_in_keep = 0;
291
292
293
294 if (defined $Job->{thawedfromkey})
295 {
296   thaw ($Job->{thawedfromkey});
297 }
298 else
299 {
300   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
301     'job_uuid' => $Job->{'uuid'},
302     'sequence' => 0,
303     'qsequence' => 0,
304     'parameters' => {},
305                                                           });
306   push @jobstep, { 'level' => 0,
307                    'attempts' => 0,
308                    'arvados_task' => $first_task,
309                  };
310   push @jobstep_todo, 0;
311 }
312
313
314 my $build_script;
315 do {
316   local $/ = undef;
317   $build_script = <DATA>;
318 };
319
320
321 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{revision};
322
323 my $skip_install = (!$have_slurm && $Job->{revision} =~ m{^/});
324 if ($skip_install)
325 {
326   $ENV{"CRUNCH_SRC"} = $Job->{revision};
327 }
328 else
329 {
330   Log (undef, "Install revision ".$Job->{revision});
331   my $nodelist = join(",", @node);
332
333   # Clean out crunch_tmp/work and crunch_tmp/opt
334
335   my $cleanpid = fork();
336   if ($cleanpid == 0)
337   {
338     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
339           ['bash', '-c', 'if mount | grep -q $CRUNCH_WORK/; then sudo /bin/umount $CRUNCH_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $CRUNCH_WORK $CRUNCH_TMP/opt']);
340     exit (1);
341   }
342   while (1)
343   {
344     last if $cleanpid == waitpid (-1, WNOHANG);
345     freeze_if_want_freeze ($cleanpid);
346     select (undef, undef, undef, 0.1);
347   }
348   Log (undef, "Clean-work-dir exited $?");
349
350   # Install requested code version
351
352   my @execargs;
353   my @srunargs = ("srun",
354                   "--nodelist=$nodelist",
355                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
356
357   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
358   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
359   $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
360
361   my $commit;
362   my $git_archive;
363   my $treeish = $Job->{'script_version'};
364   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
365   # Todo: let script_version specify repository instead of expecting
366   # parent process to figure it out.
367   $ENV{"CRUNCH_SRC_URL"} = $repo;
368
369   # Create/update our clone of the remote git repo
370
371   if (!-d $ENV{"CRUNCH_SRC"}) {
372     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
373         or croak ("git clone $repo failed: exit ".($?>>8));
374     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
375   }
376   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
377
378   # If this looks like a subversion r#, look for it in git-svn commit messages
379
380   if ($treeish =~ m{^\d{1,4}$}) {
381     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
382     chomp $gitlog;
383     if ($gitlog =~ /^[a-f0-9]{40}$/) {
384       $commit = $gitlog;
385       Log (undef, "Using commit $commit for revision $treeish");
386     }
387   }
388
389   # If that didn't work, try asking git to look it up as a tree-ish.
390
391   if (!defined $commit) {
392
393     my $cooked_treeish = $treeish;
394     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
395       # Looks like a git branch name -- make sure git knows it's
396       # relative to the remote repo
397       $cooked_treeish = "origin/$treeish";
398     }
399
400     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
401     chomp $found;
402     if ($found =~ /^[0-9a-f]{40}$/s) {
403       $commit = $found;
404       if ($commit ne $treeish) {
405         # Make sure we record the real commit id in the database,
406         # frozentokey, logs, etc. -- instead of an abbreviation or a
407         # branch name which can become ambiguous or point to a
408         # different commit in the future.
409         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
410         Log (undef, "Using commit $commit for tree-ish $treeish");
411         if ($commit ne $treeish) {
412           $Job->{'script_version'} = $commit;
413           $Job->save() or croak("Error while updating job");
414         }
415       }
416     }
417   }
418
419   if (defined $commit) {
420     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
421     @execargs = ("sh", "-c",
422                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
423     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
424   }
425   else {
426     croak ("could not figure out commit id for $treeish");
427   }
428
429   my $installpid = fork();
430   if ($installpid == 0)
431   {
432     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
433     exit (1);
434   }
435   while (1)
436   {
437     last if $installpid == waitpid (-1, WNOHANG);
438     freeze_if_want_freeze ($installpid);
439     select (undef, undef, undef, 0.1);
440   }
441   Log (undef, "Install exited $?");
442 }
443
444
445
446 foreach (qw (script script_version script_parameters resource_limits))
447 {
448   Log (undef,
449        "$_ " .
450        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
451 }
452 foreach (split (/\n/, $Job->{knobs}))
453 {
454   Log (undef, "knob " . $_);
455 }
456
457
458
459 my $success;
460
461
462
463 ONELEVEL:
464
465 my $thisround_succeeded = 0;
466 my $thisround_failed = 0;
467 my $thisround_failed_multiple = 0;
468
469 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
470                        or $a <=> $b } @jobstep_todo;
471 my $level = $jobstep[$jobstep_todo[0]]->{level};
472 Log (undef, "start level $level");
473
474
475
476 my %proc;
477 my @freeslot = (0..$#slot);
478 my @holdslot;
479 my %reader;
480 my $progress_is_dirty = 1;
481 my $progress_stats_updated = 0;
482
483 update_progress_stats();
484
485
486
487 THISROUND:
488 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
489 {
490   $main::please_continue = 0;
491
492   my $id = $jobstep_todo[$todo_ptr];
493   my $Jobstep = $jobstep[$id];
494   if ($Jobstep->{level} != $level)
495   {
496     next;
497   }
498   if ($Jobstep->{attempts} > 9)
499   {
500     Log ($id, "jobstep $id failed $$Jobstep{attempts} times -- giving up");
501     $success = 0;
502     last THISROUND;
503   }
504
505   pipe $reader{$id}, "writer" or croak ($!);
506   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
507   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
508
509   my $childslot = $freeslot[0];
510   my $childnode = $slot[$childslot]->{node};
511   my $childslotname = join (".",
512                             $slot[$childslot]->{node}->{name},
513                             $slot[$childslot]->{cpu});
514   my $childpid = fork();
515   if ($childpid == 0)
516   {
517     $SIG{'INT'} = 'DEFAULT';
518     $SIG{'QUIT'} = 'DEFAULT';
519     $SIG{'TERM'} = 'DEFAULT';
520
521     foreach (values (%reader))
522     {
523       close($_);
524     }
525     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
526     open(STDOUT,">&writer");
527     open(STDERR,">&writer");
528
529     undef $dbh;
530     undef $sth;
531
532     delete $ENV{"GNUPGHOME"};
533     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
534     $ENV{"TASK_QSEQUENCE"} = $id;
535     $ENV{"TASK_SEQUENCE"} = $level;
536     $ENV{"JOB_SCRIPT"} = $Job->{script};
537     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
538       $param =~ tr/a-z/A-Z/;
539       $ENV{"JOB_PARAMETER_$param"} = $value;
540     }
541     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
542     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
543     $ENV{"TASK_TMPDIR"} = $ENV{"CRUNCH_WORK"}.$slot[$childslot]->{cpu};
544     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
545
546     $ENV{"GZIP"} = "-n";
547
548     my @srunargs = (
549       "srun",
550       "--nodelist=".$childnode->{name},
551       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
552       "--job-name=$job_id.$id.$$",
553         );
554     my @execargs = qw(sh);
555     my $build_script_to_send = "";
556     my $command =
557         "mkdir -p $ENV{CRUNCH_TMP}/revision "
558         ."&& cd $ENV{CRUNCH_TMP} ";
559     if ($build_script)
560     {
561       $build_script_to_send = $build_script;
562       $command .=
563           "&& perl -";
564     }
565     elsif (!$skip_install)
566     {
567       $command .=
568           "&& "
569           ."( "
570           ."  [ -e '$ENV{CRUNCH_INSTALL}/.tested' ] "
571           ."|| "
572           ."  ( svn export --quiet '$ENV{INSTALL_REPOS}/installrevision' "
573           ."    && ./installrevision "
574           ."  ) "
575           .") ";
576     }
577     $ENV{"PYTHONPATH"} = "$ENV{CRUNCH_SRC}/sdk/python"; # xxx hack
578     $command .=
579         "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
580     my @execargs = ('bash', '-c', $command);
581     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
582     exit (1);
583   }
584   close("writer");
585   if (!defined $childpid)
586   {
587     close $reader{$id};
588     delete $reader{$id};
589     next;
590   }
591   shift @freeslot;
592   $proc{$childpid} = { jobstep => $id,
593                        time => time,
594                        slot => $childslot,
595                        jobstepname => "$job_id.$id.$childpid",
596                      };
597   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
598   $slot[$childslot]->{pid} = $childpid;
599
600   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
601   Log ($id, "child $childpid started on $childslotname");
602   $Jobstep->{attempts} ++;
603   $Jobstep->{starttime} = time;
604   $Jobstep->{node} = $childnode->{name};
605   $Jobstep->{slotindex} = $childslot;
606   delete $Jobstep->{stderr};
607   delete $Jobstep->{finishtime};
608
609   splice @jobstep_todo, $todo_ptr, 1;
610   --$todo_ptr;
611
612   $progress_is_dirty = 1;
613
614   while (!@freeslot
615          ||
616          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
617   {
618     last THISROUND if $main::please_freeze;
619     if ($main::please_info)
620     {
621       $main::please_info = 0;
622       freeze();
623       collate_output();
624       save_meta(1);
625       update_progress_stats();
626     }
627     my $gotsome
628         = readfrompipes ()
629         + reapchildren ();
630     if (!$gotsome)
631     {
632       check_squeue();
633       update_progress_stats();
634       select (undef, undef, undef, 0.1);
635     }
636     elsif (time - $progress_stats_updated >= 30)
637     {
638       update_progress_stats();
639     }
640     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
641         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
642     {
643       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
644           .($thisround_failed+$thisround_succeeded)
645           .") -- giving up on this round";
646       Log (undef, $message);
647       last THISROUND;
648     }
649
650     # move slots from freeslot to holdslot (or back to freeslot) if necessary
651     for (my $i=$#freeslot; $i>=0; $i--) {
652       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
653         push @holdslot, (splice @freeslot, $i, 1);
654       }
655     }
656     for (my $i=$#holdslot; $i>=0; $i--) {
657       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
658         push @freeslot, (splice @holdslot, $i, 1);
659       }
660     }
661
662     # give up if no nodes are succeeding
663     if (!grep { $_->{node}->{losing_streak} == 0 &&
664                     $_->{node}->{hold_count} < 4 } @slot) {
665       my $message = "Every node has failed -- giving up on this round";
666       Log (undef, $message);
667       last THISROUND;
668     }
669   }
670 }
671
672
673 push @freeslot, splice @holdslot;
674 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
675
676
677 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
678 while (%proc)
679 {
680   goto THISROUND if $main::please_continue;
681   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
682   readfrompipes ();
683   if (!reapchildren())
684   {
685     check_squeue();
686     update_progress_stats();
687     select (undef, undef, undef, 0.1);
688     killem (keys %proc) if $main::please_freeze;
689   }
690 }
691
692 update_progress_stats();
693 freeze_if_want_freeze();
694
695
696 if (!defined $success)
697 {
698   if (@jobstep_todo &&
699       $thisround_succeeded == 0 &&
700       ($thisround_failed == 0 || $thisround_failed > 4))
701   {
702     my $message = "stop because $thisround_failed tasks failed and none succeeded";
703     Log (undef, $message);
704     $success = 0;
705   }
706   if (!@jobstep_todo)
707   {
708     $success = 1;
709   }
710 }
711
712 goto ONELEVEL if !defined $success;
713
714
715 release_allocation();
716 freeze();
717 $Job->{'output'} = &collate_output();
718 $Job->{'success'} = $Job->{'output'} && $success;
719 $Job->save;
720
721 if ($Job->{'output'})
722 {
723   eval {
724     my $manifest_text = capturex("whget", $Job->{'output'});
725     $arv->{'collections'}->{'create'}->execute('collection' => {
726       'uuid' => $Job->{'output'},
727       'manifest_text' => $manifest_text,
728     });
729   };
730   if ($@) {
731     Log (undef, "Failed to register output manifest: $@");
732   }
733 }
734
735 Log (undef, "finish");
736
737 save_meta();
738 exit 0;
739
740
741
742 sub update_progress_stats
743 {
744   $progress_stats_updated = time;
745   return if !$progress_is_dirty;
746   my ($todo, $done, $running) = (scalar @jobstep_todo,
747                                  scalar @jobstep_done,
748                                  scalar @slot - scalar @freeslot - scalar @holdslot);
749   $Job->{'tasks_summary'} ||= {};
750   $Job->{'tasks_summary'}->{'todo'} = $todo;
751   $Job->{'tasks_summary'}->{'done'} = $done;
752   $Job->{'tasks_summary'}->{'running'} = $running;
753   $Job->save;
754   Log (undef, "status: $done done, $running running, $todo todo");
755   $progress_is_dirty = 0;
756 }
757
758
759
760 sub reapchildren
761 {
762   my $pid = waitpid (-1, WNOHANG);
763   return 0 if $pid <= 0;
764
765   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
766                   . "."
767                   . $slot[$proc{$pid}->{slot}]->{cpu});
768   my $jobstepid = $proc{$pid}->{jobstep};
769   my $elapsed = time - $proc{$pid}->{time};
770   my $Jobstep = $jobstep[$jobstepid];
771
772   my $exitcode = $?;
773   my $exitinfo = "exit $exitcode";
774   $Jobstep->{'arvados_task'}->reload;
775   my $success = $Jobstep->{'arvados_task'}->{success};
776
777   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$success");
778
779   if (!defined $success) {
780     # task did not indicate one way or the other --> fail
781     $Jobstep->{'arvados_task'}->{success} = 0;
782     $Jobstep->{'arvados_task'}->save;
783     $success = 0;
784   }
785
786   if (!$success)
787   {
788     my $no_incr_attempts;
789     $no_incr_attempts = 1 if $Jobstep->{node_fail};
790
791     ++$thisround_failed;
792     ++$thisround_failed_multiple if $Jobstep->{attempts} > 1;
793
794     # Check for signs of a failed or misconfigured node
795     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
796         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
797       # Don't count this against jobstep failure thresholds if this
798       # node is already suspected faulty and srun exited quickly
799       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
800           $elapsed < 5 &&
801           $Jobstep->{attempts} > 1) {
802         Log ($jobstepid, "blaming failure on suspect node " . $slot[$proc{$pid}->{slot}]->{node}->{name} . " instead of incrementing jobstep attempts");
803         $no_incr_attempts = 1;
804         --$Jobstep->{attempts};
805       }
806       ban_node_by_slot($proc{$pid}->{slot});
807     }
808
809     push @jobstep_todo, $jobstepid;
810     Log ($jobstepid, "failure in $elapsed seconds");
811
812     --$Jobstep->{attempts} if $no_incr_attempts;
813     $Job->{'tasks_summary'}->{'failed'}++;
814   }
815   else
816   {
817     ++$thisround_succeeded;
818     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
819     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
820     push @jobstep_done, $jobstepid;
821     Log ($jobstepid, "success in $elapsed seconds");
822   }
823   $Jobstep->{exitcode} = $exitcode;
824   $Jobstep->{finishtime} = time;
825   process_stderr ($jobstepid, $success);
826   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
827
828   close $reader{$jobstepid};
829   delete $reader{$jobstepid};
830   delete $slot[$proc{$pid}->{slot}]->{pid};
831   push @freeslot, $proc{$pid}->{slot};
832   delete $proc{$pid};
833
834   # Load new tasks
835   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
836     'where' => {
837       'created_by_job_task' => $Jobstep->{'arvados_task'}->{uuid}
838     },
839     'order' => 'qsequence'
840   );
841   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
842     my $jobstep = {
843       'level' => $arvados_task->{'sequence'},
844       'attempts' => 0,
845       'arvados_task' => $arvados_task
846     };
847     push @jobstep, $jobstep;
848     push @jobstep_todo, $#jobstep;
849   }
850
851   $progress_is_dirty = 1;
852   1;
853 }
854
855
856 sub check_squeue
857 {
858   # return if the kill list was checked <4 seconds ago
859   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
860   {
861     return;
862   }
863   $squeue_kill_checked = time;
864
865   # use killem() on procs whose killtime is reached
866   for (keys %proc)
867   {
868     if (exists $proc{$_}->{killtime}
869         && $proc{$_}->{killtime} <= time)
870     {
871       killem ($_);
872     }
873   }
874
875   # return if the squeue was checked <60 seconds ago
876   if (defined $squeue_checked && $squeue_checked > time - 60)
877   {
878     return;
879   }
880   $squeue_checked = time;
881
882   if (!$have_slurm)
883   {
884     # here is an opportunity to check for mysterious problems with local procs
885     return;
886   }
887
888   # get a list of steps still running
889   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
890   chop @squeue;
891   if ($squeue[-1] ne "ok")
892   {
893     return;
894   }
895   pop @squeue;
896
897   # which of my jobsteps are running, according to squeue?
898   my %ok;
899   foreach (@squeue)
900   {
901     if (/^(\d+)\.(\d+) (\S+)/)
902     {
903       if ($1 eq $ENV{SLURM_JOBID})
904       {
905         $ok{$3} = 1;
906       }
907     }
908   }
909
910   # which of my active child procs (>60s old) were not mentioned by squeue?
911   foreach (keys %proc)
912   {
913     if ($proc{$_}->{time} < time - 60
914         && !exists $ok{$proc{$_}->{jobstepname}}
915         && !exists $proc{$_}->{killtime})
916     {
917       # kill this proc if it hasn't exited in 30 seconds
918       $proc{$_}->{killtime} = time + 30;
919     }
920   }
921 }
922
923
924 sub release_allocation
925 {
926   if ($have_slurm)
927   {
928     Log (undef, "release job allocation");
929     system "scancel $ENV{SLURM_JOBID}";
930   }
931 }
932
933
934 sub readfrompipes
935 {
936   my $gotsome = 0;
937   foreach my $job (keys %reader)
938   {
939     my $buf;
940     while (0 < sysread ($reader{$job}, $buf, 8192))
941     {
942       print STDERR $buf if $ENV{CRUNCH_DEBUG};
943       $jobstep[$job]->{stderr} .= $buf;
944       preprocess_stderr ($job);
945       if (length ($jobstep[$job]->{stderr}) > 16384)
946       {
947         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
948       }
949       $gotsome = 1;
950     }
951   }
952   return $gotsome;
953 }
954
955
956 sub preprocess_stderr
957 {
958   my $job = shift;
959
960   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
961     my $line = $1;
962     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
963     Log ($job, "stderr $line");
964     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOBID} has expired|Unable to confirm allocation for job) /) {
965       # whoa.
966       $main::please_freeze = 1;
967     }
968     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
969       $jobstep[$job]->{node_fail} = 1;
970       ban_node_by_slot($jobstep[$job]->{slotindex});
971     }
972   }
973 }
974
975
976 sub process_stderr
977 {
978   my $job = shift;
979   my $success = shift;
980   preprocess_stderr ($job);
981
982   map {
983     Log ($job, "stderr $_");
984   } split ("\n", $jobstep[$job]->{stderr});
985 }
986
987
988 sub collate_output
989 {
990   my $whc = Warehouse->new;
991   Log (undef, "collate");
992   $whc->write_start (1);
993   my $joboutput;
994   for (@jobstep)
995   {
996     next if (!exists $_->{'arvados_task'}->{output} ||
997              !$_->{'arvados_task'}->{'success'} ||
998              $_->{'exitcode'} != 0);
999     my $output = $_->{'arvados_task'}->{output};
1000     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1001     {
1002       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1003       $whc->write_data ($output);
1004     }
1005     elsif (@jobstep == 1)
1006     {
1007       $joboutput = $output;
1008       $whc->write_finish;
1009     }
1010     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1011     {
1012       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1013       $whc->write_data ($outblock);
1014     }
1015     else
1016     {
1017       my $errstr = $whc->errstr;
1018       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1019       $success = 0;
1020     }
1021   }
1022   $joboutput = $whc->write_finish if !defined $joboutput;
1023   if ($joboutput)
1024   {
1025     Log (undef, "output $joboutput");
1026     $Job->{'output'} = $joboutput;
1027     $Job->save;
1028   }
1029   else
1030   {
1031     Log (undef, "output undef");
1032   }
1033   return $joboutput;
1034 }
1035
1036
1037 sub killem
1038 {
1039   foreach (@_)
1040   {
1041     my $sig = 2;                # SIGINT first
1042     if (exists $proc{$_}->{"sent_$sig"} &&
1043         time - $proc{$_}->{"sent_$sig"} > 4)
1044     {
1045       $sig = 15;                # SIGTERM if SIGINT doesn't work
1046     }
1047     if (exists $proc{$_}->{"sent_$sig"} &&
1048         time - $proc{$_}->{"sent_$sig"} > 4)
1049     {
1050       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1051     }
1052     if (!exists $proc{$_}->{"sent_$sig"})
1053     {
1054       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1055       kill $sig, $_;
1056       select (undef, undef, undef, 0.1);
1057       if ($sig == 2)
1058       {
1059         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1060       }
1061       $proc{$_}->{"sent_$sig"} = time;
1062       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1063     }
1064   }
1065 }
1066
1067
1068 sub fhbits
1069 {
1070   my($bits);
1071   for (@_) {
1072     vec($bits,fileno($_),1) = 1;
1073   }
1074   $bits;
1075 }
1076
1077
1078 sub Log                         # ($jobstep_id, $logmessage)
1079 {
1080   if ($_[1] =~ /\n/) {
1081     for my $line (split (/\n/, $_[1])) {
1082       Log ($_[0], $line);
1083     }
1084     return;
1085   }
1086   my $fh = select STDERR; $|=1; select $fh;
1087   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1088   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1089   $message .= "\n";
1090   my $datetime;
1091   if ($metastream || -t STDERR) {
1092     my @gmtime = gmtime;
1093     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1094                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1095   }
1096   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1097
1098   return if !$metastream;
1099   $metastream->write_data ($datetime . " " . $message);
1100 }
1101
1102
1103 sub croak
1104 {
1105   my ($package, $file, $line) = caller;
1106   my $message = "@_ at $file line $line\n";
1107   Log (undef, $message);
1108   freeze() if @jobstep_todo;
1109   collate_output() if @jobstep_todo;
1110   cleanup();
1111   save_meta() if $metastream;
1112   die;
1113 }
1114
1115
1116 sub cleanup
1117 {
1118   return if !$job_has_uuid;
1119   $Job->reload;
1120   $Job->{'running'} = 0;
1121   $Job->{'success'} = 0;
1122   $Job->{'finished_at'} = gmtime;
1123   $Job->save;
1124 }
1125
1126
1127 sub save_meta
1128 {
1129   my $justcheckpoint = shift; # false if this will be the last meta saved
1130   my $m = $metastream;
1131   $m = $m->copy if $justcheckpoint;
1132   $m->write_finish;
1133   my $loglocator = $m->as_key;
1134   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1135   Log (undef, "meta key is $loglocator");
1136   $Job->{'log'} = $loglocator;
1137   $Job->save;
1138 }
1139
1140
1141 sub freeze_if_want_freeze
1142 {
1143   if ($main::please_freeze)
1144   {
1145     release_allocation();
1146     if (@_)
1147     {
1148       # kill some srun procs before freeze+stop
1149       map { $proc{$_} = {} } @_;
1150       while (%proc)
1151       {
1152         killem (keys %proc);
1153         select (undef, undef, undef, 0.1);
1154         my $died;
1155         while (($died = waitpid (-1, WNOHANG)) > 0)
1156         {
1157           delete $proc{$died};
1158         }
1159       }
1160     }
1161     freeze();
1162     collate_output();
1163     cleanup();
1164     save_meta();
1165     exit 0;
1166   }
1167 }
1168
1169
1170 sub freeze
1171 {
1172   Log (undef, "Freeze not implemented");
1173   return;
1174 }
1175
1176
1177 sub thaw
1178 {
1179   croak ("Thaw not implemented");
1180
1181   my $whc;
1182   my $key = shift;
1183   Log (undef, "thaw from $key");
1184
1185   @jobstep = ();
1186   @jobstep_done = ();
1187   @jobstep_todo = ();
1188   @jobstep_tomerge = ();
1189   $jobstep_tomerge_level = 0;
1190   my $frozenjob = {};
1191
1192   my $stream = new Warehouse::Stream ( whc => $whc,
1193                                        hash => [split (",", $key)] );
1194   $stream->rewind;
1195   while (my $dataref = $stream->read_until (undef, "\n\n"))
1196   {
1197     if ($$dataref =~ /^job /)
1198     {
1199       foreach (split ("\n", $$dataref))
1200       {
1201         my ($k, $v) = split ("=", $_, 2);
1202         $frozenjob->{$k} = freezeunquote ($v);
1203       }
1204       next;
1205     }
1206
1207     if ($$dataref =~ /^merge (\d+) (.*)/)
1208     {
1209       $jobstep_tomerge_level = $1;
1210       @jobstep_tomerge
1211           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1212       next;
1213     }
1214
1215     my $Jobstep = { };
1216     foreach (split ("\n", $$dataref))
1217     {
1218       my ($k, $v) = split ("=", $_, 2);
1219       $Jobstep->{$k} = freezeunquote ($v) if $k;
1220     }
1221     $Jobstep->{attempts} = 0;
1222     push @jobstep, $Jobstep;
1223
1224     if ($Jobstep->{exitcode} eq "0")
1225     {
1226       push @jobstep_done, $#jobstep;
1227     }
1228     else
1229     {
1230       push @jobstep_todo, $#jobstep;
1231     }
1232   }
1233
1234   foreach (qw (script script_version script_parameters))
1235   {
1236     $Job->{$_} = $frozenjob->{$_};
1237   }
1238   $Job->save;
1239 }
1240
1241
1242 sub freezequote
1243 {
1244   my $s = shift;
1245   $s =~ s/\\/\\\\/g;
1246   $s =~ s/\n/\\n/g;
1247   return $s;
1248 }
1249
1250
1251 sub freezeunquote
1252 {
1253   my $s = shift;
1254   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1255   return $s;
1256 }
1257
1258
1259 sub srun
1260 {
1261   my $srunargs = shift;
1262   my $execargs = shift;
1263   my $opts = shift || {};
1264   my $stdin = shift;
1265   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1266   print STDERR (join (" ",
1267                       map { / / ? "'$_'" : $_ }
1268                       (@$args)),
1269                 "\n")
1270       if $ENV{CRUNCH_DEBUG};
1271
1272   if (defined $stdin) {
1273     my $child = open STDIN, "-|";
1274     defined $child or die "no fork: $!";
1275     if ($child == 0) {
1276       print $stdin or die $!;
1277       close STDOUT or die $!;
1278       exit 0;
1279     }
1280   }
1281
1282   return system (@$args) if $opts->{fork};
1283
1284   exec @$args;
1285   warn "ENV size is ".length(join(" ",%ENV));
1286   die "exec failed: $!: @$args";
1287 }
1288
1289
1290 sub ban_node_by_slot {
1291   # Don't start any new jobsteps on this node for 60 seconds
1292   my $slotid = shift;
1293   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1294   $slot[$slotid]->{node}->{hold_count}++;
1295   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1296 }
1297
1298 __DATA__
1299 #!/usr/bin/perl
1300
1301 # checkout-and-build
1302
1303 use Fcntl ':flock';
1304
1305 my $destdir = $ENV{"CRUNCH_SRC"};
1306 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1307 my $repo = $ENV{"CRUNCH_SRC_URL"};
1308
1309 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1310 flock L, LOCK_EX;
1311 if (readlink ("$destdir.commit") eq $commit) {
1312     exit 0;
1313 }
1314
1315 unlink "$destdir.commit";
1316 open STDOUT, ">", "$destdir.log";
1317 open STDERR, ">&STDOUT";
1318
1319 mkdir $destdir;
1320 open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1321 print TARX <DATA>;
1322 if(!close(TARX)) {
1323   die "'tar -C $destdir -xf -' exited $?: $!";
1324 }
1325
1326 my $pwd;
1327 chomp ($pwd = `pwd`);
1328 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1329 mkdir $install_dir;
1330 if (!-e "./install.sh" && -e "./tests/autotests.sh") {
1331     # Old version
1332     shell_or_die ("./tests/autotests.sh", $install_dir);
1333 } elsif (-e "./install.sh") {
1334     shell_or_die ("./install.sh", $install_dir);
1335 }
1336
1337 if ($commit) {
1338     unlink "$destdir.commit.new";
1339     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1340     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1341 }
1342
1343 close L;
1344
1345 exit 0;
1346
1347 sub shell_or_die
1348 {
1349   if ($ENV{"DEBUG"}) {
1350     print STDERR "@_\n";
1351   }
1352   system (@_) == 0
1353       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1354 }
1355
1356 __DATA__