Merge branch 'master' into 2257-inequality-conditions
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use Warehouse;
75 use Warehouse::Stream;
76 use IPC::System::Simple qw(capturex);
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
90
91 my $force_unlock;
92 my $git_dir;
93 my $jobspec;
94 my $job_api_token;
95 my $resume_stash;
96 GetOptions('force-unlock' => \$force_unlock,
97            'git-dir=s' => \$git_dir,
98            'job=s' => \$jobspec,
99            'job-api-token=s' => \$job_api_token,
100            'resume-stash=s' => \$resume_stash,
101     );
102
103 if (defined $job_api_token) {
104   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 }
106
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
110
111
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new('apiVersion' => 'v1');
124 my $metastream;
125
126 my $User = $arv->{'users'}->{'current'}->execute;
127
128 my $Job = {};
129 my $job_id;
130 my $dbh;
131 my $sth;
132 if ($job_has_uuid)
133 {
134   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135   if (!$force_unlock) {
136     if ($Job->{'is_locked_by_uuid'}) {
137       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
138     }
139     if ($Job->{'success'} ne undef) {
140       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141     }
142     if ($Job->{'running'}) {
143       croak("Job 'running' flag is already set");
144     }
145     if ($Job->{'started_at'}) {
146       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
147     }
148   }
149 }
150 else
151 {
152   $Job = JSON::decode_json($jobspec);
153
154   if (!$resume_stash)
155   {
156     map { croak ("No $_ specified") unless $Job->{$_} }
157     qw(script script_version script_parameters);
158   }
159
160   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161   $Job->{'started_at'} = gmtime;
162
163   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164
165   $job_has_uuid = 1;
166 }
167 $job_id = $Job->{'uuid'};
168
169 $metastream = Warehouse::Stream->new(whc => new Warehouse);
170 $metastream->clear;
171 $metastream->name('.');
172 $metastream->write_start($job_id . '.log.txt');
173
174
175 $Job->{'runtime_constraints'} ||= {};
176 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
177 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
178
179
180 Log (undef, "check slurm allocation");
181 my @slot;
182 my @node;
183 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
184 my @sinfo;
185 if (!$have_slurm)
186 {
187   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
188   push @sinfo, "$localcpus localhost";
189 }
190 if (exists $ENV{SLURM_NODELIST})
191 {
192   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
193 }
194 foreach (@sinfo)
195 {
196   my ($ncpus, $slurm_nodelist) = split;
197   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
198
199   my @nodelist;
200   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
201   {
202     my $nodelist = $1;
203     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
204     {
205       my $ranges = $1;
206       foreach (split (",", $ranges))
207       {
208         my ($a, $b);
209         if (/(\d+)-(\d+)/)
210         {
211           $a = $1;
212           $b = $2;
213         }
214         else
215         {
216           $a = $_;
217           $b = $_;
218         }
219         push @nodelist, map {
220           my $n = $nodelist;
221           $n =~ s/\[[-,\d]+\]/$_/;
222           $n;
223         } ($a..$b);
224       }
225     }
226     else
227     {
228       push @nodelist, $nodelist;
229     }
230   }
231   foreach my $nodename (@nodelist)
232   {
233     Log (undef, "node $nodename - $ncpus slots");
234     my $node = { name => $nodename,
235                  ncpus => $ncpus,
236                  losing_streak => 0,
237                  hold_until => 0 };
238     foreach my $cpu (1..$ncpus)
239     {
240       push @slot, { node => $node,
241                     cpu => $cpu };
242     }
243   }
244   push @node, @nodelist;
245 }
246
247
248
249 # Ensure that we get one jobstep running on each allocated node before
250 # we start overloading nodes with concurrent steps
251
252 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
253
254
255
256 my $jobmanager_id;
257 if ($job_has_uuid)
258 {
259   # Claim this job, and make sure nobody else does
260   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
261           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
262     croak("Error while updating / locking job");
263   }
264   $Job->update_attributes('started_at' => scalar gmtime,
265                           'running' => 1,
266                           'success' => undef,
267                           'tasks_summary' => { 'failed' => 0,
268                                                'todo' => 1,
269                                                'running' => 0,
270                                                'done' => 0 });
271 }
272
273
274 Log (undef, "start");
275 $SIG{'INT'} = sub { $main::please_freeze = 1; };
276 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
277 $SIG{'TERM'} = \&croak;
278 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
279 $SIG{'ALRM'} = sub { $main::please_info = 1; };
280 $SIG{'CONT'} = sub { $main::please_continue = 1; };
281 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
282
283 $main::please_freeze = 0;
284 $main::please_info = 0;
285 $main::please_continue = 0;
286 $main::please_refresh = 0;
287 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
288
289 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
290 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
291 $ENV{"JOB_UUID"} = $job_id;
292
293
294 my @jobstep;
295 my @jobstep_todo = ();
296 my @jobstep_done = ();
297 my @jobstep_tomerge = ();
298 my $jobstep_tomerge_level = 0;
299 my $squeue_checked;
300 my $squeue_kill_checked;
301 my $output_in_keep = 0;
302 my $latest_refresh = scalar time;
303
304
305
306 if (defined $Job->{thawedfromkey})
307 {
308   thaw ($Job->{thawedfromkey});
309 }
310 else
311 {
312   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
313     'job_uuid' => $Job->{'uuid'},
314     'sequence' => 0,
315     'qsequence' => 0,
316     'parameters' => {},
317                                                           });
318   push @jobstep, { 'level' => 0,
319                    'failures' => 0,
320                    'arvados_task' => $first_task,
321                  };
322   push @jobstep_todo, 0;
323 }
324
325
326 my $build_script;
327
328
329 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
330
331 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
332 if ($skip_install)
333 {
334   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
335   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
336     if (-d $src_path) {
337       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
338           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
339       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
340           == 0
341           or croak ("setup.py in $src_path failed: exit ".($?>>8));
342     }
343   }
344 }
345 else
346 {
347   do {
348     local $/ = undef;
349     $build_script = <DATA>;
350   };
351   Log (undef, "Install revision ".$Job->{script_version});
352   my $nodelist = join(",", @node);
353
354   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
355
356   my $cleanpid = fork();
357   if ($cleanpid == 0)
358   {
359     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
360           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
361     exit (1);
362   }
363   while (1)
364   {
365     last if $cleanpid == waitpid (-1, WNOHANG);
366     freeze_if_want_freeze ($cleanpid);
367     select (undef, undef, undef, 0.1);
368   }
369   Log (undef, "Clean-work-dir exited $?");
370
371   # Install requested code version
372
373   my @execargs;
374   my @srunargs = ("srun",
375                   "--nodelist=$nodelist",
376                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
377
378   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
379   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
380
381   my $commit;
382   my $git_archive;
383   my $treeish = $Job->{'script_version'};
384   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
385   # Todo: let script_version specify repository instead of expecting
386   # parent process to figure it out.
387   $ENV{"CRUNCH_SRC_URL"} = $repo;
388
389   # Create/update our clone of the remote git repo
390
391   if (!-d $ENV{"CRUNCH_SRC"}) {
392     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
393         or croak ("git clone $repo failed: exit ".($?>>8));
394     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
395   }
396   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
397
398   # If this looks like a subversion r#, look for it in git-svn commit messages
399
400   if ($treeish =~ m{^\d{1,4}$}) {
401     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
402     chomp $gitlog;
403     if ($gitlog =~ /^[a-f0-9]{40}$/) {
404       $commit = $gitlog;
405       Log (undef, "Using commit $commit for script_version $treeish");
406     }
407   }
408
409   # If that didn't work, try asking git to look it up as a tree-ish.
410
411   if (!defined $commit) {
412
413     my $cooked_treeish = $treeish;
414     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
415       # Looks like a git branch name -- make sure git knows it's
416       # relative to the remote repo
417       $cooked_treeish = "origin/$treeish";
418     }
419
420     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
421     chomp $found;
422     if ($found =~ /^[0-9a-f]{40}$/s) {
423       $commit = $found;
424       if ($commit ne $treeish) {
425         # Make sure we record the real commit id in the database,
426         # frozentokey, logs, etc. -- instead of an abbreviation or a
427         # branch name which can become ambiguous or point to a
428         # different commit in the future.
429         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
430         Log (undef, "Using commit $commit for tree-ish $treeish");
431         if ($commit ne $treeish) {
432           $Job->{'script_version'} = $commit;
433           !$job_has_uuid or
434               $Job->update_attributes('script_version' => $commit) or
435               croak("Error while updating job");
436         }
437       }
438     }
439   }
440
441   if (defined $commit) {
442     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
443     @execargs = ("sh", "-c",
444                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
445     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
446   }
447   else {
448     croak ("could not figure out commit id for $treeish");
449   }
450
451   my $installpid = fork();
452   if ($installpid == 0)
453   {
454     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
455     exit (1);
456   }
457   while (1)
458   {
459     last if $installpid == waitpid (-1, WNOHANG);
460     freeze_if_want_freeze ($installpid);
461     select (undef, undef, undef, 0.1);
462   }
463   Log (undef, "Install exited $?");
464 }
465
466
467
468 foreach (qw (script script_version script_parameters runtime_constraints))
469 {
470   Log (undef,
471        "$_ " .
472        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
473 }
474 foreach (split (/\n/, $Job->{knobs}))
475 {
476   Log (undef, "knob " . $_);
477 }
478
479
480
481 $main::success = undef;
482
483
484
485 ONELEVEL:
486
487 my $thisround_succeeded = 0;
488 my $thisround_failed = 0;
489 my $thisround_failed_multiple = 0;
490
491 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
492                        or $a <=> $b } @jobstep_todo;
493 my $level = $jobstep[$jobstep_todo[0]]->{level};
494 Log (undef, "start level $level");
495
496
497
498 my %proc;
499 my @freeslot = (0..$#slot);
500 my @holdslot;
501 my %reader;
502 my $progress_is_dirty = 1;
503 my $progress_stats_updated = 0;
504
505 update_progress_stats();
506
507
508
509 THISROUND:
510 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
511 {
512   my $id = $jobstep_todo[$todo_ptr];
513   my $Jobstep = $jobstep[$id];
514   if ($Jobstep->{level} != $level)
515   {
516     next;
517   }
518
519   pipe $reader{$id}, "writer" or croak ($!);
520   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
521   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
522
523   my $childslot = $freeslot[0];
524   my $childnode = $slot[$childslot]->{node};
525   my $childslotname = join (".",
526                             $slot[$childslot]->{node}->{name},
527                             $slot[$childslot]->{cpu});
528   my $childpid = fork();
529   if ($childpid == 0)
530   {
531     $SIG{'INT'} = 'DEFAULT';
532     $SIG{'QUIT'} = 'DEFAULT';
533     $SIG{'TERM'} = 'DEFAULT';
534
535     foreach (values (%reader))
536     {
537       close($_);
538     }
539     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
540     open(STDOUT,">&writer");
541     open(STDERR,">&writer");
542
543     undef $dbh;
544     undef $sth;
545
546     delete $ENV{"GNUPGHOME"};
547     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
548     $ENV{"TASK_QSEQUENCE"} = $id;
549     $ENV{"TASK_SEQUENCE"} = $level;
550     $ENV{"JOB_SCRIPT"} = $Job->{script};
551     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
552       $param =~ tr/a-z/A-Z/;
553       $ENV{"JOB_PARAMETER_$param"} = $value;
554     }
555     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
556     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
557     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
558     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
559     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
560     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
561     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
562
563     $ENV{"GZIP"} = "-n";
564
565     my @srunargs = (
566       "srun",
567       "--nodelist=".$childnode->{name},
568       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
569       "--job-name=$job_id.$id.$$",
570         );
571     my @execargs = qw(sh);
572     my $build_script_to_send = "";
573     my $command =
574         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
575         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
576         ."&& cd $ENV{CRUNCH_TMP} ";
577     if ($build_script)
578     {
579       $build_script_to_send = $build_script;
580       $command .=
581           "&& perl -";
582     }
583     $command .=
584         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
585     my @execargs = ('bash', '-c', $command);
586     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
587     exit (111);
588   }
589   close("writer");
590   if (!defined $childpid)
591   {
592     close $reader{$id};
593     delete $reader{$id};
594     next;
595   }
596   shift @freeslot;
597   $proc{$childpid} = { jobstep => $id,
598                        time => time,
599                        slot => $childslot,
600                        jobstepname => "$job_id.$id.$childpid",
601                      };
602   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
603   $slot[$childslot]->{pid} = $childpid;
604
605   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
606   Log ($id, "child $childpid started on $childslotname");
607   $Jobstep->{starttime} = time;
608   $Jobstep->{node} = $childnode->{name};
609   $Jobstep->{slotindex} = $childslot;
610   delete $Jobstep->{stderr};
611   delete $Jobstep->{finishtime};
612
613   splice @jobstep_todo, $todo_ptr, 1;
614   --$todo_ptr;
615
616   $progress_is_dirty = 1;
617
618   while (!@freeslot
619          ||
620          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
621   {
622     last THISROUND if $main::please_freeze;
623     if ($main::please_info)
624     {
625       $main::please_info = 0;
626       freeze();
627       collate_output();
628       save_meta(1);
629       update_progress_stats();
630     }
631     my $gotsome
632         = readfrompipes ()
633         + reapchildren ();
634     if (!$gotsome)
635     {
636       check_refresh_wanted();
637       check_squeue();
638       update_progress_stats();
639       select (undef, undef, undef, 0.1);
640     }
641     elsif (time - $progress_stats_updated >= 30)
642     {
643       update_progress_stats();
644     }
645     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
646         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
647     {
648       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
649           .($thisround_failed+$thisround_succeeded)
650           .") -- giving up on this round";
651       Log (undef, $message);
652       last THISROUND;
653     }
654
655     # move slots from freeslot to holdslot (or back to freeslot) if necessary
656     for (my $i=$#freeslot; $i>=0; $i--) {
657       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
658         push @holdslot, (splice @freeslot, $i, 1);
659       }
660     }
661     for (my $i=$#holdslot; $i>=0; $i--) {
662       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
663         push @freeslot, (splice @holdslot, $i, 1);
664       }
665     }
666
667     # give up if no nodes are succeeding
668     if (!grep { $_->{node}->{losing_streak} == 0 &&
669                     $_->{node}->{hold_count} < 4 } @slot) {
670       my $message = "Every node has failed -- giving up on this round";
671       Log (undef, $message);
672       last THISROUND;
673     }
674   }
675 }
676
677
678 push @freeslot, splice @holdslot;
679 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
680
681
682 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
683 while (%proc)
684 {
685   if ($main::please_continue) {
686     $main::please_continue = 0;
687     goto THISROUND;
688   }
689   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
690   readfrompipes ();
691   if (!reapchildren())
692   {
693     check_refresh_wanted();
694     check_squeue();
695     update_progress_stats();
696     select (undef, undef, undef, 0.1);
697     killem (keys %proc) if $main::please_freeze;
698   }
699 }
700
701 update_progress_stats();
702 freeze_if_want_freeze();
703
704
705 if (!defined $main::success)
706 {
707   if (@jobstep_todo &&
708       $thisround_succeeded == 0 &&
709       ($thisround_failed == 0 || $thisround_failed > 4))
710   {
711     my $message = "stop because $thisround_failed tasks failed and none succeeded";
712     Log (undef, $message);
713     $main::success = 0;
714   }
715   if (!@jobstep_todo)
716   {
717     $main::success = 1;
718   }
719 }
720
721 goto ONELEVEL if !defined $main::success;
722
723
724 release_allocation();
725 freeze();
726 if ($job_has_uuid) {
727   $Job->update_attributes('output' => &collate_output(),
728                           'running' => 0,
729                           'success' => $Job->{'output'} && $main::success,
730                           'finished_at' => scalar gmtime)
731 }
732
733 if ($Job->{'output'})
734 {
735   eval {
736     my $manifest_text = capturex("whget", $Job->{'output'});
737     $arv->{'collections'}->{'create'}->execute('collection' => {
738       'uuid' => $Job->{'output'},
739       'manifest_text' => $manifest_text,
740     });
741   };
742   if ($@) {
743     Log (undef, "Failed to register output manifest: $@");
744   }
745 }
746
747 Log (undef, "finish");
748
749 save_meta();
750 exit 0;
751
752
753
754 sub update_progress_stats
755 {
756   $progress_stats_updated = time;
757   return if !$progress_is_dirty;
758   my ($todo, $done, $running) = (scalar @jobstep_todo,
759                                  scalar @jobstep_done,
760                                  scalar @slot - scalar @freeslot - scalar @holdslot);
761   $Job->{'tasks_summary'} ||= {};
762   $Job->{'tasks_summary'}->{'todo'} = $todo;
763   $Job->{'tasks_summary'}->{'done'} = $done;
764   $Job->{'tasks_summary'}->{'running'} = $running;
765   if ($job_has_uuid) {
766     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
767   }
768   Log (undef, "status: $done done, $running running, $todo todo");
769   $progress_is_dirty = 0;
770 }
771
772
773
774 sub reapchildren
775 {
776   my $pid = waitpid (-1, WNOHANG);
777   return 0 if $pid <= 0;
778
779   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
780                   . "."
781                   . $slot[$proc{$pid}->{slot}]->{cpu});
782   my $jobstepid = $proc{$pid}->{jobstep};
783   my $elapsed = time - $proc{$pid}->{time};
784   my $Jobstep = $jobstep[$jobstepid];
785
786   my $childstatus = $?;
787   my $exitvalue = $childstatus >> 8;
788   my $exitinfo = sprintf("exit %d signal %d%s",
789                          $exitvalue,
790                          $childstatus & 127,
791                          ($childstatus & 128 ? ' core dump' : ''));
792   $Jobstep->{'arvados_task'}->reload;
793   my $task_success = $Jobstep->{'arvados_task'}->{success};
794
795   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
796
797   if (!defined $task_success) {
798     # task did not indicate one way or the other --> fail
799     $Jobstep->{'arvados_task'}->{success} = 0;
800     $Jobstep->{'arvados_task'}->save;
801     $task_success = 0;
802   }
803
804   if (!$task_success)
805   {
806     my $temporary_fail;
807     $temporary_fail ||= $Jobstep->{node_fail};
808     $temporary_fail ||= ($exitvalue == 111);
809
810     ++$thisround_failed;
811     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
812
813     # Check for signs of a failed or misconfigured node
814     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
815         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
816       # Don't count this against jobstep failure thresholds if this
817       # node is already suspected faulty and srun exited quickly
818       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
819           $elapsed < 5) {
820         Log ($jobstepid, "blaming failure on suspect node " .
821              $slot[$proc{$pid}->{slot}]->{node}->{name});
822         $temporary_fail ||= 1;
823       }
824       ban_node_by_slot($proc{$pid}->{slot});
825     }
826
827     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
828                              ++$Jobstep->{'failures'},
829                              $temporary_fail ? 'temporary ' : 'permanent',
830                              $elapsed));
831
832     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
833       # Give up on this task, and the whole job
834       $main::success = 0;
835       $main::please_freeze = 1;
836     }
837     else {
838       # Put this task back on the todo queue
839       push @jobstep_todo, $jobstepid;
840     }
841     $Job->{'tasks_summary'}->{'failed'}++;
842   }
843   else
844   {
845     ++$thisround_succeeded;
846     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
847     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
848     push @jobstep_done, $jobstepid;
849     Log ($jobstepid, "success in $elapsed seconds");
850   }
851   $Jobstep->{exitcode} = $childstatus;
852   $Jobstep->{finishtime} = time;
853   process_stderr ($jobstepid, $task_success);
854   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
855
856   close $reader{$jobstepid};
857   delete $reader{$jobstepid};
858   delete $slot[$proc{$pid}->{slot}]->{pid};
859   push @freeslot, $proc{$pid}->{slot};
860   delete $proc{$pid};
861
862   # Load new tasks
863   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
864     'where' => {
865       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
866     },
867     'order' => 'qsequence'
868   );
869   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
870     my $jobstep = {
871       'level' => $arvados_task->{'sequence'},
872       'failures' => 0,
873       'arvados_task' => $arvados_task
874     };
875     push @jobstep, $jobstep;
876     push @jobstep_todo, $#jobstep;
877   }
878
879   $progress_is_dirty = 1;
880   1;
881 }
882
883 sub check_refresh_wanted
884 {
885   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
886   if (@stat && $stat[9] > $latest_refresh) {
887     $latest_refresh = scalar time;
888     if ($job_has_uuid) {
889       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
890       for my $attr ('cancelled_at',
891                     'cancelled_by_user_uuid',
892                     'cancelled_by_client_uuid') {
893         $Job->{$attr} = $Job2->{$attr};
894       }
895       if ($Job->{'cancelled_at'}) {
896         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
897              " by user " . $Job->{cancelled_by_user_uuid});
898         $main::success = 0;
899         $main::please_freeze = 1;
900       }
901     }
902   }
903 }
904
905 sub check_squeue
906 {
907   # return if the kill list was checked <4 seconds ago
908   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
909   {
910     return;
911   }
912   $squeue_kill_checked = time;
913
914   # use killem() on procs whose killtime is reached
915   for (keys %proc)
916   {
917     if (exists $proc{$_}->{killtime}
918         && $proc{$_}->{killtime} <= time)
919     {
920       killem ($_);
921     }
922   }
923
924   # return if the squeue was checked <60 seconds ago
925   if (defined $squeue_checked && $squeue_checked > time - 60)
926   {
927     return;
928   }
929   $squeue_checked = time;
930
931   if (!$have_slurm)
932   {
933     # here is an opportunity to check for mysterious problems with local procs
934     return;
935   }
936
937   # get a list of steps still running
938   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
939   chop @squeue;
940   if ($squeue[-1] ne "ok")
941   {
942     return;
943   }
944   pop @squeue;
945
946   # which of my jobsteps are running, according to squeue?
947   my %ok;
948   foreach (@squeue)
949   {
950     if (/^(\d+)\.(\d+) (\S+)/)
951     {
952       if ($1 eq $ENV{SLURM_JOBID})
953       {
954         $ok{$3} = 1;
955       }
956     }
957   }
958
959   # which of my active child procs (>60s old) were not mentioned by squeue?
960   foreach (keys %proc)
961   {
962     if ($proc{$_}->{time} < time - 60
963         && !exists $ok{$proc{$_}->{jobstepname}}
964         && !exists $proc{$_}->{killtime})
965     {
966       # kill this proc if it hasn't exited in 30 seconds
967       $proc{$_}->{killtime} = time + 30;
968     }
969   }
970 }
971
972
973 sub release_allocation
974 {
975   if ($have_slurm)
976   {
977     Log (undef, "release job allocation");
978     system "scancel $ENV{SLURM_JOBID}";
979   }
980 }
981
982
983 sub readfrompipes
984 {
985   my $gotsome = 0;
986   foreach my $job (keys %reader)
987   {
988     my $buf;
989     while (0 < sysread ($reader{$job}, $buf, 8192))
990     {
991       print STDERR $buf if $ENV{CRUNCH_DEBUG};
992       $jobstep[$job]->{stderr} .= $buf;
993       preprocess_stderr ($job);
994       if (length ($jobstep[$job]->{stderr}) > 16384)
995       {
996         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
997       }
998       $gotsome = 1;
999     }
1000   }
1001   return $gotsome;
1002 }
1003
1004
1005 sub preprocess_stderr
1006 {
1007   my $job = shift;
1008
1009   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1010     my $line = $1;
1011     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1012     Log ($job, "stderr $line");
1013     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1014       # whoa.
1015       $main::please_freeze = 1;
1016     }
1017     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1018       $jobstep[$job]->{node_fail} = 1;
1019       ban_node_by_slot($jobstep[$job]->{slotindex});
1020     }
1021   }
1022 }
1023
1024
1025 sub process_stderr
1026 {
1027   my $job = shift;
1028   my $task_success = shift;
1029   preprocess_stderr ($job);
1030
1031   map {
1032     Log ($job, "stderr $_");
1033   } split ("\n", $jobstep[$job]->{stderr});
1034 }
1035
1036
1037 sub collate_output
1038 {
1039   my $whc = Warehouse->new;
1040   Log (undef, "collate");
1041   $whc->write_start (1);
1042   my $joboutput;
1043   for (@jobstep)
1044   {
1045     next if (!exists $_->{'arvados_task'}->{output} ||
1046              !$_->{'arvados_task'}->{'success'} ||
1047              $_->{'exitcode'} != 0);
1048     my $output = $_->{'arvados_task'}->{output};
1049     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1050     {
1051       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1052       $whc->write_data ($output);
1053     }
1054     elsif (@jobstep == 1)
1055     {
1056       $joboutput = $output;
1057       $whc->write_finish;
1058     }
1059     elsif (defined (my $outblock = $whc->fetch_block ($output)))
1060     {
1061       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1062       $whc->write_data ($outblock);
1063     }
1064     else
1065     {
1066       my $errstr = $whc->errstr;
1067       $whc->write_data ("XXX fetch_block($output) failed: $errstr XXX\n");
1068       $main::success = 0;
1069     }
1070   }
1071   $joboutput = $whc->write_finish if !defined $joboutput;
1072   if ($joboutput)
1073   {
1074     Log (undef, "output $joboutput");
1075     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1076   }
1077   else
1078   {
1079     Log (undef, "output undef");
1080   }
1081   return $joboutput;
1082 }
1083
1084
1085 sub killem
1086 {
1087   foreach (@_)
1088   {
1089     my $sig = 2;                # SIGINT first
1090     if (exists $proc{$_}->{"sent_$sig"} &&
1091         time - $proc{$_}->{"sent_$sig"} > 4)
1092     {
1093       $sig = 15;                # SIGTERM if SIGINT doesn't work
1094     }
1095     if (exists $proc{$_}->{"sent_$sig"} &&
1096         time - $proc{$_}->{"sent_$sig"} > 4)
1097     {
1098       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1099     }
1100     if (!exists $proc{$_}->{"sent_$sig"})
1101     {
1102       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1103       kill $sig, $_;
1104       select (undef, undef, undef, 0.1);
1105       if ($sig == 2)
1106       {
1107         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1108       }
1109       $proc{$_}->{"sent_$sig"} = time;
1110       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1111     }
1112   }
1113 }
1114
1115
1116 sub fhbits
1117 {
1118   my($bits);
1119   for (@_) {
1120     vec($bits,fileno($_),1) = 1;
1121   }
1122   $bits;
1123 }
1124
1125
1126 sub Log                         # ($jobstep_id, $logmessage)
1127 {
1128   if ($_[1] =~ /\n/) {
1129     for my $line (split (/\n/, $_[1])) {
1130       Log ($_[0], $line);
1131     }
1132     return;
1133   }
1134   my $fh = select STDERR; $|=1; select $fh;
1135   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1136   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1137   $message .= "\n";
1138   my $datetime;
1139   if ($metastream || -t STDERR) {
1140     my @gmtime = gmtime;
1141     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1142                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1143   }
1144   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1145
1146   return if !$metastream;
1147   $metastream->write_data ($datetime . " " . $message);
1148 }
1149
1150
1151 sub croak
1152 {
1153   my ($package, $file, $line) = caller;
1154   my $message = "@_ at $file line $line\n";
1155   Log (undef, $message);
1156   freeze() if @jobstep_todo;
1157   collate_output() if @jobstep_todo;
1158   cleanup();
1159   save_meta() if $metastream;
1160   die;
1161 }
1162
1163
1164 sub cleanup
1165 {
1166   return if !$job_has_uuid;
1167   $Job->update_attributes('running' => 0,
1168                           'success' => 0,
1169                           'finished_at' => scalar gmtime);
1170 }
1171
1172
1173 sub save_meta
1174 {
1175   my $justcheckpoint = shift; # false if this will be the last meta saved
1176   my $m = $metastream;
1177   $m = $m->copy if $justcheckpoint;
1178   $m->write_finish;
1179   my $whc = Warehouse->new;
1180   my $loglocator = $whc->store_block ($m->as_string);
1181   $arv->{'collections'}->{'create'}->execute('collection' => {
1182     'uuid' => $loglocator,
1183     'manifest_text' => $m->as_string,
1184   });
1185   undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it
1186   Log (undef, "log manifest is $loglocator");
1187   $Job->{'log'} = $loglocator;
1188   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1189 }
1190
1191
1192 sub freeze_if_want_freeze
1193 {
1194   if ($main::please_freeze)
1195   {
1196     release_allocation();
1197     if (@_)
1198     {
1199       # kill some srun procs before freeze+stop
1200       map { $proc{$_} = {} } @_;
1201       while (%proc)
1202       {
1203         killem (keys %proc);
1204         select (undef, undef, undef, 0.1);
1205         my $died;
1206         while (($died = waitpid (-1, WNOHANG)) > 0)
1207         {
1208           delete $proc{$died};
1209         }
1210       }
1211     }
1212     freeze();
1213     collate_output();
1214     cleanup();
1215     save_meta();
1216     exit 0;
1217   }
1218 }
1219
1220
1221 sub freeze
1222 {
1223   Log (undef, "Freeze not implemented");
1224   return;
1225 }
1226
1227
1228 sub thaw
1229 {
1230   croak ("Thaw not implemented");
1231
1232   my $whc;
1233   my $key = shift;
1234   Log (undef, "thaw from $key");
1235
1236   @jobstep = ();
1237   @jobstep_done = ();
1238   @jobstep_todo = ();
1239   @jobstep_tomerge = ();
1240   $jobstep_tomerge_level = 0;
1241   my $frozenjob = {};
1242
1243   my $stream = new Warehouse::Stream ( whc => $whc,
1244                                        hash => [split (",", $key)] );
1245   $stream->rewind;
1246   while (my $dataref = $stream->read_until (undef, "\n\n"))
1247   {
1248     if ($$dataref =~ /^job /)
1249     {
1250       foreach (split ("\n", $$dataref))
1251       {
1252         my ($k, $v) = split ("=", $_, 2);
1253         $frozenjob->{$k} = freezeunquote ($v);
1254       }
1255       next;
1256     }
1257
1258     if ($$dataref =~ /^merge (\d+) (.*)/)
1259     {
1260       $jobstep_tomerge_level = $1;
1261       @jobstep_tomerge
1262           = map { freezeunquote ($_) } split ("\n", freezeunquote($2));
1263       next;
1264     }
1265
1266     my $Jobstep = { };
1267     foreach (split ("\n", $$dataref))
1268     {
1269       my ($k, $v) = split ("=", $_, 2);
1270       $Jobstep->{$k} = freezeunquote ($v) if $k;
1271     }
1272     $Jobstep->{'failures'} = 0;
1273     push @jobstep, $Jobstep;
1274
1275     if ($Jobstep->{exitcode} eq "0")
1276     {
1277       push @jobstep_done, $#jobstep;
1278     }
1279     else
1280     {
1281       push @jobstep_todo, $#jobstep;
1282     }
1283   }
1284
1285   foreach (qw (script script_version script_parameters))
1286   {
1287     $Job->{$_} = $frozenjob->{$_};
1288   }
1289   $Job->save if $job_has_uuid;
1290 }
1291
1292
1293 sub freezequote
1294 {
1295   my $s = shift;
1296   $s =~ s/\\/\\\\/g;
1297   $s =~ s/\n/\\n/g;
1298   return $s;
1299 }
1300
1301
1302 sub freezeunquote
1303 {
1304   my $s = shift;
1305   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1306   return $s;
1307 }
1308
1309
1310 sub srun
1311 {
1312   my $srunargs = shift;
1313   my $execargs = shift;
1314   my $opts = shift || {};
1315   my $stdin = shift;
1316   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1317   print STDERR (join (" ",
1318                       map { / / ? "'$_'" : $_ }
1319                       (@$args)),
1320                 "\n")
1321       if $ENV{CRUNCH_DEBUG};
1322
1323   if (defined $stdin) {
1324     my $child = open STDIN, "-|";
1325     defined $child or die "no fork: $!";
1326     if ($child == 0) {
1327       print $stdin or die $!;
1328       close STDOUT or die $!;
1329       exit 0;
1330     }
1331   }
1332
1333   return system (@$args) if $opts->{fork};
1334
1335   exec @$args;
1336   warn "ENV size is ".length(join(" ",%ENV));
1337   die "exec failed: $!: @$args";
1338 }
1339
1340
1341 sub ban_node_by_slot {
1342   # Don't start any new jobsteps on this node for 60 seconds
1343   my $slotid = shift;
1344   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1345   $slot[$slotid]->{node}->{hold_count}++;
1346   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1347 }
1348
1349 __DATA__
1350 #!/usr/bin/perl
1351
1352 # checkout-and-build
1353
1354 use Fcntl ':flock';
1355
1356 my $destdir = $ENV{"CRUNCH_SRC"};
1357 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1358 my $repo = $ENV{"CRUNCH_SRC_URL"};
1359
1360 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1361 flock L, LOCK_EX;
1362 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1363     exit 0;
1364 }
1365
1366 unlink "$destdir.commit";
1367 open STDOUT, ">", "$destdir.log";
1368 open STDERR, ">&STDOUT";
1369
1370 mkdir $destdir;
1371 my @git_archive_data = <DATA>;
1372 if (@git_archive_data) {
1373   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1374   print TARX @git_archive_data;
1375   if(!close(TARX)) {
1376     die "'tar -C $destdir -xf -' exited $?: $!";
1377   }
1378 }
1379
1380 my $pwd;
1381 chomp ($pwd = `pwd`);
1382 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1383 mkdir $install_dir;
1384
1385 for my $src_path ("$destdir/arvados/sdk/python") {
1386   if (-d $src_path) {
1387     shell_or_die ("virtualenv", $install_dir);
1388     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1389   }
1390 }
1391
1392 if (-e "$destdir/crunch_scripts/install") {
1393     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1394 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1395     # Old version
1396     shell_or_die ("./tests/autotests.sh", $install_dir);
1397 } elsif (-e "./install.sh") {
1398     shell_or_die ("./install.sh", $install_dir);
1399 }
1400
1401 if ($commit) {
1402     unlink "$destdir.commit.new";
1403     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1404     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1405 }
1406
1407 close L;
1408
1409 exit 0;
1410
1411 sub shell_or_die
1412 {
1413   if ($ENV{"DEBUG"}) {
1414     print STDERR "@_\n";
1415   }
1416   system (@_) == 0
1417       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1418 }
1419
1420 __DATA__