use POSIX ':sys_wait_h';
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Arvados;
+use Digest::MD5 qw(md5_hex);
use Getopt::Long;
use IPC::Open2;
use IO::Select;
my $arv = Arvados->new('apiVersion' => 'v1');
-my $metastream;
+my $local_logfile;
my $User = $arv->{'users'}->{'current'}->execute;
$job_id = $Job->{'uuid'};
my $keep_logfile = $job_id . '.log.txt';
-my $local_logfile = File::Temp->new();
+$local_logfile = File::Temp->new();
$Job->{'runtime_constraints'} ||= {};
$Job->{'runtime_constraints'}->{'max_tasks_per_node'} ||= 0;
$command .= "&& exec arv-mount --allow-other $ENV{TASK_KEEPMOUNT} --exec ";
if ($docker_image)
{
- $command .= "$docker_bin run -i -a stdin -a stdout -a stderr ";
+ $command .= "crunchstat -cgroup-parent=/sys/fs/cgroup/lxc -cgroup-cid=$ENV{TASK_WORK}/docker.cid -poll=1000 ";
+ $command .= "$docker_bin run -i -a stdin -a stdout -a stderr -cidfile=$ENV{TASK_WORK}/docker.cid ";
# Dynamically configure the container to use the host system as its
# DNS server. Get the host's global addresses from the ip command,
# and turn them into docker --dns options using gawk.
}
# Append one "-e KEY=VALUE" option to $command for each environment
# variable, with key and value escaped by \Q...\E.  The "+" lines restrict
# this to variables whose names begin with JOB_ or TASK_; the "-" line
# added every variable in %ENV.
while (my ($env_key, $env_val) = each %ENV)
{
- $command .= "-e \Q$env_key=$env_val\E ";
+ if ($env_key =~ /^(JOB|TASK)_/) {
+ $command .= "-e \Q$env_key=$env_val\E ";
+ }
}
- $command .= "$docker_image ";
+ $command .= "\Q$docker_image\E ";
+ } else {
+ $command .= "crunchstat -cgroup-path=/sys/fs/cgroup "
}
$command .= "$ENV{CRUNCH_SRC}/crunch_scripts/" . $Job->{"script"};
my @execargs = ('bash', '-c', $command);
release_allocation();
freeze();
# Run collate_output() once and keep its return value.  The "+" lines use
# it for the 'success' attribute here and for the if ($collated_output)
# test that follows; they no longer set 'output' in this call (the "-"
# lines did, by calling collate_output() inside the argument list).
+my $collated_output = &collate_output();
+
if ($job_has_uuid) {
- $Job->update_attributes('output' => &collate_output(),
- 'running' => 0,
- 'success' => $Job->{'output'} && $main::success,
+ $Job->update_attributes('running' => 0,
+ 'success' => $collated_output && $main::success,
'finished_at' => scalar gmtime)
}
-if ($Job->{'output'})
+if ($collated_output)
{
eval {
- my $manifest_text = `arv keep get ''\Q$Job->{'output'}\E`;
- $arv->{'collections'}->{'create'}->execute('collection' => {
- 'uuid' => $Job->{'output'},
+ open(my $orig_manifest, '-|', 'arv', 'keep', 'get', $collated_output)
+ or die "failed to get collated manifest: $!";
+ # Read the original manifest, and strip permission hints from it,
+ # so we can put the result in a Collection.
+ my @manifest_lines = ();
+ while (my $manifest_line = <$orig_manifest>) {
# Split on single spaces with limit -1 so that empty trailing fields are
# kept; joining the words with " " afterwards then reproduces the line
# (including its newline, which is never chomped) apart from the edits.
+ my @words = split(/ /, $manifest_line, -1);
+ foreach my $ii (0..$#words) {
# Only words that start with 32 hex digits followed by "+" are touched.
+ if ($words[$ii] =~ /^[0-9a-f]{32}\+/) {
# Remove the first "+A<40 hex digits>@<8 hex digits>" hint in the word.
+ $words[$ii] =~ s/\+A[0-9a-f]{40}@[0-9a-f]{8}\b//;
+ }
+ }
+ push(@manifest_lines, join(" ", @words));
+ }
# Lines still carry their own newlines, so they are concatenated as-is.
+ my $manifest_text = join("", @manifest_lines);
+ my $output = $arv->{'collections'}->{'create'}->execute('collection' => {
+ 'uuid' => md5_hex($manifest_text),
'manifest_text' => $manifest_text,
});
+ $Job->update_attributes('output' => $output->{uuid});
if ($Job->{'output_is_persistent'}) {
$arv->{'links'}->{'create'}->execute('link' => {
'tail_kind' => 'arvados#user',
delete $proc{$pid};
# Load new tasks
- my $newtask_list = $arv->{'job_tasks'}->{'list'}->execute(
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence'
- );
- foreach my $arvados_task (@{$newtask_list->{'items'}}) {
# Collect every task created by the current job step.  The list request is
# repeated, each time passing the number of items gathered so far as
# 'offset', and the loop ends when a request returns no items.
+ my $newtask_list = [];
+ my $newtask_results;
+ do {
+ $newtask_results = $arv->{'job_tasks'}->{'list'}->execute(
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ push(@$newtask_list, @{$newtask_results->{items}});
+ } while (@{$newtask_results->{items}});
+ foreach my $arvados_task (@$newtask_list) {
my $jobstep = {
'level' => $arvados_task->{'sequence'},
'failures' => 0,
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
# $datetime is only built when it will be printed: when the log file handle
# ($local_logfile on the "+" side, $metastream on the "-" side) is set, or
# when STDERR is a terminal.
my $datetime;
- if ($metastream || -t STDERR) {
+ if ($local_logfile || -t STDERR) {
my @gmtime = gmtime;
# gmtime returns year-1900 in [5] and a 0-based month in [4]; elements
# [3,2,1,0] are day, hour, minute, second.  Result: YYYY-MM-DD_hh:mm:ss.
$datetime = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d",
$gmtime[5]+1900, $gmtime[4]+1, @gmtime[3,2,1,0]);
}
# STDERR gets the timestamp prefix only when it is a terminal.
print STDERR ((-t STDERR) ? ($datetime." ".$message) : $message);
# The log file always gets the timestamp prefix.
- if ($metastream) {
- print $metastream $datetime . " " . $message;
+ if ($local_logfile) {
+ print $local_logfile $datetime . " " . $message;
}
}
freeze() if @jobstep_todo;
collate_output() if @jobstep_todo;
cleanup();
- save_meta() if $metastream;
+ save_meta() if $local_logfile;
die;
}
. quotemeta($local_logfile->filename);
# Run $cmd and capture its standard output.
my $loglocator = `$cmd`;
# $? holds the wait status of the backtick command; non-zero means failure.
die "system $cmd failed: $?" if $?;
# Drop the trailing newline from the captured output before it is logged.
+ chomp($loglocator);
# NOTE(review): once this is undef, code that tests $local_logfile before
# writing to it (as the logging code in this file does on the "+" side)
# will skip the log file -- presumably including the Log call below; confirm.
$local_logfile = undef; # the temp file is automatically deleted
Log (undef, "log manifest is $loglocator");