Incorporate code review comments (refs #2221, fixes #2325)
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use IPC::Open2;
75 use IO::Select;
76 use File::Temp;
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
90
91 my $force_unlock;
92 my $git_dir;
93 my $jobspec;
94 my $job_api_token;
95 my $resume_stash;
96 GetOptions('force-unlock' => \$force_unlock,
97            'git-dir=s' => \$git_dir,
98            'job=s' => \$jobspec,
99            'job-api-token=s' => \$job_api_token,
100            'resume-stash=s' => \$resume_stash,
101     );
102
103 if (defined $job_api_token) {
104   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 }
106
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
110
111
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new('apiVersion' => 'v1');
124 my $metastream;
125
126 my $User = $arv->{'users'}->{'current'}->execute;
127
128 my $Job = {};
129 my $job_id;
130 my $dbh;
131 my $sth;
132 if ($job_has_uuid)
133 {
134   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135   if (!$force_unlock) {
136     if ($Job->{'is_locked_by_uuid'}) {
137       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
138     }
139     if ($Job->{'success'} ne undef) {
140       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141     }
142     if ($Job->{'running'}) {
143       croak("Job 'running' flag is already set");
144     }
145     if ($Job->{'started_at'}) {
146       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
147     }
148   }
149 }
150 else
151 {
152   $Job = JSON::decode_json($jobspec);
153
154   if (!$resume_stash)
155   {
156     map { croak ("No $_ specified") unless $Job->{$_} }
157     qw(script script_version script_parameters);
158   }
159
160   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161   $Job->{'started_at'} = gmtime;
162
163   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164
165   $job_has_uuid = 1;
166 }
167 $job_id = $Job->{'uuid'};
168
169 my $keep_logfile = $job_id . '.log.txt';
170 my $local_logfile = File::Temp->new();
171
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
175
176
177 Log (undef, "check slurm allocation");
178 my @slot;
179 my @node;
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
181 my @sinfo;
182 if (!$have_slurm)
183 {
184   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185   push @sinfo, "$localcpus localhost";
186 }
187 if (exists $ENV{SLURM_NODELIST})
188 {
189   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
190 }
191 foreach (@sinfo)
192 {
193   my ($ncpus, $slurm_nodelist) = split;
194   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
195
196   my @nodelist;
197   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
198   {
199     my $nodelist = $1;
200     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
201     {
202       my $ranges = $1;
203       foreach (split (",", $ranges))
204       {
205         my ($a, $b);
206         if (/(\d+)-(\d+)/)
207         {
208           $a = $1;
209           $b = $2;
210         }
211         else
212         {
213           $a = $_;
214           $b = $_;
215         }
216         push @nodelist, map {
217           my $n = $nodelist;
218           $n =~ s/\[[-,\d]+\]/$_/;
219           $n;
220         } ($a..$b);
221       }
222     }
223     else
224     {
225       push @nodelist, $nodelist;
226     }
227   }
228   foreach my $nodename (@nodelist)
229   {
230     Log (undef, "node $nodename - $ncpus slots");
231     my $node = { name => $nodename,
232                  ncpus => $ncpus,
233                  losing_streak => 0,
234                  hold_until => 0 };
235     foreach my $cpu (1..$ncpus)
236     {
237       push @slot, { node => $node,
238                     cpu => $cpu };
239     }
240   }
241   push @node, @nodelist;
242 }
243
244
245
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
248
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250
251
252
253 my $jobmanager_id;
254 if ($job_has_uuid)
255 {
256   # Claim this job, and make sure nobody else does
257   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261   $Job->update_attributes('started_at' => scalar gmtime,
262                           'running' => 1,
263                           'success' => undef,
264                           'tasks_summary' => { 'failed' => 0,
265                                                'todo' => 1,
266                                                'running' => 0,
267                                                'done' => 0 });
268 }
269
270
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
279
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
285
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
289
290
291 my @jobstep;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
296 my $squeue_checked;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299 my $latest_refresh = scalar time;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'failures' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
333     if (-d $src_path) {
334       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
335           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
336       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
337           == 0
338           or croak ("setup.py in $src_path failed: exit ".($?>>8));
339     }
340   }
341 }
342 else
343 {
344   do {
345     local $/ = undef;
346     $build_script = <DATA>;
347   };
348   Log (undef, "Install revision ".$Job->{script_version});
349   my $nodelist = join(",", @node);
350
351   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
352
353   my $cleanpid = fork();
354   if ($cleanpid == 0)
355   {
356     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
357           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
358     exit (1);
359   }
360   while (1)
361   {
362     last if $cleanpid == waitpid (-1, WNOHANG);
363     freeze_if_want_freeze ($cleanpid);
364     select (undef, undef, undef, 0.1);
365   }
366   Log (undef, "Clean-work-dir exited $?");
367
368   # Install requested code version
369
370   my @execargs;
371   my @srunargs = ("srun",
372                   "--nodelist=$nodelist",
373                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
374
375   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
376   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
377
378   my $commit;
379   my $git_archive;
380   my $treeish = $Job->{'script_version'};
381   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
382   # Todo: let script_version specify repository instead of expecting
383   # parent process to figure it out.
384   $ENV{"CRUNCH_SRC_URL"} = $repo;
385
386   # Create/update our clone of the remote git repo
387
388   if (!-d $ENV{"CRUNCH_SRC"}) {
389     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
390         or croak ("git clone $repo failed: exit ".($?>>8));
391     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
392   }
393   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
394
395   # If this looks like a subversion r#, look for it in git-svn commit messages
396
397   if ($treeish =~ m{^\d{1,4}$}) {
398     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
399     chomp $gitlog;
400     if ($gitlog =~ /^[a-f0-9]{40}$/) {
401       $commit = $gitlog;
402       Log (undef, "Using commit $commit for script_version $treeish");
403     }
404   }
405
406   # If that didn't work, try asking git to look it up as a tree-ish.
407
408   if (!defined $commit) {
409
410     my $cooked_treeish = $treeish;
411     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
412       # Looks like a git branch name -- make sure git knows it's
413       # relative to the remote repo
414       $cooked_treeish = "origin/$treeish";
415     }
416
417     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
418     chomp $found;
419     if ($found =~ /^[0-9a-f]{40}$/s) {
420       $commit = $found;
421       if ($commit ne $treeish) {
422         # Make sure we record the real commit id in the database,
423         # frozentokey, logs, etc. -- instead of an abbreviation or a
424         # branch name which can become ambiguous or point to a
425         # different commit in the future.
426         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
427         Log (undef, "Using commit $commit for tree-ish $treeish");
428         if ($commit ne $treeish) {
429           $Job->{'script_version'} = $commit;
430           !$job_has_uuid or
431               $Job->update_attributes('script_version' => $commit) or
432               croak("Error while updating job");
433         }
434       }
435     }
436   }
437
438   if (defined $commit) {
439     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
440     @execargs = ("sh", "-c",
441                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
442     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
443   }
444   else {
445     croak ("could not figure out commit id for $treeish");
446   }
447
448   my $installpid = fork();
449   if ($installpid == 0)
450   {
451     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
452     exit (1);
453   }
454   while (1)
455   {
456     last if $installpid == waitpid (-1, WNOHANG);
457     freeze_if_want_freeze ($installpid);
458     select (undef, undef, undef, 0.1);
459   }
460   Log (undef, "Install exited $?");
461 }
462
463
464
465 foreach (qw (script script_version script_parameters runtime_constraints))
466 {
467   Log (undef,
468        "$_ " .
469        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
470 }
471 foreach (split (/\n/, $Job->{knobs}))
472 {
473   Log (undef, "knob " . $_);
474 }
475
476
477
478 $main::success = undef;
479
480
481
482 ONELEVEL:
483
484 my $thisround_succeeded = 0;
485 my $thisround_failed = 0;
486 my $thisround_failed_multiple = 0;
487
488 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
489                        or $a <=> $b } @jobstep_todo;
490 my $level = $jobstep[$jobstep_todo[0]]->{level};
491 Log (undef, "start level $level");
492
493
494
495 my %proc;
496 my @freeslot = (0..$#slot);
497 my @holdslot;
498 my %reader;
499 my $progress_is_dirty = 1;
500 my $progress_stats_updated = 0;
501
502 update_progress_stats();
503
504
505
506 THISROUND:
507 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
508 {
509   my $id = $jobstep_todo[$todo_ptr];
510   my $Jobstep = $jobstep[$id];
511   if ($Jobstep->{level} != $level)
512   {
513     next;
514   }
515
516   pipe $reader{$id}, "writer" or croak ($!);
517   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
518   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
519
520   my $childslot = $freeslot[0];
521   my $childnode = $slot[$childslot]->{node};
522   my $childslotname = join (".",
523                             $slot[$childslot]->{node}->{name},
524                             $slot[$childslot]->{cpu});
525   my $childpid = fork();
526   if ($childpid == 0)
527   {
528     $SIG{'INT'} = 'DEFAULT';
529     $SIG{'QUIT'} = 'DEFAULT';
530     $SIG{'TERM'} = 'DEFAULT';
531
532     foreach (values (%reader))
533     {
534       close($_);
535     }
536     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
537     open(STDOUT,">&writer");
538     open(STDERR,">&writer");
539
540     undef $dbh;
541     undef $sth;
542
543     delete $ENV{"GNUPGHOME"};
544     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
545     $ENV{"TASK_QSEQUENCE"} = $id;
546     $ENV{"TASK_SEQUENCE"} = $level;
547     $ENV{"JOB_SCRIPT"} = $Job->{script};
548     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
549       $param =~ tr/a-z/A-Z/;
550       $ENV{"JOB_PARAMETER_$param"} = $value;
551     }
552     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
553     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
554     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
555     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
556     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
557     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
558     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
559
560     $ENV{"GZIP"} = "-n";
561
562     my @srunargs = (
563       "srun",
564       "--nodelist=".$childnode->{name},
565       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
566       "--job-name=$job_id.$id.$$",
567         );
568     my @execargs = qw(sh);
569     my $build_script_to_send = "";
570     my $command =
571         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
572         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
573         ."&& cd $ENV{CRUNCH_TMP} ";
574     if ($build_script)
575     {
576       $build_script_to_send = $build_script;
577       $command .=
578           "&& perl -";
579     }
580     $command .=
581         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
582     my @execargs = ('bash', '-c', $command);
583     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
584     exit (111);
585   }
586   close("writer");
587   if (!defined $childpid)
588   {
589     close $reader{$id};
590     delete $reader{$id};
591     next;
592   }
593   shift @freeslot;
594   $proc{$childpid} = { jobstep => $id,
595                        time => time,
596                        slot => $childslot,
597                        jobstepname => "$job_id.$id.$childpid",
598                      };
599   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
600   $slot[$childslot]->{pid} = $childpid;
601
602   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
603   Log ($id, "child $childpid started on $childslotname");
604   $Jobstep->{starttime} = time;
605   $Jobstep->{node} = $childnode->{name};
606   $Jobstep->{slotindex} = $childslot;
607   delete $Jobstep->{stderr};
608   delete $Jobstep->{finishtime};
609
610   splice @jobstep_todo, $todo_ptr, 1;
611   --$todo_ptr;
612
613   $progress_is_dirty = 1;
614
615   while (!@freeslot
616          ||
617          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
618   {
619     last THISROUND if $main::please_freeze;
620     if ($main::please_info)
621     {
622       $main::please_info = 0;
623       freeze();
624       collate_output();
625       save_meta(1);
626       update_progress_stats();
627     }
628     my $gotsome
629         = readfrompipes ()
630         + reapchildren ();
631     if (!$gotsome)
632     {
633       check_refresh_wanted();
634       check_squeue();
635       update_progress_stats();
636       select (undef, undef, undef, 0.1);
637     }
638     elsif (time - $progress_stats_updated >= 30)
639     {
640       update_progress_stats();
641     }
642     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
643         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
644     {
645       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
646           .($thisround_failed+$thisround_succeeded)
647           .") -- giving up on this round";
648       Log (undef, $message);
649       last THISROUND;
650     }
651
652     # move slots from freeslot to holdslot (or back to freeslot) if necessary
653     for (my $i=$#freeslot; $i>=0; $i--) {
654       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
655         push @holdslot, (splice @freeslot, $i, 1);
656       }
657     }
658     for (my $i=$#holdslot; $i>=0; $i--) {
659       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
660         push @freeslot, (splice @holdslot, $i, 1);
661       }
662     }
663
664     # give up if no nodes are succeeding
665     if (!grep { $_->{node}->{losing_streak} == 0 &&
666                     $_->{node}->{hold_count} < 4 } @slot) {
667       my $message = "Every node has failed -- giving up on this round";
668       Log (undef, $message);
669       last THISROUND;
670     }
671   }
672 }
673
674
675 push @freeslot, splice @holdslot;
676 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
677
678
679 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
680 while (%proc)
681 {
682   if ($main::please_continue) {
683     $main::please_continue = 0;
684     goto THISROUND;
685   }
686   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
687   readfrompipes ();
688   if (!reapchildren())
689   {
690     check_refresh_wanted();
691     check_squeue();
692     update_progress_stats();
693     select (undef, undef, undef, 0.1);
694     killem (keys %proc) if $main::please_freeze;
695   }
696 }
697
698 update_progress_stats();
699 freeze_if_want_freeze();
700
701
702 if (!defined $main::success)
703 {
704   if (@jobstep_todo &&
705       $thisround_succeeded == 0 &&
706       ($thisround_failed == 0 || $thisround_failed > 4))
707   {
708     my $message = "stop because $thisround_failed tasks failed and none succeeded";
709     Log (undef, $message);
710     $main::success = 0;
711   }
712   if (!@jobstep_todo)
713   {
714     $main::success = 1;
715   }
716 }
717
718 goto ONELEVEL if !defined $main::success;
719
720
721 release_allocation();
722 freeze();
723 if ($job_has_uuid) {
724   $Job->update_attributes('output' => &collate_output(),
725                           'running' => 0,
726                           'success' => $Job->{'output'} && $main::success,
727                           'finished_at' => scalar gmtime)
728 }
729
730 if ($Job->{'output'})
731 {
732   eval {
733     my $manifest_text = `arv keep get $Job->{'output'}`;
734     $arv->{'collections'}->{'create'}->execute('collection' => {
735       'uuid' => $Job->{'output'},
736       'manifest_text' => $manifest_text,
737     });
738   };
739   if ($@) {
740     Log (undef, "Failed to register output manifest: $@");
741   }
742 }
743
744 Log (undef, "finish");
745
746 save_meta();
747 exit 0;
748
749
750
751 sub update_progress_stats
752 {
753   $progress_stats_updated = time;
754   return if !$progress_is_dirty;
755   my ($todo, $done, $running) = (scalar @jobstep_todo,
756                                  scalar @jobstep_done,
757                                  scalar @slot - scalar @freeslot - scalar @holdslot);
758   $Job->{'tasks_summary'} ||= {};
759   $Job->{'tasks_summary'}->{'todo'} = $todo;
760   $Job->{'tasks_summary'}->{'done'} = $done;
761   $Job->{'tasks_summary'}->{'running'} = $running;
762   if ($job_has_uuid) {
763     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
764   }
765   Log (undef, "status: $done done, $running running, $todo todo");
766   $progress_is_dirty = 0;
767 }
768
769
770
771 sub reapchildren
772 {
773   my $pid = waitpid (-1, WNOHANG);
774   return 0 if $pid <= 0;
775
776   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
777                   . "."
778                   . $slot[$proc{$pid}->{slot}]->{cpu});
779   my $jobstepid = $proc{$pid}->{jobstep};
780   my $elapsed = time - $proc{$pid}->{time};
781   my $Jobstep = $jobstep[$jobstepid];
782
783   my $childstatus = $?;
784   my $exitvalue = $childstatus >> 8;
785   my $exitinfo = sprintf("exit %d signal %d%s",
786                          $exitvalue,
787                          $childstatus & 127,
788                          ($childstatus & 128 ? ' core dump' : ''));
789   $Jobstep->{'arvados_task'}->reload;
790   my $task_success = $Jobstep->{'arvados_task'}->{success};
791
792   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
793
794   if (!defined $task_success) {
795     # task did not indicate one way or the other --> fail
796     $Jobstep->{'arvados_task'}->{success} = 0;
797     $Jobstep->{'arvados_task'}->save;
798     $task_success = 0;
799   }
800
801   if (!$task_success)
802   {
803     my $temporary_fail;
804     $temporary_fail ||= $Jobstep->{node_fail};
805     $temporary_fail ||= ($exitvalue == 111);
806
807     ++$thisround_failed;
808     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
809
810     # Check for signs of a failed or misconfigured node
811     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
812         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
813       # Don't count this against jobstep failure thresholds if this
814       # node is already suspected faulty and srun exited quickly
815       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
816           $elapsed < 5) {
817         Log ($jobstepid, "blaming failure on suspect node " .
818              $slot[$proc{$pid}->{slot}]->{node}->{name});
819         $temporary_fail ||= 1;
820       }
821       ban_node_by_slot($proc{$pid}->{slot});
822     }
823
824     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
825                              ++$Jobstep->{'failures'},
826                              $temporary_fail ? 'temporary ' : 'permanent',
827                              $elapsed));
828
829     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
830       # Give up on this task, and the whole job
831       $main::success = 0;
832       $main::please_freeze = 1;
833     }
834     else {
835       # Put this task back on the todo queue
836       push @jobstep_todo, $jobstepid;
837     }
838     $Job->{'tasks_summary'}->{'failed'}++;
839   }
840   else
841   {
842     ++$thisround_succeeded;
843     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
844     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
845     push @jobstep_done, $jobstepid;
846     Log ($jobstepid, "success in $elapsed seconds");
847   }
848   $Jobstep->{exitcode} = $childstatus;
849   $Jobstep->{finishtime} = time;
850   process_stderr ($jobstepid, $task_success);
851   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
852
853   close $reader{$jobstepid};
854   delete $reader{$jobstepid};
855   delete $slot[$proc{$pid}->{slot}]->{pid};
856   push @freeslot, $proc{$pid}->{slot};
857   delete $proc{$pid};
858
859   # Load new tasks
860   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
861     'where' => {
862       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
863     },
864     'order' => 'qsequence'
865   );
866   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
867     my $jobstep = {
868       'level' => $arvados_task->{'sequence'},
869       'failures' => 0,
870       'arvados_task' => $arvados_task
871     };
872     push @jobstep, $jobstep;
873     push @jobstep_todo, $#jobstep;
874   }
875
876   $progress_is_dirty = 1;
877   1;
878 }
879
880 sub check_refresh_wanted
881 {
882   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
883   if (@stat && $stat[9] > $latest_refresh) {
884     $latest_refresh = scalar time;
885     if ($job_has_uuid) {
886       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
887       for my $attr ('cancelled_at',
888                     'cancelled_by_user_uuid',
889                     'cancelled_by_client_uuid') {
890         $Job->{$attr} = $Job2->{$attr};
891       }
892       if ($Job->{'cancelled_at'}) {
893         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
894              " by user " . $Job->{cancelled_by_user_uuid});
895         $main::success = 0;
896         $main::please_freeze = 1;
897       }
898     }
899   }
900 }
901
902 sub check_squeue
903 {
904   # return if the kill list was checked <4 seconds ago
905   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
906   {
907     return;
908   }
909   $squeue_kill_checked = time;
910
911   # use killem() on procs whose killtime is reached
912   for (keys %proc)
913   {
914     if (exists $proc{$_}->{killtime}
915         && $proc{$_}->{killtime} <= time)
916     {
917       killem ($_);
918     }
919   }
920
921   # return if the squeue was checked <60 seconds ago
922   if (defined $squeue_checked && $squeue_checked > time - 60)
923   {
924     return;
925   }
926   $squeue_checked = time;
927
928   if (!$have_slurm)
929   {
930     # here is an opportunity to check for mysterious problems with local procs
931     return;
932   }
933
934   # get a list of steps still running
935   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
936   chop @squeue;
937   if ($squeue[-1] ne "ok")
938   {
939     return;
940   }
941   pop @squeue;
942
943   # which of my jobsteps are running, according to squeue?
944   my %ok;
945   foreach (@squeue)
946   {
947     if (/^(\d+)\.(\d+) (\S+)/)
948     {
949       if ($1 eq $ENV{SLURM_JOBID})
950       {
951         $ok{$3} = 1;
952       }
953     }
954   }
955
956   # which of my active child procs (>60s old) were not mentioned by squeue?
957   foreach (keys %proc)
958   {
959     if ($proc{$_}->{time} < time - 60
960         && !exists $ok{$proc{$_}->{jobstepname}}
961         && !exists $proc{$_}->{killtime})
962     {
963       # kill this proc if it hasn't exited in 30 seconds
964       $proc{$_}->{killtime} = time + 30;
965     }
966   }
967 }
968
969
970 sub release_allocation
971 {
972   if ($have_slurm)
973   {
974     Log (undef, "release job allocation");
975     system "scancel $ENV{SLURM_JOBID}";
976   }
977 }
978
979
980 sub readfrompipes
981 {
982   my $gotsome = 0;
983   foreach my $job (keys %reader)
984   {
985     my $buf;
986     while (0 < sysread ($reader{$job}, $buf, 8192))
987     {
988       print STDERR $buf if $ENV{CRUNCH_DEBUG};
989       $jobstep[$job]->{stderr} .= $buf;
990       preprocess_stderr ($job);
991       if (length ($jobstep[$job]->{stderr}) > 16384)
992       {
993         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
994       }
995       $gotsome = 1;
996     }
997   }
998   return $gotsome;
999 }
1000
1001
1002 sub preprocess_stderr
1003 {
1004   my $job = shift;
1005
1006   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1007     my $line = $1;
1008     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1009     Log ($job, "stderr $line");
1010     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1011       # whoa.
1012       $main::please_freeze = 1;
1013     }
1014     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1015       $jobstep[$job]->{node_fail} = 1;
1016       ban_node_by_slot($jobstep[$job]->{slotindex});
1017     }
1018   }
1019 }
1020
1021
1022 sub process_stderr
1023 {
1024   my $job = shift;
1025   my $task_success = shift;
1026   preprocess_stderr ($job);
1027
1028   map {
1029     Log ($job, "stderr $_");
1030   } split ("\n", $jobstep[$job]->{stderr});
1031 }
1032
1033 sub fetch_block
1034 {
1035   my $hash = shift;
1036   my ($child_out, $child_in, $output_block);
1037
1038   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
1039   sysread($child_out, $output_block, 64 * 1024 * 1024);
1040   waitpid($pid, 0);
1041   return $output_block;
1042 }
1043
1044 sub collate_output
1045 {
1046   Log (undef, "collate");
1047
1048   my ($child_out, $child_in);
1049   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1050   my $joboutput;
1051   for (@jobstep)
1052   {
1053     next if (!exists $_->{'arvados_task'}->{output} ||
1054              !$_->{'arvados_task'}->{'success'} ||
1055              $_->{'exitcode'} != 0);
1056     my $output = $_->{'arvados_task'}->{output};
1057     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1058     {
1059       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1060       print $child_in $output;
1061     }
1062     elsif (@jobstep == 1)
1063     {
1064       $joboutput = $output;
1065       last;
1066     }
1067     elsif (defined (my $outblock = fetch_block ($output)))
1068     {
1069       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1070       print $child_in $outblock;
1071     }
1072     else
1073     {
1074       Log (undef, "XXX fetch_block($output) failed XXX");
1075       $main::success = 0;
1076     }
1077   }
1078   $child_in->close;
1079
1080   if (!defined $joboutput) {
1081     my $s = IO::Select->new($child_out);
1082     if ($s->can_read(120)) {
1083       sysread($child_out, $joboutput, 64 * 1024 * 1024);
1084     } else {
1085       Log (undef, "timed out reading from 'arv keep put'");
1086     }
1087   }
1088   waitpid($pid, 0);
1089
1090   if ($joboutput)
1091   {
1092     Log (undef, "output $joboutput");
1093     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1094   }
1095   else
1096   {
1097     Log (undef, "output undef");
1098   }
1099   return $joboutput;
1100 }
1101
1102
1103 sub killem
1104 {
1105   foreach (@_)
1106   {
1107     my $sig = 2;                # SIGINT first
1108     if (exists $proc{$_}->{"sent_$sig"} &&
1109         time - $proc{$_}->{"sent_$sig"} > 4)
1110     {
1111       $sig = 15;                # SIGTERM if SIGINT doesn't work
1112     }
1113     if (exists $proc{$_}->{"sent_$sig"} &&
1114         time - $proc{$_}->{"sent_$sig"} > 4)
1115     {
1116       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1117     }
1118     if (!exists $proc{$_}->{"sent_$sig"})
1119     {
1120       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1121       kill $sig, $_;
1122       select (undef, undef, undef, 0.1);
1123       if ($sig == 2)
1124       {
1125         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1126       }
1127       $proc{$_}->{"sent_$sig"} = time;
1128       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1129     }
1130   }
1131 }
1132
1133
1134 sub fhbits
1135 {
1136   my($bits);
1137   for (@_) {
1138     vec($bits,fileno($_),1) = 1;
1139   }
1140   $bits;
1141 }
1142
1143
1144 sub Log                         # ($jobstep_id, $logmessage)
1145 {
1146   if ($_[1] =~ /\n/) {
1147     for my $line (split (/\n/, $_[1])) {
1148       Log ($_[0], $line);
1149     }
1150     return;
1151   }
1152   my $fh = select STDERR; $|=1; select $fh;
1153   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1154   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1155   $message .= "\n";
1156   my $datetime;
1157   if ($metastream || -t STDERR) {
1158     my @gmtime = gmtime;
1159     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1160                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1161   }
1162   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1163
1164   if ($metastream) {
1165     print $metastream $datetime . " " . $message;
1166   }
1167 }
1168
1169
1170 sub croak
1171 {
1172   my ($package, $file, $line) = caller;
1173   my $message = "@_ at $file line $line\n";
1174   Log (undef, $message);
1175   freeze() if @jobstep_todo;
1176   collate_output() if @jobstep_todo;
1177   cleanup();
1178   save_meta() if $metastream;
1179   die;
1180 }
1181
1182
1183 sub cleanup
1184 {
1185   return if !$job_has_uuid;
1186   $Job->update_attributes('running' => 0,
1187                           'success' => 0,
1188                           'finished_at' => scalar gmtime);
1189 }
1190
1191
1192 sub save_meta
1193 {
1194   my $justcheckpoint = shift; # false if this will be the last meta saved
1195   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1196
1197   $local_logfile->flush;
1198   my $cmd = "arv keep put --filename $keep_logfile ". $local_logfile->filename;
1199   my $loglocator = `$cmd`;
1200   die "system $cmd failed: $?" if $?;
1201
1202   $local_logfile = undef;   # the temp file is automatically deleted
1203   Log (undef, "log manifest is $loglocator");
1204   $Job->{'log'} = $loglocator;
1205   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1206 }
1207
1208
1209 sub freeze_if_want_freeze
1210 {
1211   if ($main::please_freeze)
1212   {
1213     release_allocation();
1214     if (@_)
1215     {
1216       # kill some srun procs before freeze+stop
1217       map { $proc{$_} = {} } @_;
1218       while (%proc)
1219       {
1220         killem (keys %proc);
1221         select (undef, undef, undef, 0.1);
1222         my $died;
1223         while (($died = waitpid (-1, WNOHANG)) > 0)
1224         {
1225           delete $proc{$died};
1226         }
1227       }
1228     }
1229     freeze();
1230     collate_output();
1231     cleanup();
1232     save_meta();
1233     exit 0;
1234   }
1235 }
1236
1237
1238 sub freeze
1239 {
1240   Log (undef, "Freeze not implemented");
1241   return;
1242 }
1243
1244
1245 sub thaw
1246 {
1247   croak ("Thaw not implemented");
1248 }
1249
1250
1251 sub freezequote
1252 {
1253   my $s = shift;
1254   $s =~ s/\\/\\\\/g;
1255   $s =~ s/\n/\\n/g;
1256   return $s;
1257 }
1258
1259
1260 sub freezeunquote
1261 {
1262   my $s = shift;
1263   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1264   return $s;
1265 }
1266
1267
1268 sub srun
1269 {
1270   my $srunargs = shift;
1271   my $execargs = shift;
1272   my $opts = shift || {};
1273   my $stdin = shift;
1274   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1275   print STDERR (join (" ",
1276                       map { / / ? "'$_'" : $_ }
1277                       (@$args)),
1278                 "\n")
1279       if $ENV{CRUNCH_DEBUG};
1280
1281   if (defined $stdin) {
1282     my $child = open STDIN, "-|";
1283     defined $child or die "no fork: $!";
1284     if ($child == 0) {
1285       print $stdin or die $!;
1286       close STDOUT or die $!;
1287       exit 0;
1288     }
1289   }
1290
1291   return system (@$args) if $opts->{fork};
1292
1293   exec @$args;
1294   warn "ENV size is ".length(join(" ",%ENV));
1295   die "exec failed: $!: @$args";
1296 }
1297
1298
1299 sub ban_node_by_slot {
1300   # Don't start any new jobsteps on this node for 60 seconds
1301   my $slotid = shift;
1302   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1303   $slot[$slotid]->{node}->{hold_count}++;
1304   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1305 }
1306
1307 __DATA__
1308 #!/usr/bin/perl
1309
1310 # checkout-and-build
1311
1312 use Fcntl ':flock';
1313
1314 my $destdir = $ENV{"CRUNCH_SRC"};
1315 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1316 my $repo = $ENV{"CRUNCH_SRC_URL"};
1317
1318 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1319 flock L, LOCK_EX;
1320 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1321     exit 0;
1322 }
1323
1324 unlink "$destdir.commit";
1325 open STDOUT, ">", "$destdir.log";
1326 open STDERR, ">&STDOUT";
1327
1328 mkdir $destdir;
1329 my @git_archive_data = <DATA>;
1330 if (@git_archive_data) {
1331   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1332   print TARX @git_archive_data;
1333   if(!close(TARX)) {
1334     die "'tar -C $destdir -xf -' exited $?: $!";
1335   }
1336 }
1337
1338 my $pwd;
1339 chomp ($pwd = `pwd`);
1340 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1341 mkdir $install_dir;
1342
1343 for my $src_path ("$destdir/arvados/sdk/python") {
1344   if (-d $src_path) {
1345     shell_or_die ("virtualenv", $install_dir);
1346     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1347   }
1348 }
1349
1350 if (-e "$destdir/crunch_scripts/install") {
1351     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1352 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1353     # Old version
1354     shell_or_die ("./tests/autotests.sh", $install_dir);
1355 } elsif (-e "./install.sh") {
1356     shell_or_die ("./install.sh", $install_dir);
1357 }
1358
1359 if ($commit) {
1360     unlink "$destdir.commit.new";
1361     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1362     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1363 }
1364
1365 close L;
1366
1367 exit 0;
1368
1369 sub shell_or_die
1370 {
1371   if ($ENV{"DEBUG"}) {
1372     print STDERR "@_\n";
1373   }
1374   system (@_) == 0
1375       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1376 }
1377
1378 __DATA__