Remove legacy 'thaw' code (refs #2221).
[arvados.git] / sdk / cli / bin / crunch-job
1 #!/usr/bin/perl
2 # -*- mode: perl; perl-indent-level: 2; -*-
3
4 =head1 NAME
5
6 crunch-job: Execute job steps, save snapshots as requested, collate output.
7
8 =head1 SYNOPSIS
9
10 Obtain job details from Arvados, run tasks on compute nodes (typically
11 invoked by scheduler on controller):
12
13  crunch-job --job x-y-z
14
15 Obtain job details from command line, run tasks on local machine
16 (typically invoked by application or developer on VM):
17
18  crunch-job --job '{"script_version":"/path/to/tree","script":"scriptname",...}'
19
20 =head1 OPTIONS
21
22 =over
23
24 =item --force-unlock
25
26 If the job is already locked, steal the lock and run it anyway.
27
28 =item --git-dir
29
30 Path to .git directory where the specified commit is found.
31
32 =item --job-api-token
33
34 Arvados API authorization token to use during the course of the job.
35
36 =back
37
38 =head1 RUNNING JOBS LOCALLY
39
40 crunch-job's log messages appear on stderr along with the job tasks'
41 stderr streams. The log is saved in Keep at each checkpoint and when
42 the job finishes.
43
44 If the job succeeds, the job's output locator is printed on stdout.
45
46 While the job is running, the following signals are accepted:
47
48 =over
49
50 =item control-C, SIGINT, SIGQUIT
51
52 Save a checkpoint, terminate any job tasks that are running, and stop.
53
54 =item SIGALRM
55
56 Save a checkpoint and continue.
57
58 =item SIGHUP
59
60 Refresh node allocation (i.e., check whether any nodes have been added
61 or unallocated) and attributes of the Job record that should affect
62 behavior (e.g., cancel job if cancelled_at becomes non-nil).
63
64 =back
65
66 =cut
67
68
69 use strict;
70 use POSIX ':sys_wait_h';
71 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
72 use Arvados;
73 use Getopt::Long;
74 use IPC::Open2;
75 use IO::Select;
76 use File::Temp;
77
78 $ENV{"TMPDIR"} ||= "/tmp";
79 unless (defined $ENV{"CRUNCH_TMP"}) {
80   $ENV{"CRUNCH_TMP"} = $ENV{"TMPDIR"} . "/crunch-job";
81   if ($ENV{"USER"} ne "crunch" && $< != 0) {
82     # use a tmp dir unique for my uid
83     $ENV{"CRUNCH_TMP"} .= "-$<";
84   }
85 }
86 $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work";
87 $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt";
88 $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated
89 mkdir ($ENV{"JOB_WORK"});
90
91 my $force_unlock;
92 my $git_dir;
93 my $jobspec;
94 my $job_api_token;
95 my $resume_stash;
96 GetOptions('force-unlock' => \$force_unlock,
97            'git-dir=s' => \$git_dir,
98            'job=s' => \$jobspec,
99            'job-api-token=s' => \$job_api_token,
100            'resume-stash=s' => \$resume_stash,
101     );
102
103 if (defined $job_api_token) {
104   $ENV{ARVADOS_API_TOKEN} = $job_api_token;
105 }
106
107 my $have_slurm = exists $ENV{SLURM_JOBID} && exists $ENV{SLURM_NODELIST};
108 my $job_has_uuid = $jobspec =~ /^[-a-z\d]+$/;
109 my $local_job = !$job_has_uuid;
110
111
112 $SIG{'USR1'} = sub
113 {
114   $main::ENV{CRUNCH_DEBUG} = 1;
115 };
116 $SIG{'USR2'} = sub
117 {
118   $main::ENV{CRUNCH_DEBUG} = 0;
119 };
120
121
122
123 my $arv = Arvados->new('apiVersion' => 'v1');
124 my $metastream;
125
126 my $User = $arv->{'users'}->{'current'}->execute;
127
128 my $Job = {};
129 my $job_id;
130 my $dbh;
131 my $sth;
132 if ($job_has_uuid)
133 {
134   $Job = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
135   if (!$force_unlock) {
136     if ($Job->{'is_locked_by_uuid'}) {
137       croak("Job is locked: " . $Job->{'is_locked_by_uuid'});
138     }
139     if ($Job->{'success'} ne undef) {
140       croak("Job 'success' flag (" . $Job->{'success'} . ") is not null");
141     }
142     if ($Job->{'running'}) {
143       croak("Job 'running' flag is already set");
144     }
145     if ($Job->{'started_at'}) {
146       croak("Job 'started_at' time is already set (" . $Job->{'started_at'} . ")");
147     }
148   }
149 }
150 else
151 {
152   $Job = JSON::decode_json($jobspec);
153
154   if (!$resume_stash)
155   {
156     map { croak ("No $_ specified") unless $Job->{$_} }
157     qw(script script_version script_parameters);
158   }
159
160   $Job->{'is_locked_by_uuid'} = $User->{'uuid'};
161   $Job->{'started_at'} = gmtime;
162
163   $Job = $arv->{'jobs'}->{'create'}->execute('job' => $Job);
164
165   $job_has_uuid = 1;
166 }
167 $job_id = $Job->{'uuid'};
168
169 my $keep_logfile = $job_id . '.log.txt';
170 my $local_logfile = File::Temp->new();
171
172 $Job->{'runtime_constraints'} ||= {};
173 $Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
174 my $max_ncpus = $Job->{'runtime_constraints'}->{'max_tasks_per_node'};
175
176
177 Log (undef, "check slurm allocation");
178 my @slot;
179 my @node;
180 # Should use $ENV{SLURM_TASKS_PER_NODE} instead of sinfo? (eg. "4(x3),2,4(x2)")
181 my @sinfo;
182 if (!$have_slurm)
183 {
184   my $localcpus = 0 + `grep -cw ^processor /proc/cpuinfo` || 1;
185   push @sinfo, "$localcpus localhost";
186 }
187 if (exists $ENV{SLURM_NODELIST})
188 {
189   push @sinfo, `sinfo -h --format='%c %N' --nodes='$ENV{SLURM_NODELIST}'`;
190 }
191 foreach (@sinfo)
192 {
193   my ($ncpus, $slurm_nodelist) = split;
194   $ncpus = $max_ncpus if $max_ncpus && $ncpus > $max_ncpus;
195
196   my @nodelist;
197   while ($slurm_nodelist =~ s/^([^\[,]+?(\[.*?\])?)(,|$)//)
198   {
199     my $nodelist = $1;
200     if ($nodelist =~ /\[((\d+)(-(\d+))?(,(\d+)(-(\d+))?)*)\]/)
201     {
202       my $ranges = $1;
203       foreach (split (",", $ranges))
204       {
205         my ($a, $b);
206         if (/(\d+)-(\d+)/)
207         {
208           $a = $1;
209           $b = $2;
210         }
211         else
212         {
213           $a = $_;
214           $b = $_;
215         }
216         push @nodelist, map {
217           my $n = $nodelist;
218           $n =~ s/\[[-,\d]+\]/$_/;
219           $n;
220         } ($a..$b);
221       }
222     }
223     else
224     {
225       push @nodelist, $nodelist;
226     }
227   }
228   foreach my $nodename (@nodelist)
229   {
230     Log (undef, "node $nodename - $ncpus slots");
231     my $node = { name => $nodename,
232                  ncpus => $ncpus,
233                  losing_streak => 0,
234                  hold_until => 0 };
235     foreach my $cpu (1..$ncpus)
236     {
237       push @slot, { node => $node,
238                     cpu => $cpu };
239     }
240   }
241   push @node, @nodelist;
242 }
243
244
245
246 # Ensure that we get one jobstep running on each allocated node before
247 # we start overloading nodes with concurrent steps
248
249 @slot = sort { $a->{cpu} <=> $b->{cpu} } @slot;
250
251
252
253 my $jobmanager_id;
254 if ($job_has_uuid)
255 {
256   # Claim this job, and make sure nobody else does
257   unless ($Job->update_attributes('is_locked_by_uuid' => $User->{'uuid'}) &&
258           $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) {
259     croak("Error while updating / locking job");
260   }
261   $Job->update_attributes('started_at' => scalar gmtime,
262                           'running' => 1,
263                           'success' => undef,
264                           'tasks_summary' => { 'failed' => 0,
265                                                'todo' => 1,
266                                                'running' => 0,
267                                                'done' => 0 });
268 }
269
270
271 Log (undef, "start");
272 $SIG{'INT'} = sub { $main::please_freeze = 1; };
273 $SIG{'QUIT'} = sub { $main::please_freeze = 1; };
274 $SIG{'TERM'} = \&croak;
275 $SIG{'TSTP'} = sub { $main::please_freeze = 1; };
276 $SIG{'ALRM'} = sub { $main::please_info = 1; };
277 $SIG{'CONT'} = sub { $main::please_continue = 1; };
278 $SIG{'HUP'} = sub { $main::please_refresh = 1; };
279
280 $main::please_freeze = 0;
281 $main::please_info = 0;
282 $main::please_continue = 0;
283 $main::please_refresh = 0;
284 my $jobsteps_must_output_keys = 0;      # becomes 1 when any task outputs a key
285
286 grep { $ENV{$1} = $2 if /^(NOCACHE.*?)=(.*)/ } split ("\n", $$Job{knobs});
287 $ENV{"CRUNCH_JOB_UUID"} = $job_id;
288 $ENV{"JOB_UUID"} = $job_id;
289
290
291 my @jobstep;
292 my @jobstep_todo = ();
293 my @jobstep_done = ();
294 my @jobstep_tomerge = ();
295 my $jobstep_tomerge_level = 0;
296 my $squeue_checked;
297 my $squeue_kill_checked;
298 my $output_in_keep = 0;
299 my $latest_refresh = scalar time;
300
301
302
303 if (defined $Job->{thawedfromkey})
304 {
305   thaw ($Job->{thawedfromkey});
306 }
307 else
308 {
309   my $first_task = $arv->{'job_tasks'}->{'create'}->execute('job_task' => {
310     'job_uuid' => $Job->{'uuid'},
311     'sequence' => 0,
312     'qsequence' => 0,
313     'parameters' => {},
314                                                           });
315   push @jobstep, { 'level' => 0,
316                    'failures' => 0,
317                    'arvados_task' => $first_task,
318                  };
319   push @jobstep_todo, 0;
320 }
321
322
323 my $build_script;
324
325
326 $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
327
328 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
329 if ($skip_install)
330 {
331   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
332   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
333     if (-d $src_path) {
334       system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0
335           or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8));
336       system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install")
337           == 0
338           or croak ("setup.py in $src_path failed: exit ".($?>>8));
339     }
340   }
341 }
342 else
343 {
344   do {
345     local $/ = undef;
346     $build_script = <DATA>;
347   };
348   Log (undef, "Install revision ".$Job->{script_version});
349   my $nodelist = join(",", @node);
350
351   # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
352
353   my $cleanpid = fork();
354   if ($cleanpid == 0)
355   {
356     srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
357           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
358     exit (1);
359   }
360   while (1)
361   {
362     last if $cleanpid == waitpid (-1, WNOHANG);
363     freeze_if_want_freeze ($cleanpid);
364     select (undef, undef, undef, 0.1);
365   }
366   Log (undef, "Clean-work-dir exited $?");
367
368   # Install requested code version
369
370   my @execargs;
371   my @srunargs = ("srun",
372                   "--nodelist=$nodelist",
373                   "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
374
375   $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
376   $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src";
377
378   my $commit;
379   my $git_archive;
380   my $treeish = $Job->{'script_version'};
381   my $repo = $git_dir || $ENV{'CRUNCH_DEFAULT_GIT_DIR'};
382   # Todo: let script_version specify repository instead of expecting
383   # parent process to figure it out.
384   $ENV{"CRUNCH_SRC_URL"} = $repo;
385
386   # Create/update our clone of the remote git repo
387
388   if (!-d $ENV{"CRUNCH_SRC"}) {
389     system(qw(git clone), $repo, $ENV{"CRUNCH_SRC"}) == 0
390         or croak ("git clone $repo failed: exit ".($?>>8));
391     system("cd $ENV{CRUNCH_SRC} && git config clean.requireForce false");
392   }
393   `cd $ENV{CRUNCH_SRC} && git remote set-url origin \"\$CRUNCH_SRC_URL\" && git fetch -q origin`;
394
395   # If this looks like a subversion r#, look for it in git-svn commit messages
396
397   if ($treeish =~ m{^\d{1,4}$}) {
398     my $gitlog = `cd $ENV{CRUNCH_SRC} && git log --pretty="format:%H" --grep="git-svn-id:.*\@$treeish " origin/master`;
399     chomp $gitlog;
400     if ($gitlog =~ /^[a-f0-9]{40}$/) {
401       $commit = $gitlog;
402       Log (undef, "Using commit $commit for script_version $treeish");
403     }
404   }
405
406   # If that didn't work, try asking git to look it up as a tree-ish.
407
408   if (!defined $commit) {
409
410     my $cooked_treeish = $treeish;
411     if ($treeish !~ m{^[0-9a-f]{5,}$}) {
412       # Looks like a git branch name -- make sure git knows it's
413       # relative to the remote repo
414       $cooked_treeish = "origin/$treeish";
415     }
416
417     my $found = `cd $ENV{CRUNCH_SRC} && git rev-list -1 $cooked_treeish`;
418     chomp $found;
419     if ($found =~ /^[0-9a-f]{40}$/s) {
420       $commit = $found;
421       if ($commit ne $treeish) {
422         # Make sure we record the real commit id in the database,
423         # frozentokey, logs, etc. -- instead of an abbreviation or a
424         # branch name which can become ambiguous or point to a
425         # different commit in the future.
426         $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
427         Log (undef, "Using commit $commit for tree-ish $treeish");
428         if ($commit ne $treeish) {
429           $Job->{'script_version'} = $commit;
430           !$job_has_uuid or
431               $Job->update_attributes('script_version' => $commit) or
432               croak("Error while updating job");
433         }
434       }
435     }
436   }
437
438   if (defined $commit) {
439     $ENV{"CRUNCH_SRC_COMMIT"} = $commit;
440     @execargs = ("sh", "-c",
441                  "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
442     $git_archive = `cd $ENV{CRUNCH_SRC} && git archive $commit`;
443   }
444   else {
445     croak ("could not figure out commit id for $treeish");
446   }
447
448   my $installpid = fork();
449   if ($installpid == 0)
450   {
451     srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
452     exit (1);
453   }
454   while (1)
455   {
456     last if $installpid == waitpid (-1, WNOHANG);
457     freeze_if_want_freeze ($installpid);
458     select (undef, undef, undef, 0.1);
459   }
460   Log (undef, "Install exited $?");
461 }
462
463
464
465 foreach (qw (script script_version script_parameters runtime_constraints))
466 {
467   Log (undef,
468        "$_ " .
469        (ref($Job->{$_}) ? JSON::encode_json($Job->{$_}) : $Job->{$_}));
470 }
471 foreach (split (/\n/, $Job->{knobs}))
472 {
473   Log (undef, "knob " . $_);
474 }
475
476
477
478 $main::success = undef;
479
480
481
482 ONELEVEL:
483
484 my $thisround_succeeded = 0;
485 my $thisround_failed = 0;
486 my $thisround_failed_multiple = 0;
487
488 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
489                        or $a <=> $b } @jobstep_todo;
490 my $level = $jobstep[$jobstep_todo[0]]->{level};
491 Log (undef, "start level $level");
492
493
494
495 my %proc;
496 my @freeslot = (0..$#slot);
497 my @holdslot;
498 my %reader;
499 my $progress_is_dirty = 1;
500 my $progress_stats_updated = 0;
501
502 update_progress_stats();
503
504
505
506 THISROUND:
507 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
508 {
509   my $id = $jobstep_todo[$todo_ptr];
510   my $Jobstep = $jobstep[$id];
511   if ($Jobstep->{level} != $level)
512   {
513     next;
514   }
515
516   pipe $reader{$id}, "writer" or croak ($!);
517   my $flags = fcntl ($reader{$id}, F_GETFL, 0) or croak ($!);
518   fcntl ($reader{$id}, F_SETFL, $flags | O_NONBLOCK) or croak ($!);
519
520   my $childslot = $freeslot[0];
521   my $childnode = $slot[$childslot]->{node};
522   my $childslotname = join (".",
523                             $slot[$childslot]->{node}->{name},
524                             $slot[$childslot]->{cpu});
525   my $childpid = fork();
526   if ($childpid == 0)
527   {
528     $SIG{'INT'} = 'DEFAULT';
529     $SIG{'QUIT'} = 'DEFAULT';
530     $SIG{'TERM'} = 'DEFAULT';
531
532     foreach (values (%reader))
533     {
534       close($_);
535     }
536     fcntl ("writer", F_SETFL, 0) or croak ($!); # no close-on-exec
537     open(STDOUT,">&writer");
538     open(STDERR,">&writer");
539
540     undef $dbh;
541     undef $sth;
542
543     delete $ENV{"GNUPGHOME"};
544     $ENV{"TASK_UUID"} = $Jobstep->{'arvados_task'}->{'uuid'};
545     $ENV{"TASK_QSEQUENCE"} = $id;
546     $ENV{"TASK_SEQUENCE"} = $level;
547     $ENV{"JOB_SCRIPT"} = $Job->{script};
548     while (my ($param, $value) = each %{$Job->{script_parameters}}) {
549       $param =~ tr/a-z/A-Z/;
550       $ENV{"JOB_PARAMETER_$param"} = $value;
551     }
552     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
553     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
554     $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
555     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
556     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
557     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
558     $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"};
559
560     $ENV{"GZIP"} = "-n";
561
562     my @srunargs = (
563       "srun",
564       "--nodelist=".$childnode->{name},
565       qw(-n1 -c1 -N1 -D), $ENV{'TMPDIR'},
566       "--job-name=$job_id.$id.$$",
567         );
568     my @execargs = qw(sh);
569     my $build_script_to_send = "";
570     my $command =
571         "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; "
572         ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} "
573         ."&& cd $ENV{CRUNCH_TMP} ";
574     if ($build_script)
575     {
576       $build_script_to_send = $build_script;
577       $command .=
578           "&& perl -";
579     }
580     $command .=
581         "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
582     my @execargs = ('bash', '-c', $command);
583     srun (\@srunargs, \@execargs, undef, $build_script_to_send);
584     exit (111);
585   }
586   close("writer");
587   if (!defined $childpid)
588   {
589     close $reader{$id};
590     delete $reader{$id};
591     next;
592   }
593   shift @freeslot;
594   $proc{$childpid} = { jobstep => $id,
595                        time => time,
596                        slot => $childslot,
597                        jobstepname => "$job_id.$id.$childpid",
598                      };
599   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
600   $slot[$childslot]->{pid} = $childpid;
601
602   Log ($id, "job_task ".$Jobstep->{'arvados_task'}->{'uuid'});
603   Log ($id, "child $childpid started on $childslotname");
604   $Jobstep->{starttime} = time;
605   $Jobstep->{node} = $childnode->{name};
606   $Jobstep->{slotindex} = $childslot;
607   delete $Jobstep->{stderr};
608   delete $Jobstep->{finishtime};
609
610   splice @jobstep_todo, $todo_ptr, 1;
611   --$todo_ptr;
612
613   $progress_is_dirty = 1;
614
615   while (!@freeslot
616          ||
617          (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo))
618   {
619     last THISROUND if $main::please_freeze;
620     if ($main::please_info)
621     {
622       $main::please_info = 0;
623       freeze();
624       collate_output();
625       save_meta(1);
626       update_progress_stats();
627     }
628     my $gotsome
629         = readfrompipes ()
630         + reapchildren ();
631     if (!$gotsome)
632     {
633       check_refresh_wanted();
634       check_squeue();
635       update_progress_stats();
636       select (undef, undef, undef, 0.1);
637     }
638     elsif (time - $progress_stats_updated >= 30)
639     {
640       update_progress_stats();
641     }
642     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
643         ($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
644     {
645       my $message = "Repeated failure rate too high ($thisround_failed_multiple/"
646           .($thisround_failed+$thisround_succeeded)
647           .") -- giving up on this round";
648       Log (undef, $message);
649       last THISROUND;
650     }
651
652     # move slots from freeslot to holdslot (or back to freeslot) if necessary
653     for (my $i=$#freeslot; $i>=0; $i--) {
654       if ($slot[$freeslot[$i]]->{node}->{hold_until} > scalar time) {
655         push @holdslot, (splice @freeslot, $i, 1);
656       }
657     }
658     for (my $i=$#holdslot; $i>=0; $i--) {
659       if ($slot[$holdslot[$i]]->{node}->{hold_until} <= scalar time) {
660         push @freeslot, (splice @holdslot, $i, 1);
661       }
662     }
663
664     # give up if no nodes are succeeding
665     if (!grep { $_->{node}->{losing_streak} == 0 &&
666                     $_->{node}->{hold_count} < 4 } @slot) {
667       my $message = "Every node has failed -- giving up on this round";
668       Log (undef, $message);
669       last THISROUND;
670     }
671   }
672 }
673
674
675 push @freeslot, splice @holdslot;
676 map { $slot[$freeslot[$_]]->{node}->{losing_streak} = 0 } (0..$#freeslot);
677
678
679 Log (undef, "wait for last ".(scalar keys %proc)." children to finish");
680 while (%proc)
681 {
682   if ($main::please_continue) {
683     $main::please_continue = 0;
684     goto THISROUND;
685   }
686   $main::please_info = 0, freeze(), collate_output(), save_meta(1) if $main::please_info;
687   readfrompipes ();
688   if (!reapchildren())
689   {
690     check_refresh_wanted();
691     check_squeue();
692     update_progress_stats();
693     select (undef, undef, undef, 0.1);
694     killem (keys %proc) if $main::please_freeze;
695   }
696 }
697
698 update_progress_stats();
699 freeze_if_want_freeze();
700
701
702 if (!defined $main::success)
703 {
704   if (@jobstep_todo &&
705       $thisround_succeeded == 0 &&
706       ($thisround_failed == 0 || $thisround_failed > 4))
707   {
708     my $message = "stop because $thisround_failed tasks failed and none succeeded";
709     Log (undef, $message);
710     $main::success = 0;
711   }
712   if (!@jobstep_todo)
713   {
714     $main::success = 1;
715   }
716 }
717
718 goto ONELEVEL if !defined $main::success;
719
720
721 release_allocation();
722 freeze();
723 if ($job_has_uuid) {
724   $Job->update_attributes('output' => &collate_output(),
725                           'running' => 0,
726                           'success' => $Job->{'output'} && $main::success,
727                           'finished_at' => scalar gmtime)
728 }
729
730 if ($Job->{'output'})
731 {
732   eval {
733     my $manifest_text = `arv keep get $Job->{'output'}`;
734     $arv->{'collections'}->{'create'}->execute('collection' => {
735       'uuid' => $Job->{'output'},
736       'manifest_text' => $manifest_text,
737     });
738   };
739   if ($@) {
740     Log (undef, "Failed to register output manifest: $@");
741   }
742 }
743
744 Log (undef, "finish");
745
746 save_meta();
747 exit 0;
748
749
750
751 sub update_progress_stats
752 {
753   $progress_stats_updated = time;
754   return if !$progress_is_dirty;
755   my ($todo, $done, $running) = (scalar @jobstep_todo,
756                                  scalar @jobstep_done,
757                                  scalar @slot - scalar @freeslot - scalar @holdslot);
758   $Job->{'tasks_summary'} ||= {};
759   $Job->{'tasks_summary'}->{'todo'} = $todo;
760   $Job->{'tasks_summary'}->{'done'} = $done;
761   $Job->{'tasks_summary'}->{'running'} = $running;
762   if ($job_has_uuid) {
763     $Job->update_attributes('tasks_summary' => $Job->{'tasks_summary'});
764   }
765   Log (undef, "status: $done done, $running running, $todo todo");
766   $progress_is_dirty = 0;
767 }
768
769
770
771 sub reapchildren
772 {
773   my $pid = waitpid (-1, WNOHANG);
774   return 0 if $pid <= 0;
775
776   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
777                   . "."
778                   . $slot[$proc{$pid}->{slot}]->{cpu});
779   my $jobstepid = $proc{$pid}->{jobstep};
780   my $elapsed = time - $proc{$pid}->{time};
781   my $Jobstep = $jobstep[$jobstepid];
782
783   my $childstatus = $?;
784   my $exitvalue = $childstatus >> 8;
785   my $exitinfo = sprintf("exit %d signal %d%s",
786                          $exitvalue,
787                          $childstatus & 127,
788                          ($childstatus & 128 ? ' core dump' : ''));
789   $Jobstep->{'arvados_task'}->reload;
790   my $task_success = $Jobstep->{'arvados_task'}->{success};
791
792   Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
793
794   if (!defined $task_success) {
795     # task did not indicate one way or the other --> fail
796     $Jobstep->{'arvados_task'}->{success} = 0;
797     $Jobstep->{'arvados_task'}->save;
798     $task_success = 0;
799   }
800
801   if (!$task_success)
802   {
803     my $temporary_fail;
804     $temporary_fail ||= $Jobstep->{node_fail};
805     $temporary_fail ||= ($exitvalue == 111);
806
807     ++$thisround_failed;
808     ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
809
810     # Check for signs of a failed or misconfigured node
811     if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
812         2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
813       # Don't count this against jobstep failure thresholds if this
814       # node is already suspected faulty and srun exited quickly
815       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
816           $elapsed < 5) {
817         Log ($jobstepid, "blaming failure on suspect node " .
818              $slot[$proc{$pid}->{slot}]->{node}->{name});
819         $temporary_fail ||= 1;
820       }
821       ban_node_by_slot($proc{$pid}->{slot});
822     }
823
824     Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
825                              ++$Jobstep->{'failures'},
826                              $temporary_fail ? 'temporary ' : 'permanent',
827                              $elapsed));
828
829     if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
830       # Give up on this task, and the whole job
831       $main::success = 0;
832       $main::please_freeze = 1;
833     }
834     else {
835       # Put this task back on the todo queue
836       push @jobstep_todo, $jobstepid;
837     }
838     $Job->{'tasks_summary'}->{'failed'}++;
839   }
840   else
841   {
842     ++$thisround_succeeded;
843     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
844     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
845     push @jobstep_done, $jobstepid;
846     Log ($jobstepid, "success in $elapsed seconds");
847   }
848   $Jobstep->{exitcode} = $childstatus;
849   $Jobstep->{finishtime} = time;
850   process_stderr ($jobstepid, $task_success);
851   Log ($jobstepid, "output " . $Jobstep->{'arvados_task'}->{output});
852
853   close $reader{$jobstepid};
854   delete $reader{$jobstepid};
855   delete $slot[$proc{$pid}->{slot}]->{pid};
856   push @freeslot, $proc{$pid}->{slot};
857   delete $proc{$pid};
858
859   # Load new tasks
860   my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
861     'where' => {
862       'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
863     },
864     'order' => 'qsequence'
865   );
866   foreach my $arvados_task (@{$newtask_list->{'items'}}) {
867     my $jobstep = {
868       'level' => $arvados_task->{'sequence'},
869       'failures' => 0,
870       'arvados_task' => $arvados_task
871     };
872     push @jobstep, $jobstep;
873     push @jobstep_todo, $#jobstep;
874   }
875
876   $progress_is_dirty = 1;
877   1;
878 }
879
880 sub check_refresh_wanted
881 {
882   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
883   if (@stat && $stat[9] > $latest_refresh) {
884     $latest_refresh = scalar time;
885     if ($job_has_uuid) {
886       my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec);
887       for my $attr ('cancelled_at',
888                     'cancelled_by_user_uuid',
889                     'cancelled_by_client_uuid') {
890         $Job->{$attr} = $Job2->{$attr};
891       }
892       if ($Job->{'cancelled_at'}) {
893         Log (undef, "Job cancelled at " . $Job->{cancelled_at} .
894              " by user " . $Job->{cancelled_by_user_uuid});
895         $main::success = 0;
896         $main::please_freeze = 1;
897       }
898     }
899   }
900 }
901
902 sub check_squeue
903 {
904   # return if the kill list was checked <4 seconds ago
905   if (defined $squeue_kill_checked && $squeue_kill_checked > time - 4)
906   {
907     return;
908   }
909   $squeue_kill_checked = time;
910
911   # use killem() on procs whose killtime is reached
912   for (keys %proc)
913   {
914     if (exists $proc{$_}->{killtime}
915         && $proc{$_}->{killtime} <= time)
916     {
917       killem ($_);
918     }
919   }
920
921   # return if the squeue was checked <60 seconds ago
922   if (defined $squeue_checked && $squeue_checked > time - 60)
923   {
924     return;
925   }
926   $squeue_checked = time;
927
928   if (!$have_slurm)
929   {
930     # here is an opportunity to check for mysterious problems with local procs
931     return;
932   }
933
934   # get a list of steps still running
935   my @squeue = `squeue -s -h -o '%i %j' && echo ok`;
936   chop @squeue;
937   if ($squeue[-1] ne "ok")
938   {
939     return;
940   }
941   pop @squeue;
942
943   # which of my jobsteps are running, according to squeue?
944   my %ok;
945   foreach (@squeue)
946   {
947     if (/^(\d+)\.(\d+) (\S+)/)
948     {
949       if ($1 eq $ENV{SLURM_JOBID})
950       {
951         $ok{$3} = 1;
952       }
953     }
954   }
955
956   # which of my active child procs (>60s old) were not mentioned by squeue?
957   foreach (keys %proc)
958   {
959     if ($proc{$_}->{time} < time - 60
960         && !exists $ok{$proc{$_}->{jobstepname}}
961         && !exists $proc{$_}->{killtime})
962     {
963       # kill this proc if it hasn't exited in 30 seconds
964       $proc{$_}->{killtime} = time + 30;
965     }
966   }
967 }
968
969
970 sub release_allocation
971 {
972   if ($have_slurm)
973   {
974     Log (undef, "release job allocation");
975     system "scancel $ENV{SLURM_JOBID}";
976   }
977 }
978
979
980 sub readfrompipes
981 {
982   my $gotsome = 0;
983   foreach my $job (keys %reader)
984   {
985     my $buf;
986     while (0 < sysread ($reader{$job}, $buf, 8192))
987     {
988       print STDERR $buf if $ENV{CRUNCH_DEBUG};
989       $jobstep[$job]->{stderr} .= $buf;
990       preprocess_stderr ($job);
991       if (length ($jobstep[$job]->{stderr}) > 16384)
992       {
993         substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
994       }
995       $gotsome = 1;
996     }
997   }
998   return $gotsome;
999 }
1000
1001
1002 sub preprocess_stderr
1003 {
1004   my $job = shift;
1005
1006   while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
1007     my $line = $1;
1008     substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
1009     Log ($job, "stderr $line");
1010     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
1011       # whoa.
1012       $main::please_freeze = 1;
1013     }
1014     elsif ($line =~ /srun: error: (Node failure on|Unable to create job step) /) {
1015       $jobstep[$job]->{node_fail} = 1;
1016       ban_node_by_slot($jobstep[$job]->{slotindex});
1017     }
1018   }
1019 }
1020
1021
1022 sub process_stderr
1023 {
1024   my $job = shift;
1025   my $task_success = shift;
1026   preprocess_stderr ($job);
1027
1028   map {
1029     Log ($job, "stderr $_");
1030   } split ("\n", $jobstep[$job]->{stderr});
1031 }
1032
1033 sub fetch_block
1034 {
1035   my $hash = shift;
1036   my ($child_out, $child_in, $output_block);
1037
1038   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'get', $hash);
1039   sysread($child_out, $output_block, 64 * 1024 * 1024);
1040   waitpid($pid, 0);
1041   return $output_block;
1042 }
1043
1044 sub collate_output
1045 {
1046   Log (undef, "collate");
1047
1048   my ($child_out, $child_in);
1049   my $pid = open2($child_out, $child_in, 'arv', 'keep', 'put', '--raw');
1050   my $joboutput;
1051   for (@jobstep)
1052   {
1053     next if (!exists $_->{'arvados_task'}->{output} ||
1054              !$_->{'arvados_task'}->{'success'} ||
1055              $_->{'exitcode'} != 0);
1056     my $output = $_->{'arvados_task'}->{output};
1057     if ($output !~ /^[0-9a-f]{32}(\+\S+)*$/)
1058     {
1059       $output_in_keep ||= $output =~ / [0-9a-f]{32}\S*\+K/;
1060       print $child_in $output;
1061     }
1062     elsif (@jobstep == 1)
1063     {
1064       $joboutput = $output;
1065       last;
1066     }
1067     elsif (defined (my $outblock = fetch_block ($output)))
1068     {
1069       $output_in_keep ||= $outblock =~ / [0-9a-f]{32}\S*\+K/;
1070       print $child_in $outblock;
1071     }
1072     else
1073     {
1074       print $child_in "XXX fetch_block($output) failed XXX\n";
1075       $main::success = 0;
1076     }
1077   }
1078   $child_in->close;
1079
1080   if (!defined $joboutput) {
1081     my $s = IO::Select->new($child_out);
1082     sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(5);
1083   }
1084   waitpid($pid, 0);
1085
1086   if ($joboutput)
1087   {
1088     Log (undef, "output $joboutput");
1089     $Job->update_attributes('output' => $joboutput) if $job_has_uuid;
1090   }
1091   else
1092   {
1093     Log (undef, "output undef");
1094   }
1095   return $joboutput;
1096 }
1097
1098
1099 sub killem
1100 {
1101   foreach (@_)
1102   {
1103     my $sig = 2;                # SIGINT first
1104     if (exists $proc{$_}->{"sent_$sig"} &&
1105         time - $proc{$_}->{"sent_$sig"} > 4)
1106     {
1107       $sig = 15;                # SIGTERM if SIGINT doesn't work
1108     }
1109     if (exists $proc{$_}->{"sent_$sig"} &&
1110         time - $proc{$_}->{"sent_$sig"} > 4)
1111     {
1112       $sig = 9;                 # SIGKILL if SIGTERM doesn't work
1113     }
1114     if (!exists $proc{$_}->{"sent_$sig"})
1115     {
1116       Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
1117       kill $sig, $_;
1118       select (undef, undef, undef, 0.1);
1119       if ($sig == 2)
1120       {
1121         kill $sig, $_;     # srun wants two SIGINT to really interrupt
1122       }
1123       $proc{$_}->{"sent_$sig"} = time;
1124       $proc{$_}->{"killedafter"} = time - $proc{$_}->{"time"};
1125     }
1126   }
1127 }
1128
1129
1130 sub fhbits
1131 {
1132   my($bits);
1133   for (@_) {
1134     vec($bits,fileno($_),1) = 1;
1135   }
1136   $bits;
1137 }
1138
1139
1140 sub Log                         # ($jobstep_id, $logmessage)
1141 {
1142   if ($_[1] =~ /\n/) {
1143     for my $line (split (/\n/, $_[1])) {
1144       Log ($_[0], $line);
1145     }
1146     return;
1147   }
1148   my $fh = select STDERR; $|=1; select $fh;
1149   my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
1150   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
1151   $message .= "\n";
1152   my $datetime;
1153   if ($metastream || -t STDERR) {
1154     my @gmtime = gmtime;
1155     $datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
1156                          $gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
1157   }
1158   print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
1159
1160   if ($metastream) {
1161     print $metastream $datetime . " " . $message;
1162   }
1163 }
1164
1165
1166 sub croak
1167 {
1168   my ($package, $file, $line) = caller;
1169   my $message = "@_ at $file line $line\n";
1170   Log (undef, $message);
1171   freeze() if @jobstep_todo;
1172   collate_output() if @jobstep_todo;
1173   cleanup();
1174   save_meta() if $metastream;
1175   die;
1176 }
1177
1178
1179 sub cleanup
1180 {
1181   return if !$job_has_uuid;
1182   $Job->update_attributes('running' => 0,
1183                           'success' => 0,
1184                           'finished_at' => scalar gmtime);
1185 }
1186
1187
1188 sub save_meta
1189 {
1190   my $justcheckpoint = shift; # false if this will be the last meta saved
1191   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
1192
1193   $local_logfile->flush;
1194   my $cmd = "arv keep put --filename $keep_logfile ". $local_logfile->filename;
1195   my $loglocator = `$cmd`;
1196   die "system $cmd failed: $?" if $?;
1197
1198   $local_logfile = undef;   # the temp file is automatically deleted
1199   Log (undef, "log manifest is $loglocator");
1200   $Job->{'log'} = $loglocator;
1201   $Job->update_attributes('log', $loglocator) if $job_has_uuid;
1202 }
1203
1204
1205 sub freeze_if_want_freeze
1206 {
1207   if ($main::please_freeze)
1208   {
1209     release_allocation();
1210     if (@_)
1211     {
1212       # kill some srun procs before freeze+stop
1213       map { $proc{$_} = {} } @_;
1214       while (%proc)
1215       {
1216         killem (keys %proc);
1217         select (undef, undef, undef, 0.1);
1218         my $died;
1219         while (($died = waitpid (-1, WNOHANG)) > 0)
1220         {
1221           delete $proc{$died};
1222         }
1223       }
1224     }
1225     freeze();
1226     collate_output();
1227     cleanup();
1228     save_meta();
1229     exit 0;
1230   }
1231 }
1232
1233
1234 sub freeze
1235 {
1236   Log (undef, "Freeze not implemented");
1237   return;
1238 }
1239
1240
1241 sub thaw
1242 {
1243   croak ("Thaw not implemented");
1244 }
1245
1246
1247 sub freezequote
1248 {
1249   my $s = shift;
1250   $s =~ s/\\/\\\\/g;
1251   $s =~ s/\n/\\n/g;
1252   return $s;
1253 }
1254
1255
1256 sub freezeunquote
1257 {
1258   my $s = shift;
1259   $s =~ s{\\(.)}{$1 eq "n" ? "\n" : $1}ge;
1260   return $s;
1261 }
1262
1263
1264 sub srun
1265 {
1266   my $srunargs = shift;
1267   my $execargs = shift;
1268   my $opts = shift || {};
1269   my $stdin = shift;
1270   my $args = $have_slurm ? [@$srunargs, @$execargs] : $execargs;
1271   print STDERR (join (" ",
1272                       map { / / ? "'$_'" : $_ }
1273                       (@$args)),
1274                 "\n")
1275       if $ENV{CRUNCH_DEBUG};
1276
1277   if (defined $stdin) {
1278     my $child = open STDIN, "-|";
1279     defined $child or die "no fork: $!";
1280     if ($child == 0) {
1281       print $stdin or die $!;
1282       close STDOUT or die $!;
1283       exit 0;
1284     }
1285   }
1286
1287   return system (@$args) if $opts->{fork};
1288
1289   exec @$args;
1290   warn "ENV size is ".length(join(" ",%ENV));
1291   die "exec failed: $!: @$args";
1292 }
1293
1294
1295 sub ban_node_by_slot {
1296   # Don't start any new jobsteps on this node for 60 seconds
1297   my $slotid = shift;
1298   $slot[$slotid]->{node}->{hold_until} = 60 + scalar time;
1299   $slot[$slotid]->{node}->{hold_count}++;
1300   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
1301 }
1302
1303 __DATA__
1304 #!/usr/bin/perl
1305
1306 # checkout-and-build
1307
1308 use Fcntl ':flock';
1309
1310 my $destdir = $ENV{"CRUNCH_SRC"};
1311 my $commit = $ENV{"CRUNCH_SRC_COMMIT"};
1312 my $repo = $ENV{"CRUNCH_SRC_URL"};
1313
1314 open L, ">", "$destdir.lock" or die "$destdir.lock: $!";
1315 flock L, LOCK_EX;
1316 if (readlink ("$destdir.commit") eq $commit && -d $destdir) {
1317     exit 0;
1318 }
1319
1320 unlink "$destdir.commit";
1321 open STDOUT, ">", "$destdir.log";
1322 open STDERR, ">&STDOUT";
1323
1324 mkdir $destdir;
1325 my @git_archive_data = <DATA>;
1326 if (@git_archive_data) {
1327   open TARX, "|-", "tar", "-C", $destdir, "-xf", "-";
1328   print TARX @git_archive_data;
1329   if(!close(TARX)) {
1330     die "'tar -C $destdir -xf -' exited $?: $!";
1331   }
1332 }
1333
1334 my $pwd;
1335 chomp ($pwd = `pwd`);
1336 my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt";
1337 mkdir $install_dir;
1338
1339 for my $src_path ("$destdir/arvados/sdk/python") {
1340   if (-d $src_path) {
1341     shell_or_die ("virtualenv", $install_dir);
1342     shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install");
1343   }
1344 }
1345
1346 if (-e "$destdir/crunch_scripts/install") {
1347     shell_or_die ("$destdir/crunch_scripts/install", $install_dir);
1348 } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {
1349     # Old version
1350     shell_or_die ("./tests/autotests.sh", $install_dir);
1351 } elsif (-e "./install.sh") {
1352     shell_or_die ("./install.sh", $install_dir);
1353 }
1354
1355 if ($commit) {
1356     unlink "$destdir.commit.new";
1357     symlink ($commit, "$destdir.commit.new") or die "$destdir.commit.new: $!";
1358     rename ("$destdir.commit.new", "$destdir.commit") or die "$destdir.commit: $!";
1359 }
1360
1361 close L;
1362
1363 exit 0;
1364
1365 sub shell_or_die
1366 {
1367   if ($ENV{"DEBUG"}) {
1368     print STDERR "@_\n";
1369   }
1370   system (@_) == 0
1371       or die "@_ failed: $! exit 0x".sprintf("%x",$?);
1372 }
1373
1374 __DATA__