Use quotemeta to protect shell escapes (refs #2221, #2325)
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
404   # Todo: let script_version specify repository instead of expecting
405   # parent process to figure it out.
406   $ENV{"CRUNCH_SRC_URL"} = $repo;
407
408   # Create/update our clone of the remote git repo
409
410   if (!-d $ENV{"CRUNCH_SRC"}) {
411     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
412         or croak ("git clone $repo failed: exit ".($?>>8));
413     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
414   }
415   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
416
417   # If this looks like a subversion r#, look for it in git-svn commit messages
418
419   if ($treeish =~ m{^\d{1,4}$}) {
420     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
421     chomp $gitlog;
422     if ($gitlog =~ /^[a-f0-9]{40}$/) {
423       $commit = $gitlog;
424       Log (undef, "Using commit $commit for script_version $treeish");
425     }
426   }
427
428   # If that didn't work, try asking git to look it up as a tree-ish.
429
430   if (!defined $commit) {
431
432     my $cooked_treeish = $treeish;
433     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
434       # Looks like a git branch name -- make sure git knows it's
435       # relative to the remote repo
436       $cooked_treeish = "origin/$treeish";
437     }
438
439     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
440     chomp $found;
441     if ($found =~ /^[0-9a-f]{40}$/s) {
442       $commit = $found;
443       if ($commit ne $treeish) {
444         # Make sure we record the real commit id in the database,
445         # frozentokey, logs, etc. -- instead of an abbreviation or a
446         # branch name which can become ambiguous or point to a
447         # different commit in the future.
448         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
449         Log (undef, "Using commit $commit for tree-ish $treeish");
450         if ($commit ne $treeish) {
451           $Job->{'script_version'} = $commit;
452           !$job_has_uuid or
453               $Job->update_attributes('script_version' => $commit) or
454               croak("Error while updating job");
455         }
456       }
457     }
458   }
459
460   if (defined $commit) {
461     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
462     @execargs = ("sh", "-c",
463                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
464     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
465   }
466   else {
467     croak ("could not figure out commit id for $treeish");
468   }
469
470   my $installpid = fork();
471   if ($installpid == 0)
472   {
473     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
474     exit (1);
475   }
476   while (1)
477   {
478     last if $installpid == waitpid (-1, WNOHANG);
479     freeze_if_want_freeze ($installpid);
480     select (undef, undef, undef, 0.1);
481   }
482   Log (undef, "Install exited $?");
483 }
484
485 if (!$have_slurm)
486 {
487   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
488   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
489 }
490
491
492
493 foreach (qw (script script_version script_parameters runtime_constraints))
494 {
495   Log (undef,
496        "$_ " .
497        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
498 }
499 foreach (split (/\n/, $Job->{knobs}))
500 {
501   Log (undef, "knob " . $_);
502 }
503
504
505
506 $main::success = undef;
507
508
509
510 ONELEVEL:
511
512 my $thisround_succeeded = 0;
513 my $thisround_failed = 0;
514 my $thisround_failed_multiple = 0;
515
516 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
517                        or $a <=> $b } @jobstep_todo;
518 my $level = $jobstep[$jobstep_todo[0]]->{level};
519 Log (undef, "start level $level");
520
521
522
523 my %proc;
524 my @freeslot = (0..$#slot);
525 my @holdslot;
526 my %reader;
527 my $progress_is_dirty = 1;
528 my $progress_stats_updated = 0;
529
530 update_progress_stats();
531
532
533
534 THISROUND:
535 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
536 {
537   my $id = $jobstep_todo[$todo_ptr];
538   my $Jobstep = $jobstep[$id];
539   if ($Jobstep->{level} != $level)
540   {
541     next;
542   }
543
544   pipe $reader{$id}, "writer" or croak ($!);
545   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
546   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
547
548   my $childslot = $freeslot[0];
549   my $childnode = $slot[$childslot]->{node};
550   my $childslotname = join (".",
551                             $slot[$childslot]->{node}->{name},
552                             $slot[$childslot]->{cpu});
553   my $childpid = fork();
554   if ($childpid == 0)
555   {
556     $SIG{'INT'} = 'DEFAULT';
557     $SIG{'QUIT'} = 'DEFAULT';
558     $SIG{'TERM'} = 'DEFAULT';
559
560     foreach (values (%reader))
561     {
562       close($_);
563     }
564     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
565     open(STDOUT,">&writer");
566     open(STDERR,">&writer");
567
568     undef $dbh;
569     undef $sth;
570
571     delete $ENV{"GNUPGHOME"};
572     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
573     $ENV{"TASK_QSEQUENCE"} = $id;
574     $ENV{"TASK_SEQUENCE"} = $level;
575     $ENV{"JOB_SCRIPT"} = $Job->{script};
576     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
577       $param =~ tr/a-z/A-Z/;
578       $ENV{"JOB_PARAMETER_$param"} = $value;
579     }
580     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
581     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
582     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
583     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
584     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
585     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
586     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
587
588     $ENV{"GZIP"} = "-n";
589
590     my @srunargs = (
591       "srun",
592       "--nodelist=".$childnode->{name},
593       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
594       "--job-name=$job_id.$id.$$",
595         );
596     my @execargs = qw(sh);
597     my $build_script_to_send = "";
598     my $command =
599         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
600         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
601         ."&& cd $ENV{CRUNCH_TMP} ";
602     if ($build_script)
603     {
604       $build_script_to_send = $build_script;
605       $command .=
606           "&& perl -";
607     }
608     $command .=
609         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
610     my @execargs = ('bash', '-c', $command);
611     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
612     exit (111);
613   }
614   close("writer");
615   if (!defined $childpid)
616   {
617     close $reader{$id};
618     delete $reader{$id};
619     next;
620   }
621   shift @freeslot;
622   $proc{$childpid} = { jobstep => $id,
623                        time => time,
624                        slot => $childslot,
625                        jobstepname => "$job_id.$id.$childpid",
626                      };
627   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
628   $slot[$childslot]->{pid} = $childpid;
629
630   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
631   Log ($id, "child $childpid started on $childslotname");
632   $Jobstep->{starttime} = time;
633   $Jobstep->{node} = $childnode->{name};
634   $Jobstep->{slotindex} = $childslot;
635   delete $Jobstep->{stderr};
636   delete $Jobstep->{finishtime};
637
638   splice @jobstep_todo, $todo_ptr, 1;
639   --$todo_ptr;
640
641   $progress_is_dirty = 1;
642
643   while (!@freeslot
644          ||
645          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
646   {
647     last THISROUND if $main::please_freeze;
648     if ($main::please_info)
649     {
650       $main::please_info = 0;
651       freeze();
652       collate_output();
653       save_meta(1);
654       update_progress_stats();
655     }
656     my $gotsome
657         = readfrompipes ()
658         + reapchildren ();
659     if (!$gotsome)
660     {
661       check_refresh_wanted();
662       check_squeue();
663       update_progress_stats();
664       select (undef, undef, undef, 0.1);
665     }
666     elsif (time - $progress_stats_updated >= 30)
667     {
668       update_progress_stats();
669     }
670     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
671         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
672     {
673       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
674           .($thisround_failed+$thisround_succeeded)
675           .") -- giving up on this round";
676       Log (undef, $message);
677       last THISROUND;
678     }
679
680     # move slots from freeslot to holdslot (or back to freeslot) if necessary
681     for (my $i=$#freeslot; $i>=0; $i--) {
682       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
683         push @holdslot, (splice @freeslot, $i, 1);
684       }
685     }
686     for (my $i=$#holdslot; $i>=0; $i--) {
687       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
688         push @freeslot, (splice @holdslot, $i, 1);
689       }
690     }
691
692     # give up if no nodes are succeeding
693     if (!grep { $_->{node}->{losing_streak} == 0 &&
694                     $_->{node}->{hold_count} < 4 } @slot) {
695       my $message = "Every node has failed -- giving up on this round";
696       Log (undef, $message);
697       last THISROUND;
698     }
699   }
700 }
701
702
703 push @freeslot, splice @holdslot;
704 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
705
706
707 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
708 while (%proc)
709 {
710   if ($main::please_continue) {
711     $main::please_continue = 0;
712     goto THISROUND;
713   }
714   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
715   readfrompipes ();
716   if (!reapchildren())
717   {
718     check_refresh_wanted();
719     check_squeue();
720     update_progress_stats();
721     select (undef, undef, undef, 0.1);
722     killem (keys %proc) if $main::please_freeze;
723   }
724 }
725
726 update_progress_stats();
727 freeze_if_want_freeze();
728
729
730 if (!defined $main::success)
731 {
732   if (@jobstep_todo &&
733       $thisround_succeeded == 0 &&
734       ($thisround_failed == 0 || $thisround_failed > 4))
735   {
736     my $message = "stop because $thisround_failed tasks failed and none succeeded";
737     Log (undef, $message);
738     $main::success = 0;
739   }
740   if (!@jobstep_todo)
741   {
742     $main::success = 1;
743   }
744 }
745
746 goto ONELEVEL if !defined $main::success;
747
748
749 release_allocation();
750 freeze();
751 if ($job_has_uuid) {
752   $Job->update_attributes('output' => &collate_output(),
753                           'running' => 0,
754                           'success' => $Job->{'output'} && $main::success,
755                           'finished_at' => scalar gmtime)
756 }
757
758 if ($Job->{'output'})
759 {
760   eval {
761     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
762     $arv->{'collections'}->{'create'}->execute('collection' => {
763       'uuid' => $Job->{'output'},
764       'manifest_text' => $manifest_text,
765     });
766   };
767   if ($@) {
768     Log (undef, "Failed to register output manifest: $@");
769   }
770 }
771
772 Log (undef, "finish");
773
774 save_meta();
775 exit 0;
776
777
778
779 sub update_progress_stats
780 {
781   $progress_stats_updated = time;
782   return if !$progress_is_dirty;
783   my ($todo, $done, $running) = (scalar @jobstep_todo,
784                                  scalar @jobstep_done,
785                                  scalar @slot - scalar @freeslot - scalar @holdslot);
786   $Job->{'tasks_summary'} ||= {};
787   $Job->{'tasks_summary'}->{'todo'} = $todo;
788   $Job->{'tasks_summary'}->{'done'} = $done;
789   $Job->{'tasks_summary'}->{'running'} = $running;
790   if ($job_has_uuid) {
791     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
792   }
793   Log (undef, "status: $done done, $running running, $todo todo");
794   $progress_is_dirty = 0;
795 }
796
797
798
799 sub reapchildren
800 {
801   my $pid = waitpid (-1, WNOHANG);
802   return 0 if $pid <= 0;
803
804   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
805                   . "."
806                   . $slot[$proc{$pid}->{slot}]->{cpu});
807   my $jobstepid = $proc{$pid}->{jobstep};
808   my $elapsed = time - $proc{$pid}->{time};
809   my $Jobstep = $jobstep[$jobstepid];
810
811   my $childstatus = $?;
812   my $exitvalue = $childstatus >> 8;
813   my $exitinfo = sprintf("exit %d signal %d%s",
814                          $exitvalue,
815                          $childstatus & 127,
816                          ($childstatus & 128 ? ' core dump' : ''));
817   $Jobstep->{'arvados_task'}->reload;
818   my $task_success = $Jobstep->{'arvados_task'}->{success};
819
820   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
821
822   if (!defined $task_success) {
823     # task did not indicate one way or the other --> fail
824     $Jobstep->{'arvados_task'}->{success} = 0;
825     $Jobstep->{'arvados_task'}->save;
826     $task_success = 0;
827   }
828
829   if (!$task_success)
830   {
831     my $temporary_fail;
832     $temporary_fail ||= $Jobstep->{node_fail};
833     $temporary_fail ||= ($exitvalue == 111);
834
835     ++$thisround_failed;
836     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
837
838     # Check for signs of a failed or misconfigured node
839     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
840         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
841       # Don't count this against jobstep failure thresholds if this
842       # node is already suspected faulty and srun exited quickly
843       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
844           $elapsed < 5) {
845         Log ($jobstepid, "blaming failure on suspect node " .
846              $slot[$proc{$pid}->{slot}]->{node}->{name});
847         $temporary_fail ||= 1;
848       }
849       ban_node_by_slot($proc{$pid}->{slot});
850     }
851
852     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
853                              ++$Jobstep->{'failures'},
854                              $temporary_fail ? 'temporary ' : 'permanent',
855                              $elapsed));
856
857     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
858       # Give up on this task, and the whole job
859       $main::success = 0;
860       $main::please_freeze = 1;
861     }
862     else {
863       # Put this task back on the todo queue
864       push @jobstep_todo, $jobstepid;
865     }
866     $Job->{'tasks_summary'}->{'failed'}++;
867   }
868   else
869   {
870     ++$thisround_succeeded;
871     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
872     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
873     push @jobstep_done, $jobstepid;
874     Log ($jobstepid, "success in $elapsed seconds");
875   }
876   $Jobstep->{exitcode} = $childstatus;
877   $Jobstep->{finishtime} = time;
878   process_stderr ($jobstepid, $task_success);
879   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
880
881   close $reader{$jobstepid};
882   delete $reader{$jobstepid};
883   delete $slot[$proc{$pid}->{slot}]->{pid};
884   push @freeslot, $proc{$pid}->{slot};
885   delete $proc{$pid};
886
887   # Load new tasks
888   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
889     'where' => {
890       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
891     },
892     'order' => 'qsequence'
893   );
894   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
895     my $jobstep = {
896       'level' => $arvados_task->{'sequence'},
897       'failures' => 0,
898       'arvados_task' => $arvados_task
899     };
900     push @jobstep, $jobstep;
901     push @jobstep_todo, $#jobstep;
902   }
903
904   $progress_is_dirty = 1;
905   1;
906 }
907
908 sub check_refresh_wanted
909 {
910   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
911   if (@stat && $stat[9] > $latest_refresh) {
912     $latest_refresh = scalar time;
913     if ($job_has_uuid) {
914       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
915       for my $attr ('cancelled_at',
916                     'cancelled_by_user_uuid',
917                     'cancelled_by_client_uuid') {
918         $Job->{$attr} = $Job2->{$attr};
919       }
920       if ($Job->{'cancelled_at'}) {
921         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
922              " by user " . $Job->{cancelled_by_user_uuid});
923         $main::success = 0;
924         $main::please_freeze = 1;
925       }
926     }
927   }
928 }
929
930 sub check_squeue
931 {
932   # return if the kill list was checked <4 seconds ago
933   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
934   {
935     return;
936   }
937   $squeue_kill_checked = time;
938
939   # use killem() on procs whose killtime is reached
940   for (keys %proc)
941   {
942     if (exists $proc{$_}->{killtime}
943         && $proc{$_}->{killtime} <= time)
944     {
945       killem ($_);
946     }
947   }
948
949   # return if the squeue was checked <60 seconds ago
950   if (defined $squeue_checked && $squeue_checked > time - 60)
951   {
952     return;
953   }
954   $squeue_checked = time;
955
956   if (!$have_slurm)
957   {
958     # here is an opportunity to check for mysterious problems with local procs
959     return;
960   }
961
962   # get a list of steps still running
963   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
964   chop @squeue;
965   if ($squeue[-1] ne "ok")
966   {
967     return;
968   }
969   pop @squeue;
970
971   # which of my jobsteps are running, according to squeue?
972   my %ok;
973   foreach (@squeue)
974   {
975     if (/^(\d+)\.(\d+) (\S+)/)
976     {
977       if ($1 eq $ENV{SLURM_JOBID})
978       {
979         $ok{$3} = 1;
980       }
981     }
982   }
983
984   # which of my active child procs (>60s old) were not mentioned by squeue?
985   foreach (keys %proc)
986   {
987     if ($proc{$_}->{time} < time - 60
988         && !exists $ok{$proc{$_}->{jobstepname}}
989         && !exists $proc{$_}->{killtime})
990     {
991       # kill this proc if it hasn't exited in 30 seconds
992       $proc{$_}->{killtime} = time + 30;
993     }
994   }
995 }
996
997
998 sub release_allocation
999 {
1000   if ($have_slurm)
1001   {
1002     Log (undef, "release job allocation");
1003     system "scancel $ENV{SLURM_JOBID}";
1004   }
1005 }
1006
1007
1008 sub readfrompipes
1009 {
1010   my $gotsome = 0;
1011   foreach my $job (keys %reader)
1012   {
1013     my $buf;
1014     while (0 < sysread ($reader{$job}, $buf, 8192))
1015     {
1016       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1017       $jobstep[$job]->{stderr} .= $buf;
1018       preprocess_stderr ($job);
1019       if (length ($jobstep[$job]->{stderr}) > 16384)
1020       {
1021         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1022       }
1023       $gotsome = 1;
1024     }
1025   }
1026   return $gotsome;
1027 }
1028
1029
1030 sub preprocess_stderr
1031 {
1032   my $job = shift;
1033
1034   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1035     my $line = $1;
1036     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1037     Log ($job, "stderr $line");
1038     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1039       # whoa.
1040       $main::please_freeze = 1;
1041     }
1042     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1043       $jobstep[$job]->{node_fail} = 1;
1044       ban_node_by_slot($jobstep[$job]->{slotindex});
1045     }
1046   }
1047 }
1048
1049
1050 sub process_stderr
1051 {
1052   my $job = shift;
1053   my $task_success = shift;
1054   preprocess_stderr ($job);
1055
1056   map {
1057     Log ($job, "stderr $_");
1058   } split ("\n", $jobstep[$job]->{stderr});
1059 }
1060
1061 sub fetch_block
1062 {
1063   my $hash = shift;
1064   my ($child_out, $child_in, $output_block);
1065
1066   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
1067   sysread($child_out, $output_block, 64 * 1024 * 1024);
1068   waitpid($pid, 0);
1069   return $output_block;
1070 }
1071
1072 sub collate_output
1073 {
1074   Log (undef, "collate");
1075
1076   my ($child_out, $child_in);
1077   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1078   my $joboutput;
1079   for (@jobstep)
1080   {
1081     next if (!exists $_->{'arvados_task'}->{output} ||
1082              !$_->{'arvados_task'}->{'success'} ||
1083              $_->{'exitcode'} != 0);
1084     my $output = $_->{'arvados_task'}->{output};
1085     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1086     {
1087       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1088       print $child_in $output;
1089     }
1090     elsif (@jobstep == 1)
1091     {
1092       $joboutput = $output;
1093       last;
1094     }
1095     elsif (defined (my $outblock = fetch_block ($output)))
1096     {
1097       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1098       print $child_in $outblock;
1099     }
1100     else
1101     {
1102       Log (undef, "XXX fetch_block($output) failed XXX");
1103       $main::success = 0;
1104     }
1105   }
1106   $child_in->close;
1107
1108   if (!defined $joboutput) {
1109     my $s = IO::Select->new($child_out);
1110     if ($s->can_read(120)) {
1111       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1112     } else {
1113       Log (undef, "timed out reading from 'arv keep put'");
1114     }
1115   }
1116   waitpid($pid, 0);
1117
1118   if ($joboutput)
1119   {
1120     Log (undef, "output $joboutput");
1121     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1122   }
1123   else
1124   {
1125     Log (undef, "output undef");
1126   }
1127   return $joboutput;
1128 }
1129
1130
1131 sub killem
1132 {
1133   foreach (@_)
1134   {
1135     my $sig = 2;                # SIGINT first
1136     if (exists $proc{$_}->{"sent_$sig"} &&
1137         time - $proc{$_}->{"sent_$sig"} > 4)
1138     {
1139       $sig = 15;                # SIGTERM if SIGINT doesn't work
1140     }
1141     if (exists $proc{$_}->{"sent_$sig"} &&
1142         time - $proc{$_}->{"sent_$sig"} > 4)
1143     {
1144       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1145     }
1146     if (!exists $proc{$_}->{"sent_$sig"})
1147     {
1148       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1149       kill $sig, $_;
1150       select (undef, undef, undef, 0.1);
1151       if ($sig == 2)
1152       {
1153         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1154       }
1155       $proc{$_}->{"sent_$sig"} = time;
1156       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1157     }
1158   }
1159 }
1160
1161
1162 sub fhbits
1163 {
1164   my($bits);
1165   for (@_) {
1166     vec($bits,fileno($_),1) = 1;
1167   }
1168   $bits;
1169 }
1170
1171
1172 sub Log                         # ($jobstep_id, $logmessage)
1173 {
1174   if ($_[1] =~ /\n/) {
1175     for my $line (split (/\n/, $_[1])) {
1176       Log ($_[0], $line);
1177     }
1178     return;
1179   }
1180   my $fh = select STDERR; $|=1; select $fh;
1181   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1182   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1183   $message .= "\n";
1184   my $datetime;
1185   if ($metastream || -t STDERR) {
1186     my @gmtime = gmtime;
1187     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1188                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1189   }
1190   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1191
1192   if ($metastream) {
1193     print $metastream $datetime . " " . $message;
1194   }
1195 }
1196
1197
1198 sub croak
1199 {
1200   my ($package, $file, $line) = caller;
1201   my $message = "@_ at $file line $line\n";
1202   Log (undef, $message);
1203   freeze() if @jobstep_todo;
1204   collate_output() if @jobstep_todo;
1205   cleanup();
1206   save_meta() if $metastream;
1207   die;
1208 }
1209
1210
1211 sub cleanup
1212 {
1213   return if !$job_has_uuid;
1214   $Job->update_attributes('running' => 0,
1215                           'success' => 0,
1216                           'finished_at' => scalar gmtime);
1217 }
1218
1219
1220 sub save_meta
1221 {
1222   my $justcheckpoint = shift; # false if this will be the last meta saved
1223   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1224
1225   $local_logfile->flush;
1226   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1227       . quotemeta($local_logfile->filename);
1228   my $loglocator = `$cmd`;
1229   die "system $cmd failed: $?" if $?;
1230
1231   $local_logfile = undef;   # the temp file is automatically deleted
1232   Log (undef, "log manifest is $loglocator");
1233   $Job->{'log'} = $loglocator;
1234   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1235 }
1236
1237
1238 sub freeze_if_want_freeze
1239 {
1240   if ($main::please_freeze)
1241   {
1242     release_allocation();
1243     if (@_)
1244     {
1245       # kill some srun procs before freeze+stop
1246       map { $proc{$_} = {} } @_;
1247       while (%proc)
1248       {
1249         killem (keys %proc);
1250         select (undef, undef, undef, 0.1);
1251         my $died;
1252         while (($died = waitpid (-1, WNOHANG)) > 0)
1253         {
1254           delete $proc{$died};
1255         }
1256       }
1257     }
1258     freeze();
1259     collate_output();
1260     cleanup();
1261     save_meta();
1262     exit 0;
1263   }
1264 }
1265
1266
1267 sub freeze
1268 {
1269   Log (undef, "Freeze not implemented");
1270   return;
1271 }
1272
1273
1274 sub thaw
1275 {
1276   croak ("Thaw not implemented");
1277 }
1278
1279
1280 sub freezequote
1281 {
1282   my $s = shift;
1283   $s =~ s/\\/\\\\/g;
1284   $s =~ s/\n/\\n/g;
1285   return $s;
1286 }
1287
1288
1289 sub freezeunquote
1290 {
1291   my $s = shift;
1292   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1293   return $s;
1294 }
1295
1296
1297 sub srun
1298 {
1299   my $srunargs = shift;
1300   my $execargs = shift;
1301   my $opts = shift || {};
1302   my $stdin = shift;
1303   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1304   print STDERR (join (" ",
1305                       map { / / ? "'$_'" : $_ }
1306                       (@$args)),
1307                 "\n")
1308       if $ENV{CRUNCH_DEBUG};
1309
1310   if (defined $stdin) {
1311     my $child = open STDIN, "-|";
1312     defined $child or die "no fork: $!";
1313     if ($child == 0) {
1314       print $stdin or die $!;
1315       close STDOUT or die $!;
1316       exit 0;
1317     }
1318   }
1319
1320   return system (@$args) if $opts->{fork};
1321
1322   exec @$args;
1323   warn "ENV size is ".length(join(" ",%ENV));
1324   die "exec failed: $!: @$args";
1325 }
1326
1327
1328 sub ban_node_by_slot {
1329   # Don't start any new jobsteps on this node for 60 seconds
1330   my $slotid = shift;
1331   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1332   $slot[$slotid]->{node}->{hold_count}++;
1333   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1334 }
1335
1336 sub must_lock_now
1337 {
1338   my ($lockfile, $error_message) = @_;
1339   open L, ">", $lockfile or croak("$lockfile: $!");
1340   if (!flock L, LOCK_EX|LOCK_NB) {
1341     croak("Can't lock $lockfile: $error_message\n");
1342   }
1343 }
1344
1345 __DATA__
1346 #!/usr/bin/perl
1347
1348 # checkout-and-build
1349
1350 use Fcntl ':flock';
1351
1352 my $destdir = $ENV{"CRUNCH_SRC"};
1353 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1354 my $repo = $ENV{"CRUNCH_SRC_URL"};
1355
1356 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1357 flock L, LOCK_EX;
1358 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1359     exit 0;
1360 }
1361
1362 unlink "$destdir.commit";
1363 open STDOUT, ">", "$destdir.log";
1364 open STDERR, ">&STDOUT";
1365
1366 mkdir $destdir;
1367 my @git_archive_data = <DATA>;
1368 if (@git_archive_data) {
1369   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1370   print TARX @git_archive_data;
1371   if(!close(TARX)) {
1372     die "'tar -C $destdir -xf -' exited $?: $!";
1373   }
1374 }
1375
1376 my $pwd;
1377 chomp ($pwd = `pwd`);
1378 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1379 mkdir $install_dir;
1380
1381 for my $src_path ("$destdir/arvados/sdk/python") {
1382   if (-d $src_path) {
1383     shell_or_die ("virtualenv", $install_dir);
1384     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1385   }
1386 }
1387
1388 if (-e "$destdir/crunch_scripts/install") {
1389     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1390 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1391     # Old version
1392     shell_or_die ("./tests/autotests.sh", $install_dir);
1393 } elsif (-e "./install.sh") {
1394     shell_or_die ("./install.sh", $install_dir);
1395 }
1396
1397 if ($commit) {
1398     unlink "$destdir.commit.new";
1399     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1400     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1401 }
1402
1403 close L;
1404
1405 exit 0;
1406
1407 sub shell_or_die
1408 {
1409   if ($ENV{"DEBUG"}) {
1410     print STDERR "@_\n";
1411   }
1412   system (@_) == 0
1413       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1414 }
1415
1416 __DATA__