Use quotemeta to protect shell escapes (refs #2221, #2325)
[arvados.git] / sdk / cli / bin / crunch-job
index 07b49436657b316628a16db7a1b7156129bf7ca8..e81628cc2b3d7087ea8c50b1d546f298064c9ba9 100755 (executable)
@@ -33,6 +33,12 @@ Path to .git directory where the specified commit is found.
 
 Arvados API authorization token to use during the course of the job.
 
 
 Arvados API authorization token to use during the course of the job.
 
+=item --no-clear-tmp
+
+Do not clear per-job/task temporary directories during initial job
+setup. This can speed up development and debugging when running jobs
+locally.
+
 =back
 
 =head1 RUNNING JOBS LOCALLY
 =back
 
 =head1 RUNNING JOBS LOCALLY
@@ -74,6 +80,7 @@ use Getopt::Long;
 use IPC::Open2;
 use IO::Select;
 use File::Temp;
 use IPC::Open2;
 use IO::Select;
 use File::Temp;
+use Fcntl ':flock';
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -92,11 +99,13 @@ my $force_unlock;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
 my $git_dir;
 my $jobspec;
 my $job_api_token;
+my $no_clear_tmp;
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
 my $resume_stash;
 GetOptions('force-unlock' => \$force_unlock,
            'git-dir=s' => \$git_dir,
            'job=s' => \$jobspec,
            'job-api-token=s' => \$job_api_token,
+           'no-clear-tmp' => \$no_clear_tmp,
            'resume-stash=s' => \$resume_stash,
     );
 
            'resume-stash=s' => \$resume_stash,
     );
 
@@ -320,6 +329,12 @@ else
 }
 
 
 }
 
 
+if (!$have_slurm)
+{
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
+
 my $build_script;
 
 
 my $build_script;
 
 
@@ -328,6 +343,11 @@ $ENV{"CRUNCH_SRC_COMMIT"} = $Job->{script_version};
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
 my $skip_install = ($local_job && $Job->{script_version} =~ m{^/});
 if ($skip_install)
 {
+  if (!defined $no_clear_tmp) {
+    my $clear_tmp_cmd = 'rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*';
+    system($clear_tmp_cmd) == 0
+       or croak ("`$clear_tmp_cmd` failed: ".($?>>8));
+  }
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
   $ENV{"CRUNCH_SRC"} = $Job->{script_version};
   for my $src_path ("$ENV{CRUNCH_SRC}/arvados/sdk/python") {
     if (-d $src_path) {
@@ -348,22 +368,24 @@ else
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
   Log (undef, "Install revision ".$Job->{script_version});
   my $nodelist = join(",", @node);
 
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
+  if (!defined $no_clear_tmp) {
+    # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
 
 
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-         ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
+    my $cleanpid = fork();
+    if ($cleanpid == 0)
+    {
+      srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+           ['bash', '-c', 'if mount | grep -q $JOB_WORK/; then sudo /bin/umount $JOB_WORK/* 2>/dev/null; fi; sleep 1; rm -rf $JOB_WORK $CRUNCH_TMP/opt $CRUNCH_TMP/src*']);
+      exit (1);
+    }
+    while (1)
+    {
+      last if $cleanpid == waitpid (-1, WNOHANG);
+      freeze_if_want_freeze ($cleanpid);
+      select (undef, undef, undef, 0.1);
+    }
+    Log (undef, "Clean-work-dir exited $?");
   }
   }
-  Log (undef, "Clean-work-dir exited $?");
 
   # Install requested code version
 
 
   # Install requested code version
 
@@ -460,6 +482,12 @@ else
   Log (undef, "Install exited $?");
 }
 
   Log (undef, "Install exited $?");
 }
 
+if (!$have_slurm)
+{
+  # Grab our lock again (we might have deleted and re-created CRUNCH_TMP above)
+  must_lock_now("$ENV{CRUNCH_TMP}/.lock", "a job is already running here.");
+}
+
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
 
 
 foreach (qw (script script_version script_parameters runtime_constraints))
@@ -551,7 +579,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
     }
     $ENV{"TASK_SLOT_NODE"} = $slot[$childslot]->{node}->{name};
     $ENV{"TASK_SLOT_NUMBER"} = $slot[$childslot]->{cpu};
-    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/".$slot[$childslot]->{cpu};
+    $ENV{"TASK_WORK"} = $ENV{"JOB_WORK"}."/$id.$$";
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
     $ENV{"TASK_KEEPMOUNT"} = $ENV{"TASK_WORK"}."/keep";
     $ENV{"TASK_TMPDIR"} = $ENV{"TASK_WORK"}; # deprecated
     $ENV{"CRUNCH_NODE_SLOTS"} = $slot[$childslot]->{node}->{ncpus};
@@ -730,7 +758,7 @@ if ($job_has_uuid) {
 if ($Job->{'output'})
 {
   eval {
 if ($Job->{'output'})
 {
   eval {
-    my $manifest_text = `arv keep get $Job->{'output'}`;
+    my $manifest_text = `arv keep get \Q$Job->{'output'}\E`;
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
     $arv->{'collections'}->{'create'}->execute('collection' => {
       'uuid' => $Job->{'output'},
       'manifest_text' => $manifest_text,
@@ -1071,7 +1099,7 @@ sub collate_output
     }
     else
     {
     }
     else
     {
-      print $child_in "XXX fetch_block($output) failed XXX\n";
+      Log (undef, "XXX fetch_block($output) failed XXX");
       $main::success = 0;
     }
   }
       $main::success = 0;
     }
   }
@@ -1079,7 +1107,11 @@ sub collate_output
 
   if (!defined $joboutput) {
     my $s = IO::Select->new($child_out);
 
   if (!defined $joboutput) {
     my $s = IO::Select->new($child_out);
-    sysread($child_out, $joboutput, 64 * 1024 * 1024) if $s->can_read(5);
+    if ($s->can_read(120)) {
+      sysread($child_out, $joboutput, 64 * 1024 * 1024);
+    } else {
+      Log (undef, "timed out reading from 'arv keep put'");
+    }
   }
   waitpid($pid, 0);
 
   }
   waitpid($pid, 0);
 
@@ -1191,7 +1223,8 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
 
   $local_logfile->flush;
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
 
   $local_logfile->flush;
-  my $cmd = "arv keep put --filename $keep_logfile ". $local_logfile->filename;
+  my $cmd = "arv keep put --filename \Q$keep_logfile\E "
+      . quotemeta($local_logfile->filename);
   my $loglocator = `$cmd`;
   die "system $cmd failed: $?" if $?;
 
   my $loglocator = `$cmd`;
   die "system $cmd failed: $?" if $?;
 
@@ -1300,6 +1333,15 @@ sub ban_node_by_slot {
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
   Log (undef, "backing off node " . $slot[$slotid]->{node}->{name} . " for 60 seconds");
 }
 
+sub must_lock_now
+{
+  my ($lockfile, $error_message) = @_;
+  open L, ">", $lockfile or croak("$lockfile: $!");
+  if (!flock L, LOCK_EX|LOCK_NB) {
+    croak("Can't lock $lockfile: $error_message\n");
+  }
+}
+
 __DATA__
 #!/usr/bin/perl
 
 __DATA__
 #!/usr/bin/perl