Get logs and archives directly from the given git repo. Skip git-clone
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; indent-tabs-mode: nil; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =item --no-clear-tmp
37
38 Do not clear per-job/task temporary directories during initial job
39 setup. This can speed up development and debugging when running jobs
40 locally.
41
42 =back
43
44 =head1 RUNNING JOBS LOCALLY
45
46 crunch-job's log messages appear on stderr along with the job tasks'
47 stderr streams. The log is saved in Keep at each checkpoint and when
48 the job finishes.
49
50 If the job succeeds, the job's output locator is printed on stdout.
51
52 While the job is running, the following signals are accepted:
53
54 =over
55
56 =item control-C, SIGINT, SIGQUIT
57
58 Save a checkpoint, terminate any job tasks that are running, and stop.
59
60 =item SIGALRM
61
62 Save a checkpoint and continue.
63
64 =item SIGHUP
65
66 Refresh node allocation (i.e., check whether any nodes have been added
67 or unallocated) and attributes of the Job record that should affect
68 behavior (e.g., cancel job if cancelled_at becomes non-nil).
69
70 =back
71
72 =cut
73
74
75 use strict;
76 use POSIX ':sys_wait_h';
77 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
78 use Arvados;
79 use Getopt::Long;
80 use IPC::Open2;
81 use IO::Select;
82 use File::Temp;
83 use Fcntl ':flock';
84
85 $ENV{"TMPDIR"} ||= "/tmp";
86 unless (defined $ENV{"CRUNCH_TMP"}) {
87   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
88   if ($ENV{"USER"} ne "crunch" && $< != 0) {
89     # use a tmp dir unique for my uid
90     $ENV{"CRUNCH_TMP"} .= "-$<";
91   }
92 }
93 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
94 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
95 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
96 mkdir ($ENV{"JOB_WORK"});
97
98 my $force_unlock;
99 my $git_dir;
100 my $jobspec;
101 my $job_api_token;
102 my $no_clear_tmp;
103 my $resume_stash;
104 GetOptions('force-unlock' => \$force_unlock,
105            'git-dir=s' => \$git_dir,
106            'job=s' => \$jobspec,
107            'job-api-token=s' => \$job_api_token,
108            'no-clear-tmp' => \$no_clear_tmp,
109            'resume-stash=s' => \$resume_stash,
110     );
111
112 if (defined $job_api_token) {
113   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
114 }
115
116 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
117 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
118 my $local_job = !$job_has_uuid;
119
120
121 $SIG{'USR1'} = sub
122 {
123   $main::ENV{CRUNCH_DEBUG} = 1;
124 };
125 $SIG{'USR2'} = sub
126 {
127   $main::ENV{CRUNCH_DEBUG} = 0;
128 };
129
130
131
132 my $arv = Arvados->new('apiVersion' => 'v1');
133 my $metastream;
134
135 my $User = $arv->{'users'}->{'current'}->execute;
136
137 my $Job = {};
138 my $job_id;
139 my $dbh;
140 my $sth;
141 if ($job_has_uuid)
142 {
143   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
144   if (!$force_unlock) {
145     if ($Job->{'is_locked_by_uuid'}) {
146       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
147     }
148     if ($Job->{'success'} ne undef) {
149       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
150     }
151     if ($Job->{'running'}) {
152       croak("Job 'running' flag is already set");
153     }
154     if ($Job->{'started_at'}) {
155       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
156     }
157   }
158 }
159 else
160 {
161   $Job = JSON::decode_json($jobspec);
162
163   if (!$resume_stash)
164   {
165     map { croak ("No $_ specified") unless $Job->{$_} }
166     qw(script script_version script_parameters);
167   }
168
169   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
170   $Job->{'started_at'} = gmtime;
171
172   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
173
174   $job_has_uuid = 1;
175 }
176 $job_id = $Job->{'uuid'};
177
178 my $keep_logfile = $job_id . '.log.txt';
179 my $local_logfile = File::Temp->new();
180
181 $Job->{'runtime_constraints'} ||= {};
182 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
183 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
184
185
186 Log (undef, "check slurm allocation");
187 my @slot;
188 my @node;
189 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
190 my @sinfo;
191 if (!$have_slurm)
192 {
193   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
194   push @sinfo, "$localcpus localhost";
195 }
196 if (exists $ENV{SLURM_NODELIST})
197 {
198   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
199 }
200 foreach (@sinfo)
201 {
202   my ($ncpus, $slurm_nodelist) = split;
203   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
204
205   my @nodelist;
206   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
207   {
208     my $nodelist = $1;
209     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
210     {
211       my $ranges = $1;
212       foreach (split (",", $ranges))
213       {
214         my ($a, $b);
215         if (/(\d+)-(\d+)/)
216         {
217           $a = $1;
218           $b = $2;
219         }
220         else
221         {
222           $a = $_;
223           $b = $_;
224         }
225         push @nodelist, map {
226           my $n = $nodelist;
227           $n =~ s/\[[-,\d]+\]/$_/;
228           $n;
229         } ($a..$b);
230       }
231     }
232     else
233     {
234       push @nodelist, $nodelist;
235     }
236   }
237   foreach my $nodename (@nodelist)
238   {
239     Log (undef, "node $nodename - $ncpus slots");
240     my $node = { name => $nodename,
241                  ncpus => $ncpus,
242                  losing_streak => 0,
243                  hold_until => 0 };
244     foreach my $cpu (1..$ncpus)
245     {
246       push @slot, { node => $node,
247                     cpu => $cpu };
248     }
249   }
250   push @node, @nodelist;
251 }
252
253
254
255 # Ensure that we get one jobstep running on each allocated node before
256 # we start overloading nodes with concurrent steps
257
258 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
259
260
261
262 my $jobmanager_id;
263 if ($job_has_uuid)
264 {
265   # Claim this job, and make sure nobody else does
266   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
267           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
268     croak("Error while updating / locking job");
269   }
270   $Job->update_attributes('started_at' => scalar gmtime,
271                           'running' => 1,
272                           'success' => undef,
273                           'tasks_summary' => { 'failed' => 0,
274                                                'todo' => 1,
275                                                'running' => 0,
276                                                'done' => 0 });
277 }
278
279
280 Log (undef, "start");
281 $SIG{'INT'} = sub { $main::please_freeze = 1; };
282 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
283 $SIG{'TERM'} = \&croak;
284 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
285 $SIG{'ALRM'} = sub { $main::please_info = 1; };
286 $SIG{'CONT'} = sub { $main::please_continue = 1; };
287 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
288
289 $main::please_freeze = 0;
290 $main::please_info = 0;
291 $main::please_continue = 0;
292 $main::please_refresh = 0;
293 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
294
295 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
296 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
297 $ENV{"JOB_UUID"} = $job_id;
298
299
300 my @jobstep;
301 my @jobstep_todo = ();
302 my @jobstep_done = ();
303 my @jobstep_tomerge = ();
304 my $jobstep_tomerge_level = 0;
305 my $squeue_checked;
306 my $squeue_kill_checked;
307 my $output_in_keep = 0;
308 my $latest_refresh = scalar time;
309
310
311
312 if (defined $Job->{thawedfromkey})
313 {
314   thaw ($Job->{thawedfromkey});
315 }
316 else
317 {
318   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
319     'job_uuid' => $Job->{'uuid'},
320     'sequence' => 0,
321     'qsequence' => 0,
322     'parameters' => {},
323                                                           });
324   push @jobstep, { 'level' => 0,
325                    'failures' => 0,
326                    'arvados_task' => $first_task,
327                  };
328   push @jobstep_todo, 0;
329 }
330
331
332 if (!$have_slurm)
333 {
334   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
335 }
336
337
338 my $build_script;
339
340
341 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
342
343 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
344 if ($skip_install)
345 {
346   if (!defined $no_clear_tmp) {
347     my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
348     system($clear_tmp_cmd) == 0
349         or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
350   }
351   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
352   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
353     if (-d $src_path) {
354       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
355           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
356       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
357           == 0
358           or croak ("setup.py in $src_path failed: exit ".($?>>8));
359     }
360   }
361 }
362 else
363 {
364   do {
365     local $/ = undef;
366     $build_script = <DATA>;
367   };
368   Log (undef, "Install revision ".$Job->{script_version});
369   my $nodelist = join(",", @node);
370
371   if (!defined $no_clear_tmp) {
372     # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
373
374     my $cleanpid = fork();
375     if ($cleanpid == 0)
376     {
377       srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
378             ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
379       exit (1);
380     }
381     while (1)
382     {
383       last if $cleanpid == waitpid (-1, WNOHANG);
384       freeze_if_want_freeze ($cleanpid);
385       select (undef, undef, undef, 0.1);
386     }
387     Log (undef, "Clean-work-dir exited $?");
388   }
389
390   # Install requested code version
391
392   my @execargs;
393   my @srunargs = ("srun",
394                   "--nodelist=$nodelist",
395                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
396
397   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
398   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
399
400   my $commit;
401   my $git_archive;
402   my $treeish = $Job->{'script_version'};
403   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'} || $Job->{'repository'};
404   # Todo: let script_version specify repository instead of expecting
405   # parent process to figure it out.
406   $ENV{"CRUNCH_SRC_URL"} = $repo;
407
408   if (-d "$repo/.git") {
409     # We were given a working directory, but we are only interested in
410     # the index.
411     $repo = "$repo/.git";
412   }
413
414   # If this looks like a subversion r#, look for it in git-svn commit messages
415
416   if ($treeish =~ m{^\d{1,4}$}) {
417     my $gitlog = `git --git-dir="$repo" log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " master`;
418     chomp $gitlog;
419     if ($gitlog =~ /^[a-f0-9]{40}$/) {
420       $commit = $gitlog;
421       Log (undef, "Using commit $commit for script_version $treeish");
422     }
423   }
424
425   # If that didn't work, try asking git to look it up as a tree-ish.
426
427   if (!defined $commit) {
428     my $found = `git --git-dir="$repo" rev-list -1 "$treeish"`;
429     chomp $found;
430     if ($found =~ /^[0-9a-f]{40}$/s) {
431       $commit = $found;
432       if ($commit ne $treeish) {
433         # Make sure we record the real commit id in the database,
434         # frozentokey, logs, etc. -- instead of an abbreviation or a
435         # branch name which can become ambiguous or point to a
436         # different commit in the future.
437         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
438         Log (undef, "Using commit $commit for tree-ish $treeish");
439         if ($commit ne $treeish) {
440           $Job->{'script_version'} = $commit;
441           !$job_has_uuid or
442               $Job->update_attributes('script_version' => $commit) or
443               croak("Error while updating job");
444         }
445       }
446     }
447   }
448
449   if (defined $commit) {
450     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
451     @execargs = ("sh", "-c",
452                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
453     $git_archive = `git --git-dir="$repo" archive "$commit"`;
454   }
455   else {
456     croak ("could not figure out commit id for $treeish");
457   }
458
459   my $installpid = fork();
460   if ($installpid == 0)
461   {
462     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
463     exit (1);
464   }
465   while (1)
466   {
467     last if $installpid == waitpid (-1, WNOHANG);
468     freeze_if_want_freeze ($installpid);
469     select (undef, undef, undef, 0.1);
470   }
471   Log (undef, "Install exited $?");
472 }
473
474 if (!$have_slurm)
475 {
476   # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
477   must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
478 }
479
480
481
482 foreach (qw (script script_version script_parameters runtime_constraints))
483 {
484   Log (undef,
485        "$_ " .
486        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
487 }
488 foreach (split (/\n/, $Job->{knobs}))
489 {
490   Log (undef, "knob " . $_);
491 }
492
493
494
495 $main::success = undef;
496
497
498
499 ONELEVEL:
500
501 my $thisround_succeeded = 0;
502 my $thisround_failed = 0;
503 my $thisround_failed_multiple = 0;
504
505 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
506                        or $a <=> $b } @jobstep_todo;
507 my $level = $jobstep[$jobstep_todo[0]]->{level};
508 Log (undef, "start level $level");
509
510
511
512 my %proc;
513 my @freeslot = (0..$#slot);
514 my @holdslot;
515 my %reader;
516 my $progress_is_dirty = 1;
517 my $progress_stats_updated = 0;
518
519 update_progress_stats();
520
521
522
523 THISROUND:
524 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
525 {
526   my $id = $jobstep_todo[$todo_ptr];
527   my $Jobstep = $jobstep[$id];
528   if ($Jobstep->{level} != $level)
529   {
530     next;
531   }
532
533   pipe $reader{$id}, "writer" or croak ($!);
534   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
535   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
536
537   my $childslot = $freeslot[0];
538   my $childnode = $slot[$childslot]->{node};
539   my $childslotname = join (".",
540                             $slot[$childslot]->{node}->{name},
541                             $slot[$childslot]->{cpu});
542   my $childpid = fork();
543   if ($childpid == 0)
544   {
545     $SIG{'INT'} = 'DEFAULT';
546     $SIG{'QUIT'} = 'DEFAULT';
547     $SIG{'TERM'} = 'DEFAULT';
548
549     foreach (values (%reader))
550     {
551       close($_);
552     }
553     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
554     open(STDOUT,">&writer");
555     open(STDERR,">&writer");
556
557     undef $dbh;
558     undef $sth;
559
560     delete $ENV{"GNUPGHOME"};
561     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
562     $ENV{"TASK_QSEQUENCE"} = $id;
563     $ENV{"TASK_SEQUENCE"} = $level;
564     $ENV{"JOB_SCRIPT"} = $Job->{script};
565     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
566       $param =~ tr/a-z/A-Z/;
567       $ENV{"JOB_PARAMETER_$param"} = $value;
568     }
569     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
570     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
571     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
572     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}.".keep";
573     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
574     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
575     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
576
577     $ENV{"GZIP"} = "-n";
578
579     my @srunargs = (
580       "srun",
581       "--nodelist=".$childnode->{name},
582       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
583       "--job-name=$job_id.$id.$$",
584         );
585     my @execargs = qw(sh);
586     my $build_script_to_send = "";
587     my $command =
588         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
589         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
590         ."&& cd $ENV{CRUNCH_TMP} ";
591     if ($build_script)
592     {
593       $build_script_to_send = $build_script;
594       $command .=
595           "&& perl -";
596     }
597     $command .=
598         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
599     my @execargs = ('bash', '-c', $command);
600     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
601     exit (111);
602   }
603   close("writer");
604   if (!defined $childpid)
605   {
606     close $reader{$id};
607     delete $reader{$id};
608     next;
609   }
610   shift @freeslot;
611   $proc{$childpid} = { jobstep => $id,
612                        time => time,
613                        slot => $childslot,
614                        jobstepname => "$job_id.$id.$childpid",
615                      };
616   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
617   $slot[$childslot]->{pid} = $childpid;
618
619   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
620   Log ($id, "child $childpid started on $childslotname");
621   $Jobstep->{starttime} = time;
622   $Jobstep->{node} = $childnode->{name};
623   $Jobstep->{slotindex} = $childslot;
624   delete $Jobstep->{stderr};
625   delete $Jobstep->{finishtime};
626
627   splice @jobstep_todo, $todo_ptr, 1;
628   --$todo_ptr;
629
630   $progress_is_dirty = 1;
631
632   while (!@freeslot
633          ||
634          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
635   {
636     last THISROUND if $main::please_freeze;
637     if ($main::please_info)
638     {
639       $main::please_info = 0;
640       freeze();
641       collate_output();
642       save_meta(1);
643       update_progress_stats();
644     }
645     my $gotsome
646         = readfrompipes ()
647         + reapchildren ();
648     if (!$gotsome)
649     {
650       check_refresh_wanted();
651       check_squeue();
652       update_progress_stats();
653       select (undef, undef, undef, 0.1);
654     }
655     elsif (time - $progress_stats_updated >= 30)
656     {
657       update_progress_stats();
658     }
659     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
660         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
661     {
662       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
663           .($thisround_failed+$thisround_succeeded)
664           .") -- giving up on this round";
665       Log (undef, $message);
666       last THISROUND;
667     }
668
669     # move slots from freeslot to holdslot (or back to freeslot) if necessary
670     for (my $i=$#freeslot; $i>=0; $i--) {
671       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
672         push @holdslot, (splice @freeslot, $i, 1);
673       }
674     }
675     for (my $i=$#holdslot; $i>=0; $i--) {
676       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
677         push @freeslot, (splice @holdslot, $i, 1);
678       }
679     }
680
681     # give up if no nodes are succeeding
682     if (!grep { $_->{node}->{losing_streak} == 0 &&
683                     $_->{node}->{hold_count} < 4 } @slot) {
684       my $message = "Every node has failed -- giving up on this round";
685       Log (undef, $message);
686       last THISROUND;
687     }
688   }
689 }
690
691
692 push @freeslot, splice @holdslot;
693 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
694
695
696 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
697 while (%proc)
698 {
699   if ($main::please_continue) {
700     $main::please_continue = 0;
701     goto THISROUND;
702   }
703   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
704   readfrompipes ();
705   if (!reapchildren())
706   {
707     check_refresh_wanted();
708     check_squeue();
709     update_progress_stats();
710     select (undef, undef, undef, 0.1);
711     killem (keys %proc) if $main::please_freeze;
712   }
713 }
714
715 update_progress_stats();
716 freeze_if_want_freeze();
717
718
719 if (!defined $main::success)
720 {
721   if (@jobstep_todo &&
722       $thisround_succeeded == 0 &&
723       ($thisround_failed == 0 || $thisround_failed > 4))
724   {
725     my $message = "stop because $thisround_failed tasks failed and none succeeded";
726     Log (undef, $message);
727     $main::success = 0;
728   }
729   if (!@jobstep_todo)
730   {
731     $main::success = 1;
732   }
733 }
734
735 goto ONELEVEL if !defined $main::success;
736
737
738 release_allocation();
739 freeze();
740 if ($job_has_uuid) {
741   $Job->update_attributes('output' => &collate_output(),
742                           'running' => 0,
743                           'success' => $Job->{'output'} && $main::success,
744                           'finished_at' => scalar gmtime)
745 }
746
747 if ($Job->{'output'})
748 {
749   eval {
750     my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
751     $arv->{'collections'}->{'create'}->execute('collection' => {
752       'uuid' => $Job->{'output'},
753       'manifest_text' => $manifest_text,
754     });
755     if ($Job->{'output_is_persistent'}) {
756       $arv->{'links'}->{'create'}->execute('link' => {
757         'tail_kind' => 'arvados#user',
758         'tail_uuid' => $User->{'uuid'},
759         'head_kind' => 'arvados#collection',
760         'head_uuid' => $Job->{'output'},
761         'link_class' => 'resources',
762         'name' => 'wants',
763       });
764     }
765   };
766   if ($@) {
767     Log (undef, "Failed to register output manifest: $@");
768   }
769 }
770
771 Log (undef, "finish");
772
773 save_meta();
774 exit 0;
775
776
777
778 sub update_progress_stats
779 {
780   $progress_stats_updated = time;
781   return if !$progress_is_dirty;
782   my ($todo, $done, $running) = (scalar @jobstep_todo,
783                                  scalar @jobstep_done,
784                                  scalar @slot - scalar @freeslot - scalar @holdslot);
785   $Job->{'tasks_summary'} ||= {};
786   $Job->{'tasks_summary'}->{'todo'} = $todo;
787   $Job->{'tasks_summary'}->{'done'} = $done;
788   $Job->{'tasks_summary'}->{'running'} = $running;
789   if ($job_has_uuid) {
790     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
791   }
792   Log (undef, "status: $done done, $running running, $todo todo");
793   $progress_is_dirty = 0;
794 }
795
796
797
798 sub reapchildren
799 {
800   my $pid = waitpid (-1, WNOHANG);
801   return 0 if $pid <= 0;
802
803   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
804                   . "."
805                   . $slot[$proc{$pid}->{slot}]->{cpu});
806   my $jobstepid = $proc{$pid}->{jobstep};
807   my $elapsed = time - $proc{$pid}->{time};
808   my $Jobstep = $jobstep[$jobstepid];
809
810   my $childstatus = $?;
811   my $exitvalue = $childstatus >> 8;
812   my $exitinfo = sprintf("exit %d signal %d%s",
813                          $exitvalue,
814                          $childstatus & 127,
815                          ($childstatus & 128 ? ' core dump' : ''));
816   $Jobstep->{'arvados_task'}->reload;
817   my $task_success = $Jobstep->{'arvados_task'}->{success};
818
819   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
820
821   if (!defined $task_success) {
822     # task did not indicate one way or the other --> fail
823     $Jobstep->{'arvados_task'}->{success} = 0;
824     $Jobstep->{'arvados_task'}->save;
825     $task_success = 0;
826   }
827
828   if (!$task_success)
829   {
830     my $temporary_fail;
831     $temporary_fail ||= $Jobstep->{node_fail};
832     $temporary_fail ||= ($exitvalue == 111);
833
834     ++$thisround_failed;
835     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
836
837     # Check for signs of a failed or misconfigured node
838     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
839         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
840       # Don't count this against jobstep failure thresholds if this
841       # node is already suspected faulty and srun exited quickly
842       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
843           $elapsed < 5) {
844         Log ($jobstepid, "blaming failure on suspect node " .
845              $slot[$proc{$pid}->{slot}]->{node}->{name});
846         $temporary_fail ||= 1;
847       }
848       ban_node_by_slot($proc{$pid}->{slot});
849     }
850
851     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
852                              ++$Jobstep->{'failures'},
853                              $temporary_fail ? 'temporary ' : 'permanent',
854                              $elapsed));
855
856     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
857       # Give up on this task, and the whole job
858       $main::success = 0;
859       $main::please_freeze = 1;
860     }
861     else {
862       # Put this task back on the todo queue
863       push @jobstep_todo, $jobstepid;
864     }
865     $Job->{'tasks_summary'}->{'failed'}++;
866   }
867   else
868   {
869     ++$thisround_succeeded;
870     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
871     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
872     push @jobstep_done, $jobstepid;
873     Log ($jobstepid, "success in $elapsed seconds");
874   }
875   $Jobstep->{exitcode} = $childstatus;
876   $Jobstep->{finishtime} = time;
877   process_stderr ($jobstepid, $task_success);
878   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
879
880   close $reader{$jobstepid};
881   delete $reader{$jobstepid};
882   delete $slot[$proc{$pid}->{slot}]->{pid};
883   push @freeslot, $proc{$pid}->{slot};
884   delete $proc{$pid};
885
886   # Load new tasks
887   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
888     'where' => {
889       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
890     },
891     'order' => 'qsequence'
892   );
893   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
894     my $jobstep = {
895       'level' => $arvados_task->{'sequence'},
896       'failures' => 0,
897       'arvados_task' => $arvados_task
898     };
899     push @jobstep, $jobstep;
900     push @jobstep_todo, $#jobstep;
901   }
902
903   $progress_is_dirty = 1;
904   1;
905 }
906
907 sub check_refresh_wanted
908 {
909   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
910   if (@stat && $stat[9] > $latest_refresh) {
911     $latest_refresh = scalar time;
912     if ($job_has_uuid) {
913       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
914       for my $attr ('cancelled_at',
915                     'cancelled_by_user_uuid',
916                     'cancelled_by_client_uuid') {
917         $Job->{$attr} = $Job2->{$attr};
918       }
919       if ($Job->{'cancelled_at'}) {
920         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
921              " by user " . $Job->{cancelled_by_user_uuid});
922         $main::success = 0;
923         $main::please_freeze = 1;
924       }
925     }
926   }
927 }
928
929 sub check_squeue
930 {
931   # return if the kill list was checked <4 seconds ago
932   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
933   {
934     return;
935   }
936   $squeue_kill_checked = time;
937
938   # use killem() on procs whose killtime is reached
939   for (keys %proc)
940   {
941     if (exists $proc{$_}->{killtime}
942         && $proc{$_}->{killtime} <= time)
943     {
944       killem ($_);
945     }
946   }
947
948   # return if the squeue was checked <60 seconds ago
949   if (defined $squeue_checked && $squeue_checked > time - 60)
950   {
951     return;
952   }
953   $squeue_checked = time;
954
955   if (!$have_slurm)
956   {
957     # here is an opportunity to check for mysterious problems with local procs
958     return;
959   }
960
961   # get a list of steps still running
962   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
963   chop @squeue;
964   if ($squeue[-1] ne "ok")
965   {
966     return;
967   }
968   pop @squeue;
969
970   # which of my jobsteps are running, according to squeue?
971   my %ok;
972   foreach (@squeue)
973   {
974     if (/^(\d+)\.(\d+) (\S+)/)
975     {
976       if ($1 eq $ENV{SLURM_JOBID})
977       {
978         $ok{$3} = 1;
979       }
980     }
981   }
982
983   # which of my active child procs (>60s old) were not mentioned by squeue?
984   foreach (keys %proc)
985   {
986     if ($proc{$_}->{time} < time - 60
987         && !exists $ok{$proc{$_}->{jobstepname}}
988         && !exists $proc{$_}->{killtime})
989     {
990       # kill this proc if it hasn't exited in 30 seconds
991       $proc{$_}->{killtime} = time + 30;
992     }
993   }
994 }
995
996
997 sub release_allocation
998 {
999   if ($have_slurm)
1000   {
1001     Log (undef, "release job allocation");
1002     system "scancel $ENV{SLURM_JOBID}";
1003   }
1004 }
1005
1006
1007 sub readfrompipes
1008 {
1009   my $gotsome = 0;
1010   foreach my $job (keys %reader)
1011   {
1012     my $buf;
1013     while (0 < sysread ($reader{$job}, $buf, 8192))
1014     {
1015       print STDERR $buf if $ENV{CRUNCH_DEBUG};
1016       $jobstep[$job]->{stderr} .= $buf;
1017       preprocess_stderr ($job);
1018       if (length ($jobstep[$job]->{stderr}) > 16384)
1019       {
1020         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
1021       }
1022       $gotsome = 1;
1023     }
1024   }
1025   return $gotsome;
1026 }
1027
1028
1029 sub preprocess_stderr
1030 {
1031   my $job = shift;
1032
1033   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1034     my $line = $1;
1035     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1036     Log ($job, "stderr $line");
1037     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1038       # whoa.
1039       $main::please_freeze = 1;
1040     }
1041     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1042       $jobstep[$job]->{node_fail} = 1;
1043       ban_node_by_slot($jobstep[$job]->{slotindex});
1044     }
1045   }
1046 }
1047
1048
1049 sub process_stderr
1050 {
1051   my $job = shift;
1052   my $task_success = shift;
1053   preprocess_stderr ($job);
1054
1055   map {
1056     Log ($job, "stderr $_");
1057   } split ("\n", $jobstep[$job]->{stderr});
1058 }
1059
1060 sub fetch_block
1061 {
1062   my $hash = shift;
1063   my ($keep, $child_out, $output_block);
1064
1065   my $cmd = "arv keep get \Q$hash\E";
1066   open($keep, '-|', $cmd) or die "fetch_block: $cmd: $!";
1067   sysread($keep, $output_block, 64 * 1024 * 1024);
1068   close $keep;
1069   return $output_block;
1070 }
1071
1072 sub collate_output
1073 {
1074   Log (undef, "collate");
1075
1076   my ($child_out, $child_in);
1077   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1078   my $joboutput;
1079   for (@jobstep)
1080   {
1081     next if (!exists $_->{'arvados_task'}->{output} ||
1082              !$_->{'arvados_task'}->{'success'} ||
1083              $_->{'exitcode'} != 0);
1084     my $output = $_->{'arvados_task'}->{output};
1085     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1086     {
1087       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1088       print $child_in $output;
1089     }
1090     elsif (@jobstep == 1)
1091     {
1092       $joboutput = $output;
1093       last;
1094     }
1095     elsif (defined (my $outblock = fetch_block ($output)))
1096     {
1097       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1098       print $child_in $outblock;
1099     }
1100     else
1101     {
1102       Log (undef, "XXX fetch_block($output) failed XXX");
1103       $main::success = 0;
1104     }
1105   }
1106   $child_in->close;
1107
1108   if (!defined $joboutput) {
1109     my $s = IO::Select->new($child_out);
1110     if ($s->can_read(120)) {
1111       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1112       chomp($joboutput);
1113     } else {
1114       Log (undef, "timed out reading from 'arv keep put'");
1115     }
1116   }
1117   waitpid($pid, 0);
1118
1119   if ($joboutput)
1120   {
1121     Log (undef, "output $joboutput");
1122     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1123   }
1124   else
1125   {
1126     Log (undef, "output undef");
1127   }
1128   return $joboutput;
1129 }
1130
1131
1132 sub killem
1133 {
1134   foreach (@_)
1135   {
1136     my $sig = 2;                # SIGINT first
1137     if (exists $proc{$_}->{"sent_$sig"} &&
1138         time - $proc{$_}->{"sent_$sig"} > 4)
1139     {
1140       $sig = 15;                # SIGTERM if SIGINT doesn't work
1141     }
1142     if (exists $proc{$_}->{"sent_$sig"} &&
1143         time - $proc{$_}->{"sent_$sig"} > 4)
1144     {
1145       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1146     }
1147     if (!exists $proc{$_}->{"sent_$sig"})
1148     {
1149       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1150       kill $sig, $_;
1151       select (undef, undef, undef, 0.1);
1152       if ($sig == 2)
1153       {
1154         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1155       }
1156       $proc{$_}->{"sent_$sig"} = time;
1157       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1158     }
1159   }
1160 }
1161
1162
1163 sub fhbits
1164 {
1165   my($bits);
1166   for (@_) {
1167     vec($bits,fileno($_),1) = 1;
1168   }
1169   $bits;
1170 }
1171
1172
1173 sub Log                         # ($jobstep_id, $logmessage)
1174 {
1175   if ($_[1] =~ /\n/) {
1176     for my $line (split (/\n/, $_[1])) {
1177       Log ($_[0], $line);
1178     }
1179     return;
1180   }
1181   my $fh = select STDERR; $|=1; select $fh;
1182   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1183   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1184   $message .= "\n";
1185   my $datetime;
1186   if ($metastream || -t STDERR) {
1187     my @gmtime = gmtime;
1188     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1189                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1190   }
1191   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1192
1193   if ($metastream) {
1194     print $metastream $datetime . " " . $message;
1195   }
1196 }
1197
1198
1199 sub croak
1200 {
1201   my ($package, $file, $line) = caller;
1202   my $message = "@_ at $file line $line\n";
1203   Log (undef, $message);
1204   freeze() if @jobstep_todo;
1205   collate_output() if @jobstep_todo;
1206   cleanup();
1207   save_meta() if $metastream;
1208   die;
1209 }
1210
1211
1212 sub cleanup
1213 {
1214   return if !$job_has_uuid;
1215   $Job->update_attributes('running' => 0,
1216                           'success' => 0,
1217                           'finished_at' => scalar gmtime);
1218 }
1219
1220
1221 sub save_meta
1222 {
1223   my $justcheckpoint = shift; # false if this will be the last meta saved
1224   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1225
1226   $local_logfile->flush;
1227   my $cmd = "arv keep put --filename \Q$keep_logfile\E "
1228       . quotemeta($local_logfile->filename);
1229   my $loglocator = `$cmd`;
1230   die "system $cmd failed: $?" if $?;
1231
1232   $local_logfile = undef;   # the temp file is automatically deleted
1233   Log (undef, "log manifest is $loglocator");
1234   $Job->{'log'} = $loglocator;
1235   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1236 }
1237
1238
1239 sub freeze_if_want_freeze
1240 {
1241   if ($main::please_freeze)
1242   {
1243     release_allocation();
1244     if (@_)
1245     {
1246       # kill some srun procs before freeze+stop
1247       map { $proc{$_} = {} } @_;
1248       while (%proc)
1249       {
1250         killem (keys %proc);
1251         select (undef, undef, undef, 0.1);
1252         my $died;
1253         while (($died = waitpid (-1, WNOHANG)) > 0)
1254         {
1255           delete $proc{$died};
1256         }
1257       }
1258     }
1259     freeze();
1260     collate_output();
1261     cleanup();
1262     save_meta();
1263     exit 0;
1264   }
1265 }
1266
1267
1268 sub freeze
1269 {
1270   Log (undef, "Freeze not implemented");
1271   return;
1272 }
1273
1274
1275 sub thaw
1276 {
1277   croak ("Thaw not implemented");
1278 }
1279
1280
1281 sub freezequote
1282 {
1283   my $s = shift;
1284   $s =~ s/\\/\\\\/g;
1285   $s =~ s/\n/\\n/g;
1286   return $s;
1287 }
1288
1289
1290 sub freezeunquote
1291 {
1292   my $s = shift;
1293   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1294   return $s;
1295 }
1296
1297
1298 sub srun
1299 {
1300   my $srunargs = shift;
1301   my $execargs = shift;
1302   my $opts = shift || {};
1303   my $stdin = shift;
1304   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1305   print STDERR (join (" ",
1306                       map { / / ? "'$_'" : $_ }
1307                       (@$args)),
1308                 "\n")
1309       if $ENV{CRUNCH_DEBUG};
1310
1311   if (defined $stdin) {
1312     my $child = open STDIN, "-|";
1313     defined $child or die "no fork: $!";
1314     if ($child == 0) {
1315       print $stdin or die $!;
1316       close STDOUT or die $!;
1317       exit 0;
1318     }
1319   }
1320
1321   return system (@$args) if $opts->{fork};
1322
1323   exec @$args;
1324   warn "ENV size is ".length(join(" ",%ENV));
1325   die "exec failed: $!: @$args";
1326 }
1327
1328
1329 sub ban_node_by_slot {
1330   # Don't start any new jobsteps on this node for 60 seconds
1331   my $slotid = shift;
1332   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1333   $slot[$slotid]->{node}->{hold_count}++;
1334   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1335 }
1336
1337 sub must_lock_now
1338 {
1339   my ($lockfile, $error_message) = @_;
1340   open L, ">", $lockfile or croak("$lockfile: $!");
1341   if (!flock L, LOCK_EX|LOCK_NB) {
1342     croak("Can't lock $lockfile: $error_message\n");
1343   }
1344 }
1345
1346 __DATA__
1347 #!/usr/bin/perl
1348
1349 # checkout-and-build
1350
1351 use Fcntl ':flock';
1352
1353 my $destdir = $ENV{"CRUNCH_SRC"};
1354 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1355 my $repo = $ENV{"CRUNCH_SRC_URL"};
1356
1357 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1358 flock L, LOCK_EX;
1359 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1360     exit 0;
1361 }
1362
1363 unlink "$destdir.commit";
1364 open STDOUT, ">", "$destdir.log";
1365 open STDERR, ">&STDOUT";
1366
1367 mkdir $destdir;
1368 my @git_archive_data = <DATA>;
1369 if (@git_archive_data) {
1370   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1371   print TARX @git_archive_data;
1372   if(!close(TARX)) {
1373     die "'tar -C $destdir -xf -' exited $?: $!";
1374   }
1375 }
1376
1377 my $pwd;
1378 chomp ($pwd = `pwd`);
1379 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1380 mkdir $install_dir;
1381
1382 for my $src_path ("$destdir/arvados/sdk/python") {
1383   if (-d $src_path) {
1384     shell_or_die ("virtualenv", $install_dir);
1385     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1386   }
1387 }
1388
1389 if (-e "$destdir/crunch_scripts/install") {
1390     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1391 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1392     # Old version
1393     shell_or_die ("./tests/autotests.sh", $install_dir);
1394 } elsif (-e "./install.sh") {
1395     shell_or_die ("./install.sh", $install_dir);
1396 }
1397
1398 if ($commit) {
1399     unlink "$destdir.commit.new";
1400     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1401     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1402 }
1403
1404 close L;
1405
1406 exit 0;
1407
1408 sub shell_or_die
1409 {
1410   if ($ENV{"DEBUG"}) {
1411     print STDERR "@_\n";
1412   }
1413   system (@_) == 0
1414       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1415 }
1416
1417 __DATA__