X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/4254934a7eba5e31b829bd53ae5e66443c8e7634..6d59f29ba37c608e4b01b27a2fa78bc065fdc2a6:/sdk/cli/bin/crunch-job diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job index 564a4e6a97..2ba36f2b25 100755 --- a/sdk/cli/bin/crunch-job +++ b/sdk/cli/bin/crunch-job @@ -84,6 +84,7 @@ unless (defined $ENV{"CRUNCH_TMP"}) { } } $ENV{"JOB_WORK"} = $ENV{"CRUNCH_TMP"} . "/work"; +$ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; $ENV{"CRUNCH_WORK"} = $ENV{"JOB_WORK"}; # deprecated mkdir ($ENV{"JOB_WORK"}); @@ -119,10 +120,8 @@ $SIG{'USR2'} = sub -my $arv = Arvados->new; -my $metastream = Warehouse::Stream->new(whc => new Warehouse); -$metastream->clear; -$metastream->write_start('log.txt'); +my $arv = Arvados->new('apiVersion' => 'v1'); +my $metastream; my $User = $arv->{'users'}->{'current'}->execute; @@ -167,6 +166,10 @@ else } $job_id = $Job->{'uuid'}; +$metastream = Warehouse::Stream->new(whc => new Warehouse); +$metastream->clear; +$metastream->name('.'); +$metastream->write_start($job_id . '.log.txt'); $Job->{'runtime_constraints'} ||= {}; @@ -258,7 +261,7 @@ if ($job_has_uuid) $Job->{'is_locked_by_uuid'} == $User->{'uuid'}) { croak("Error while updating / locking job"); } - $Job->update_attributes('started_at' => gmtime, + $Job->update_attributes('started_at' => scalar gmtime, 'running' => 1, 'success' => undef, 'tasks_summary' => { 'failed' => 0, @@ -296,6 +299,7 @@ my $jobstep_tomerge_level = 0; my $squeue_checked; my $squeue_kill_checked; my $output_in_keep = 0; +my $latest_refresh = scalar time; @@ -328,6 +332,15 @@ my $skip_install = ($local_job && $Job->{script_version} =~ m{^/}); if ($skip_install) { $ENV{"CRUNCH_SRC"} = $Job->{script_version}; + for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") { + if (-d $src_path) { + system("virtualenv", "$ENV{CRUNCH_TMP}/opt") == 0 + or croak ("virtualenv $ENV{CRUNCH_TMP}/opt failed: exit ".($?>>8)); + system ("cd $src_path && ./build.sh && \$CRUNCH_TMP/opt/bin/python setup.py install") + == 0 + or croak ("setup.py in $src_path failed: exit ".($?>>8)); + } + } } else { @@ -338,13 +351,13 @@ else Log (undef, "Install revision ".$Job->{script_version}); my $nodelist = join(",", @node); - # Clean out crunch_tmp/work and crunch_tmp/opt + # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src* my $cleanpid = fork(); if ($cleanpid == 0) { srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}], - ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt']); + ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']); exit (1); } while (1) @@ -364,7 +377,6 @@ else $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version}; $ENV{"CRUNCH_SRC"} = "$ENV{CRUNCH_TMP}/src"; - $ENV{"CRUNCH_INSTALL"} = "$ENV{CRUNCH_TMP}/opt"; my $commit; my $git_archive; @@ -543,8 +555,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name}; $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu}; $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu}; + $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep"; $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus}; + $ENV{"PATH"} = $ENV{"CRUNCH_INSTALL"} . "/bin:" . $ENV{"PATH"}; $ENV{"GZIP"} = "-n"; @@ -557,7 +571,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) my @execargs = qw(sh); my $build_script_to_send = ""; my $command = - "mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} " + "if [ -e $ENV{TASK_WORK} ]; then rm -rf $ENV{TASK_WORK}; fi; " + ."mkdir -p $ENV{JOB_WORK} $ENV{CRUNCH_TMP} $ENV{TASK_WORK} $ENV{TASK_KEEPMOUNT} " ."&& cd $ENV{CRUNCH_TMP} "; if ($build_script) { @@ -567,9 +582,10 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) } $ENV{"PYTHONPATH"} =~ s{^}{:} if $ENV{"PYTHONPATH"}; $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/sdk/python}; # xxx hack + $ENV{"PYTHONPATH"} =~ s{^}{$ENV{CRUNCH_SRC}/arvados/sdk/python:}; # xxx hack $ENV{"PYTHONPATH"} =~ s{$}{:/usr/local/arvados/src/sdk/python}; # xxx hack $command .= - "&& exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; + "&& exec arv-mount $ENV{TASK_KEEPMOUNT} --exec $ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"}; my @execargs = ('bash', '-c', $command); srun (\@srunargs, \@execargs, undef, $build_script_to_send); exit (111); @@ -608,24 +624,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) (@slot > @freeslot && $todo_ptr+1 > $#jobstep_todo)) { last THISROUND if $main::please_freeze; - if ($main::please_refresh) - { - $main::please_refresh = 0; - if ($job_has_uuid) { - my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); - for my $attr ('cancelled_at', - 'cancelled_by_user_uuid', - 'cancelled_by_client_uuid') { - $Job->{$attr} = $Job2->{$attr}; - } - if ($Job->{'cancelled_at'}) { - Log (undef, "Job cancelled at " . $Job->{cancelled_at} . - " by user " . $Job->{cancelled_by_user_uuid}); - $main::success = 0; - $main::please_freeze = 1; - } - } - } if ($main::please_info) { $main::please_info = 0; @@ -639,6 +637,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++) + reapchildren (); if (!$gotsome) { + check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); @@ -695,6 +694,7 @@ while (%proc) readfrompipes (); if (!reapchildren()) { + check_refresh_wanted(); check_squeue(); update_progress_stats(); select (undef, undef, undef, 0.1); @@ -731,7 +731,7 @@ if ($job_has_uuid) { $Job->update_attributes('output' => &collate_output(), 'running' => 0, 'success' => $Job->{'output'} && $main::success, - 'finished_at' => gmtime) + 'finished_at' => scalar gmtime) } if ($Job->{'output'}) @@ -884,6 +884,27 @@ sub reapchildren 1; } +sub check_refresh_wanted +{ + my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"}; + if (@stat && $stat[9] > $latest_refresh) { + $latest_refresh = scalar time; + if ($job_has_uuid) { + my $Job2 = $arv->{'jobs'}->{'get'}->execute('uuid' => $jobspec); + for my $attr ('cancelled_at', + 'cancelled_by_user_uuid', + 'cancelled_by_client_uuid') { + $Job->{$attr} = $Job2->{$attr}; + } + if ($Job->{'cancelled_at'}) { + Log (undef, "Job cancelled at " . $Job->{cancelled_at} . + " by user " . $Job->{cancelled_by_user_uuid}); + $main::success = 0; + $main::please_freeze = 1; + } + } + } +} sub check_squeue { @@ -1149,7 +1170,7 @@ sub cleanup return if !$job_has_uuid; $Job->update_attributes('running' => 0, 'success' => 0, - 'finished_at' => gmtime); + 'finished_at' => scalar gmtime); } @@ -1159,9 +1180,14 @@ sub save_meta my $m = $metastream; $m = $m->copy if $justcheckpoint; $m->write_finish; - my $loglocator = $m->as_key; + my $whc = Warehouse->new; + my $loglocator = $whc->store_block ($m->as_string); + $arv->{'collections'}->{'create'}->execute('collection' => { + 'uuid' => $loglocator, + 'manifest_text' => $m->as_string, + }); undef $metastream if !$justcheckpoint; # otherwise Log() will try to use it - Log (undef, "meta key is $loglocator"); + Log (undef, "log manifest is $loglocator"); $Job->{'log'} = $loglocator; $Job->update_attributes('log', $loglocator) if $job_has_uuid; } @@ -1337,7 +1363,7 @@ my $repo = $ENV{"CRUNCH_SRC_URL"}; open L, ">", "$destdir.lock" or die "$destdir.lock: $!"; flock L, LOCK_EX; -if (readlink ("$destdir.commit") eq $commit) { +if (readlink ("$destdir.commit") eq $commit && -d $destdir) { exit 0; } @@ -1346,16 +1372,27 @@ open STDOUT, ">", "$destdir.log"; open STDERR, ">&STDOUT"; mkdir $destdir; -open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; -print TARX ; -if(!close(TARX)) { - die "'tar -C $destdir -xf -' exited $?: $!"; +my @git_archive_data = ; +if (@git_archive_data) { + open TARX, "|-", "tar", "-C", $destdir, "-xf", "-"; + print TARX @git_archive_data; + if(!close(TARX)) { + die "'tar -C $destdir -xf -' exited $?: $!"; + } } my $pwd; chomp ($pwd = `pwd`); my $install_dir = $ENV{"CRUNCH_INSTALL"} || "$pwd/opt"; mkdir $install_dir; + +for my $src_path ("$destdir/arvados/sdk/python") { + if (-d $src_path) { + shell_or_die ("virtualenv", $install_dir); + shell_or_die ("cd $src_path && ./build.sh && $install_dir/bin/python setup.py install"); + } +} + if (-e "$destdir/crunch_scripts/install") { shell_or_die ("$destdir/crunch_scripts/install", $install_dir); } elsif (!-e "./install.sh" && -e "./tests/autotests.sh") {