Semicolon fix (refs #2325, refs #2221).
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
404   # Todo: let script_version specify repository instead of expecting
405   # parent process to figure it out.
406   $ENV{"CRUNCH_SRC_URL"} = $repo;
407
408   # Create/update our clone of the remote git repo
409
410   if (!-d $ENV{"CRUNCH_SRC"}) {
411     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
412         or croak ("git clone $repo failed: exit ".($?>>8));
413     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
414   }
415   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
416
417   # If this looks like a subversion r#, look for it in git-svn commit messages
418
419   if ($treeish =~ m{^\d{1,4}$}) {
420     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
421     chomp $gitlog;
422     if ($gitlog =~ /^[a-f0-9]{40}$/) {
423       $commit = $gitlog;
424       Log (undef, "Using commit $commit for script_version $treeish");
425     }
426   }
427
428   # If that didn't work, try asking git to look it up as a tree-ish.
429
430   if (!defined $commit) {
431
432     my $cooked_treeish = $treeish;
433     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
434       # Looks like a git branch name -- make sure git knows it's
435       # relative to the remote repo
436       $cooked_treeish = "origin/$treeish";
437     }
438
439     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
440     chomp $found;
441     if ($found =~ /^[0-9a-f]{40}$/s) {
442       $commit = $found;
443       if ($commit ne $treeish) {
444         # Make sure we record the real commit id in the database,
445         # frozentokey, logs, etc. -- instead of an abbreviation or a
446         # branch name which can become ambiguous or point to a
447         # different commit in the future.
448         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
449         Log (undef, "Using commit $commit for tree-ish $treeish");
450         if ($commit ne $treeish) {
451           $Job->{'script_version'} = $commit;
452           !$job_has_uuid or
453               $Job->update_attributes('script_version' => $commit) or
454               croak("Error while updating job");
455         }
456       }
457     }
458   }
459
460   if (defined $commit) {
461     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
462     @execargs = ("sh", "-c",
463                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
464     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
465   }
466   else {
467     croak ("could not figure out commit id for $treeish");
468   }
469
470   my $installpid = fork();
471   if ($installpid == 0)
472   {
473     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
474     exit (1);
475   }
476   while (1)
477   {
478     last if $installpid == waitpid (-1, WNOHANG);
479     freeze_if_want_freeze ($installpid);
480     select (undef, undef, undef, 0.1);
481   }
482   Log (undef, "Install exited $?");
483 }
484
485 if (!$have_slurm)
486 {
487   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
488   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
489 }
490
491
492
493 foreach (qw (script script_version script_parameters runtime_constraints))
494 {
495   Log (undef,
496        "$_ " .
497        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
498 }
499 foreach (split (/\n/, $Job->{knobs}))
500 {
501   Log (undef, "knob " . $_);
502 }
503
504
505
506 $main::success = undef;
507
508
509
510 ONELEVEL:
511
512 my $thisround_succeeded = 0;
513 my $thisround_failed = 0;
514 my $thisround_failed_multiple = 0;
515
516 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
517                        or $a <=> $b } @jobstep_todo;
518 my $level = $jobstep[$jobstep_todo[0]]->{level};
519 Log (undef, "start level $level");
520
521
522
523 my %proc;
524 my @freeslot = (0..$#slot);
525 my @holdslot;
526 my %reader;
527 my $progress_is_dirty = 1;
528 my $progress_stats_updated = 0;
529
530 update_progress_stats();
531
532
533
534 THISROUND:
535 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
536 {
537   my $id = $jobstep_todo[$todo_ptr];
538   my $Jobstep = $jobstep[$id];
539   if ($Jobstep->{level} != $level)
540   {
541     next;
542   }
543
544   pipe $reader{$id}, "writer" or croak ($!);
545   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
546   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
547
548   my $childslot = $freeslot[0];
549   my $childnode = $slot[$childslot]->{node};
550   my $childslotname = join (".",
551                             $slot[$childslot]->{node}->{name},
552                             $slot[$childslot]->{cpu});
553   my $childpid = fork();
554   if ($childpid == 0)
555   {
556     $SIG{'INT'} = 'DEFAULT';
557     $SIG{'QUIT'} = 'DEFAULT';
558     $SIG{'TERM'} = 'DEFAULT';
559
560     foreach (values (%reader))
561     {
562       close($_);
563     }
564     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
565     open(STDOUT,">&writer");
566     open(STDERR,">&writer");
567
568     undef $dbh;
569     undef $sth;
570
571     delete $ENV{"GNUPGHOME"};
572     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
573     $ENV{"TASK_QSEQUENCE"} = $id;
574     $ENV{"TASK_SEQUENCE"} = $level;
575     $ENV{"JOB_SCRIPT"} = $Job->{script};
576     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
577       $param =~ tr/a-z/A-Z/;
578       $ENV{"JOB_PARAMETER_$param"} = $value;
579     }
580     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
581     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
582     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
583     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
584     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
585     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
586     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
587
588     $ENV{"GZIP"} = "-n";
589
590     my @srunargs = (
591       "srun",
592       "--nodelist=".$childnode->{name},
593       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
594       "--job-name=$job_id.$id.$$",
595         );
596     my @execargs = qw(sh);
597     my $build_script_to_send = "";
598     my $command =
599         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
600         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
601         ."&& cd $ENV{CRUNCH_TMP} ";
602     if ($build_script)
603     {
604       $build_script_to_send = $build_script;
605       $command .=
606           "&& perl -";
607     }
608     $command .=
609         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
610     my @execargs = ('bash', '-c', $command);
611     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
612     exit (111);
613   }
614   close("writer");
615   if (!defined $childpid)
616   {
617     close $reader{$id};
618     delete $reader{$id};
619     next;
620   }
621   shift @freeslot;
622   $proc{$childpid} = { jobstep => $id,
623                        time => time,
624                        slot => $childslot,
625                        jobstepname => "$job_id.$id.$childpid",
626                      };
627   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
628   $slot[$childslot]->{pid} = $childpid;
629
630   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
631   Log ($id, "child $childpid started on $childslotname");
632   $Jobstep->{starttime} = time;
633   $Jobstep->{node} = $childnode->{name};
634   $Jobstep->{slotindex} = $childslot;
635   delete $Jobstep->{stderr};
636   delete $Jobstep->{finishtime};
637
638   splice @jobstep_todo, $todo_ptr, 1;
639   --$todo_ptr;
640
641   $progress_is_dirty = 1;
642
643   while (!@freeslot
644          ||
645          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
646   {
647     last THISROUND if $main::please_freeze;
648     if ($main::please_info)
649     {
650       $main::please_info = 0;
651       freeze();
652       collate_output();
653       save_meta(1);
654       update_progress_stats();
655     }
656     my $gotsome
657         = readfrompipes ()
658         + reapchildren ();
659     if (!$gotsome)
660     {
661       check_refresh_wanted();
662       check_squeue();
663       update_progress_stats();
664       select (undef, undef, undef, 0.1);
665     }
666     elsif (time - $progress_stats_updated >= 30)
667     {
668       update_progress_stats();
669     }
670     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
671         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
672     {
673       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
674           .($thisround_failed+$thisround_succeeded)
675           .") -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679
680     # move slots from freeslot to holdslot (or back to freeslot) if necessary
681     for (my $i=$#freeslot; $i>=0; $i--) {
682       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
683         push @holdslot, (splice @freeslot, $i, 1);
684       }
685     }
686     for (my $i=$#holdslot; $i>=0; $i--) {
687       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
688         push @freeslot, (splice @holdslot, $i, 1);
689       }
690     }
691
692     # give up if no nodes are succeeding
693     if (!grep { $_->{node}->{losing_streak} == 0 &&
694                     $_->{node}->{hold_count} < 4 } @slot) {
695       my $message = "Every node has failed -- giving up on this round";
696       Log (undef, $message);
697       last THISROUND;
698     }
699   }
700 }
701
702
703 push @freeslot, splice @holdslot;
704 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
705
706
707 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
708 while (%proc)
709 {
710   if ($main::please_continue) {
711     $main::please_continue = 0;
712     goto THISROUND;
713   }
714   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
715   readfrompipes ();
716   if (!reapchildren())
717   {
718     check_refresh_wanted();
719     check_squeue();
720     update_progress_stats();
721     select (undef, undef, undef, 0.1);
722     killem (keys %proc) if $main::please_freeze;
723   }
724 }
725
726 update_progress_stats();
727 freeze_if_want_freeze();
728
729
730 if (!defined $main::success)
731 {
732   if (@jobstep_todo &&
733       $thisround_succeeded == 0 &&
734       ($thisround_failed == 0 || $thisround_failed > 4))
735   {
736     my $message = "stop because $thisround_failed tasks failed and none succeeded";
737     Log (undef, $message);
738     $main::success = 0;
739   }
740   if (!@jobstep_todo)
741   {
742     $main::success = 1;
743   }
744 }
745
746 goto ONELEVEL if !defined $main::success;
747
748
749 release_allocation();
750 freeze();
751 if ($job_has_uuid) {
752   $Job->update_attributes('output' => &collate_output(),
753                           'running' => 0,
754                           'success' => $Job->{'output'} && $main::success,
755                           'finished_at' => scalar gmtime)
756 }
757
758 if ($Job->{'output'})
759 {
760   eval {
761     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
762     $arv->{'collections'}->{'create'}->execute('collection' => {
763       'uuid' => $Job->{'output'},
764       'manifest_text' => $manifest_text,
765     });
766   };
767   if ($@) {
768     Log (undef, "Failed to register output manifest: $@");
769   }
770 }
771
772 Log (undef, "finish");
773
774 save_meta();
775 exit 0;
776
777
778
779 sub update_progress_stats
780 {
781   $progress_stats_updated = time;
782   return if !$progress_is_dirty;
783   my ($todo, $done, $running) = (scalar @jobstep_todo,
784                                  scalar @jobstep_done,
785                                  scalar @slot - scalar @freeslot - scalar @holdslot);
786   $Job->{'tasks_summary'} ||= {};
787   $Job->{'tasks_summary'}->{'todo'} = $todo;
788   $Job->{'tasks_summary'}->{'done'} = $done;
789   $Job->{'tasks_summary'}->{'running'} = $running;
790   if ($job_has_uuid) {
791     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
792   }
793   Log (undef, "status: $done done, $running running, $todo todo");
794   $progress_is_dirty = 0;
795 }
796
797
798
799 sub reapchildren
800 {
801   my $pid = waitpid (-1, WNOHANG);
802   return 0 if $pid <= 0;
803
804   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
805                   . "."
806                   . $slot[$proc{$pid}->{slot}]->{cpu});
807   my $jobstepid = $proc{$pid}->{jobstep};
808   my $elapsed = time - $proc{$pid}->{time};
809   my $Jobstep = $jobstep[$jobstepid];
810
811   my $childstatus = $?;
812   my $exitvalue = $childstatus >> 8;
813   my $exitinfo = sprintf("exit %d signal %d%s",
814                          $exitvalue,
815                          $childstatus & 127,
816                          ($childstatus & 128 ? ' core dump' : ''));
817   $Jobstep->{'arvados_task'}->reload;
818   my $task_success = $Jobstep->{'arvados_task'}->{success};
819
820   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
821
822   if (!defined $task_success) {
823     # task did not indicate one way or the other --> fail
824     $Jobstep->{'arvados_task'}->{success} = 0;
825     $Jobstep->{'arvados_task'}->save;
826     $task_success = 0;
827   }
828
829   if (!$task_success)
830   {
831     my $temporary_fail;
832     $temporary_fail ||= $Jobstep->{node_fail};
833     $temporary_fail ||= ($exitvalue == 111);
834
835     ++$thisround_failed;
836     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
837
838     # Check for signs of a failed or misconfigured node
839     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
840         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
841       # Don't count this against jobstep failure thresholds if this
842       # node is already suspected faulty and srun exited quickly
843       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
844           $elapsed < 5) {
845         Log ($jobstepid, "blaming failure on suspect node " .
846              $slot[$proc{$pid}->{slot}]->{node}->{name});
847         $temporary_fail ||= 1;
848       }
849       ban_node_by_slot($proc{$pid}->{slot});
850     }
851
852     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
853                              ++$Jobstep->{'failures'},
854                              $temporary_fail ? 'temporary ' : 'permanent',
855                              $elapsed));
856
857     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
858       # Give up on this task, and the whole job
859       $main::success = 0;
860       $main::please_freeze = 1;
861     }
862     else {
863       # Put this task back on the todo queue
864       push @jobstep_todo, $jobstepid;
865     }
866     $Job->{'tasks_summary'}->{'failed'}++;
867   }
868   else
869   {
870     ++$thisround_succeeded;
871     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
872     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
873     push @jobstep_done, $jobstepid;
874     Log ($jobstepid, "success in $elapsed seconds");
875   }
876   $Jobstep->{exitcode} = $childstatus;
877   $Jobstep->{finishtime} = time;
878   process_stderr ($jobstepid, $task_success);
879   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
880
881   close $reader{$jobstepid};
882   delete $reader{$jobstepid};
883   delete $slot[$proc{$pid}->{slot}]->{pid};
884   push @freeslot, $proc{$pid}->{slot};
885   delete $proc{$pid};
886
887   # Load new tasks
888   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
889     'where' => {
890       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
891     },
892     'order' => 'qsequence'
893   );
894   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
895     my $jobstep = {
896       'level' => $arvados_task->{'sequence'},
897       'failures' => 0,
898       'arvados_task' => $arvados_task
899     };
900     push @jobstep, $jobstep;
901     push @jobstep_todo, $#jobstep;
902   }
903
904   $progress_is_dirty = 1;
905   1;
906 }
907
908 sub check_refresh_wanted
909 {
910   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
911   if (@stat && $stat[9] > $latest_refresh) {
912     $latest_refresh = scalar time;
913     if ($job_has_uuid) {
914       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
915       for my $attr ('cancelled_at',
916                     'cancelled_by_user_uuid',
917                     'cancelled_by_client_uuid') {
918         $Job->{$attr} = $Job2->{$attr};
919       }
920       if ($Job->{'cancelled_at'}) {
921         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
922              " by user " . $Job->{cancelled_by_user_uuid});
923         $main::success = 0;
924         $main::please_freeze = 1;
925       }
926     }
927   }
928 }
929
930 sub check_squeue
931 {
932   # return if the kill list was checked <4 seconds ago
933   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
934   {
935     return;
936   }
937   $squeue_kill_checked = time;
938
939   # use killem() on procs whose killtime is reached
940   for (keys %proc)
941   {
942     if (exists $proc{$_}->{killtime}
943         && $proc{$_}->{killtime} <= time)
944     {
945       killem ($_);
946     }
947   }
948
949   # return if the squeue was checked <60 seconds ago
950   if (defined $squeue_checked && $squeue_checked > time - 60)
951   {
952     return;
953   }
954   $squeue_checked = time;
955
956   if (!$have_slurm)
957   {
958     # here is an opportunity to check for mysterious problems with local procs
959     return;
960   }
961
962   # get a list of steps still running
963   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
964   chop @squeue;
965   if ($squeue[-1] ne "ok")
966   {
967     return;
968   }
969   pop @squeue;
970
971   # which of my jobsteps are running, according to squeue?
972   my %ok;
973   foreach (@squeue)
974   {
975     if (/^(\d+)\.(\d+) (\S+)/)
976     {
977       if ($1 eq $ENV{SLURM_JOBID})
978       {
979         $ok{$3} = 1;
980       }
981     }
982   }
983
984   # which of my active child procs (>60s old) were not mentioned by squeue?
985   foreach (keys %proc)
986   {
987     if ($proc{$_}->{time} < time - 60
988         && !exists $ok{$proc{$_}->{jobstepname}}
989         && !exists $proc{$_}->{killtime})
990     {
991       # kill this proc if it hasn't exited in 30 seconds
992       $proc{$_}->{killtime} = time + 30;
993     }
994   }
995 }
996
997
998 sub release_allocation
999 {
1000   if ($have_slurm)
1001   {
1002     Log (undef, "release job allocation");
1003     system "scancel $ENV{SLURM_JOBID}";
1004   }
1005 }
1006
1007
1008 sub readfrompipes
1009 {
1010   my $gotsome = 0;
1011   foreach my $job (keys %reader)
1012   {
1013     my $buf;
1014     while (0 < sysread ($reader{$job}, $buf, 8192))
1015     {
1016       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1017       $jobstep[$job]->{stderr} .= $buf;
1018       preprocess_stderr ($job);
1019       if (length ($jobstep[$job]->{stderr}) > 16384)
1020       {
1021         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1022       }
1023       $gotsome = 1;
1024     }
1025   }
1026   return $gotsome;
1027 }
1028
1029
1030 sub preprocess_stderr
1031 {
1032   my $job = shift;
1033
1034   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1035     my $line = $1;
1036     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1037     Log ($job, "stderr $line");
1038     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1039       # whoa.
1040       $main::please_freeze = 1;
1041     }
1042     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1043       $jobstep[$job]->{node_fail} = 1;
1044       ban_node_by_slot($jobstep[$job]->{slotindex});
1045     }
1046   }
1047 }
1048
1049
1050 sub process_stderr
1051 {
1052   my $job = shift;
1053   my $task_success = shift;
1054   preprocess_stderr ($job);
1055
1056   map {
1057     Log ($job, "stderr $_");
1058   } split ("\n", $jobstep[$job]->{stderr});
1059 }
1060
1061 sub fetch_block
1062 {
1063   my $hash = shift;
1064   my ($child_out, $output_block);
1065
1066   my $cmd = "arv keep get \Q$hash\E";
1067   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1068   sysread($keep, $output_block, 64 * 1024 * 1024);
1069   close $keep;
1070   return $output_block;
1071 }
1072
1073 sub collate_output
1074 {
1075   Log (undef, "collate");
1076
1077   my ($child_out, $child_in);
1078   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1079   my $joboutput;
1080   for (@jobstep)
1081   {
1082     next if (!exists $_->{'arvados_task'}->{output} ||
1083              !$_->{'arvados_task'}->{'success'} ||
1084              $_->{'exitcode'} != 0);
1085     my $output = $_->{'arvados_task'}->{output};
1086     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1087     {
1088       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1089       print $child_in $output;
1090     }
1091     elsif (@jobstep == 1)
1092     {
1093       $joboutput = $output;
1094       last;
1095     }
1096     elsif (defined (my $outblock = fetch_block ($output)))
1097     {
1098       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1099       print $child_in $outblock;
1100     }
1101     else
1102     {
1103       Log (undef, "XXX fetch_block($output) failed XXX");
1104       $main::success = 0;
1105     }
1106   }
1107   $child_in->close;
1108
1109   if (!defined $joboutput) {
1110     my $s = IO::Select->new($child_out);
1111     if ($s->can_read(120)) {
1112       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1113     } else {
1114       Log (undef, "timed out reading from 'arv keep put'");
1115     }
1116   }
1117   waitpid($pid, 0);
1118
1119   if ($joboutput)
1120   {
1121     Log (undef, "output $joboutput");
1122     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1123   }
1124   else
1125   {
1126     Log (undef, "output undef");
1127   }
1128   return $joboutput;
1129 }
1130
1131
1132 sub killem
1133 {
1134   foreach (@_)
1135   {
1136     my $sig = 2;                # SIGINT first
1137     if (exists $proc{$_}->{"sent_$sig"} &&
1138         time - $proc{$_}->{"sent_$sig"} > 4)
1139     {
1140       $sig = 15;                # SIGTERM if SIGINT doesn't work
1141     }
1142     if (exists $proc{$_}->{"sent_$sig"} &&
1143         time - $proc{$_}->{"sent_$sig"} > 4)
1144     {
1145       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1146     }
1147     if (!exists $proc{$_}->{"sent_$sig"})
1148     {
1149       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1150       kill $sig, $_;
1151       select (undef, undef, undef, 0.1);
1152       if ($sig == 2)
1153       {
1154         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1155       }
1156       $proc{$_}->{"sent_$sig"} = time;
1157       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1158     }
1159   }
1160 }
1161
1162
1163 sub fhbits
1164 {
1165   my($bits);
1166   for (@_) {
1167     vec($bits,fileno($_),1) = 1;
1168   }
1169   $bits;
1170 }
1171
1172
1173 sub Log                         # ($jobstep_id, $logmessage)
1174 {
1175   if ($_[1] =~ /\n/) {
1176     for my $line (split (/\n/, $_[1])) {
1177       Log ($_[0], $line);
1178     }
1179     return;
1180   }
1181   my $fh = select STDERR; $|=1; select $fh;
1182   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1183   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1184   $message .= "\n";
1185   my $datetime;
1186   if ($metastream || -t STDERR) {
1187     my @gmtime = gmtime;
1188     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1189                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1190   }
1191   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1192
1193   if ($metastream) {
1194     print $metastream $datetime . " " . $message;
1195   }
1196 }
1197
1198
1199 sub croak
1200 {
1201   my ($package, $file, $line) = caller;
1202   my $message = "@_ at $file line $line\n";
1203   Log (undef, $message);
1204   freeze() if @jobstep_todo;
1205   collate_output() if @jobstep_todo;
1206   cleanup();
1207   save_meta() if $metastream;
1208   die;
1209 }
1210
1211
1212 sub cleanup
1213 {
1214   return if !$job_has_uuid;
1215   $Job->update_attributes('running' => 0,
1216                           'success' => 0,
1217                           'finished_at' => scalar gmtime);
1218 }
1219
1220
1221 sub save_meta
1222 {
1223   my $justcheckpoint = shift; # false if this will be the last meta saved
1224   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1225
1226   $local_logfile->flush;
1227   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1228       . quotemeta($local_logfile->filename);
1229   my $loglocator = `$cmd`;
1230   die "system $cmd failed: $?" if $?;
1231
1232   $local_logfile = undef;   # the temp file is automatically deleted
1233   Log (undef, "log manifest is $loglocator");
1234   $Job->{'log'} = $loglocator;
1235   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1236 }
1237
1238
1239 sub freeze_if_want_freeze
1240 {
1241   if ($main::please_freeze)
1242   {
1243     release_allocation();
1244     if (@_)
1245     {
1246       # kill some srun procs before freeze+stop
1247       map { $proc{$_} = {} } @_;
1248       while (%proc)
1249       {
1250         killem (keys %proc);
1251         select (undef, undef, undef, 0.1);
1252         my $died;
1253         while (($died = waitpid (-1, WNOHANG)) > 0)
1254         {
1255           delete $proc{$died};
1256         }
1257       }
1258     }
1259     freeze();
1260     collate_output();
1261     cleanup();
1262     save_meta();
1263     exit 0;
1264   }
1265 }
1266
1267
1268 sub freeze
1269 {
1270   Log (undef, "Freeze not implemented");
1271   return;
1272 }
1273
1274
1275 sub thaw
1276 {
1277   croak ("Thaw not implemented");
1278 }
1279
1280
1281 sub freezequote
1282 {
1283   my $s = shift;
1284   $s =~ s/\\/\\\\/g;
1285   $s =~ s/\n/\\n/g;
1286   return $s;
1287 }
1288
1289
1290 sub freezeunquote
1291 {
1292   my $s = shift;
1293   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1294   return $s;
1295 }
1296
1297
1298 sub srun
1299 {
1300   my $srunargs = shift;
1301   my $execargs = shift;
1302   my $opts = shift || {};
1303   my $stdin = shift;
1304   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1305   print STDERR (join (" ",
1306                       map { / / ? "'$_'" : $_ }
1307                       (@$args)),
1308                 "\n")
1309       if $ENV{CRUNCH_DEBUG};
1310
1311   if (defined $stdin) {
1312     my $child = open STDIN, "-|";
1313     defined $child or die "no fork: $!";
1314     if ($child == 0) {
1315       print $stdin or die $!;
1316       close STDOUT or die $!;
1317       exit 0;
1318     }
1319   }
1320
1321   return system (@$args) if $opts->{fork};
1322
1323   exec @$args;
1324   warn "ENV size is ".length(join(" ",%ENV));
1325   die "exec failed: $!: @$args";
1326 }
1327
1328
1329 sub ban_node_by_slot {
1330   # Don't start any new jobsteps on this node for 60 seconds
1331   my $slotid = shift;
1332   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1333   $slot[$slotid]->{node}->{hold_count}++;
1334   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1335 }
1336
1337 sub must_lock_now
1338 {
1339   my ($lockfile, $error_message) = @_;
1340   open L, ">", $lockfile or croak("$lockfile: $!");
1341   if (!flock L, LOCK_EX|LOCK_NB) {
1342     croak("Can't lock $lockfile: $error_message\n");
1343   }
1344 }
1345
1346 __DATA__
1347 #!/usr/bin/perl
1348
1349 # checkout-and-build
1350
1351 use Fcntl ':flock';
1352
1353 my $destdir = $ENV{"CRUNCH_SRC"};
1354 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1355 my $repo = $ENV{"CRUNCH_SRC_URL"};
1356
1357 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1358 flock L, LOCK_EX;
1359 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1360     exit 0;
1361 }
1362
1363 unlink "$destdir.commit";
1364 open STDOUT, ">", "$destdir.log";
1365 open STDERR, ">&STDOUT";
1366
1367 mkdir $destdir;
1368 my @git_archive_data = <DATA>;
1369 if (@git_archive_data) {
1370   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1371   print TARX @git_archive_data;
1372   if(!close(TARX)) {
1373     die "'tar -C $destdir -xf -' exited $?: $!";
1374   }
1375 }
1376
1377 my $pwd;
1378 chomp ($pwd = `pwd`);
1379 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1380 mkdir $install_dir;
1381
1382 for my $src_path ("$destdir/arvados/sdk/python") {
1383   if (-d $src_path) {
1384     shell_or_die ("virtualenv", $install_dir);
1385     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1386   }
1387 }
1388
1389 if (-e "$destdir/crunch_scripts/install") {
1390     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1391 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1392     # Old version
1393     shell_or_die ("./tests/autotests.sh", $install_dir);
1394 } elsif (-e "./install.sh") {
1395     shell_or_die ("./install.sh", $install_dir);
1396 }
1397
1398 if ($commit) {
1399     unlink "$destdir.commit.new";
1400     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1401     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1402 }
1403
1404 close L;
1405
1406 exit 0;
1407
1408 sub shell_or_die
1409 {
1410   if ($ENV{"DEBUG"}) {
1411     print STDERR "@_\n";
1412   }
1413   system (@_) == 0
1414       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1415 }
1416
1417 __DATA__