Merge branch 'master' into 3296-user-profile
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index 304628f43869ea83cc5871461dbcff0055be0287..f20acebbd0057fe8131848299e1cda39c45501bf 100755 (executable)
@@ -243,7 +243,7 @@ class PipelineInstance
   end
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
-                             :body => {
+                             :body_object => {
                                :pipeline_instance => attributes
                              },
                              :authenticated => false,
@@ -252,7 +252,7 @@ class PipelineInstance
                              })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
-      abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
     end
     debuglog "Created pipeline instance: #{j[:uuid]}"
     self.new(j)
@@ -262,8 +262,8 @@ class PipelineInstance
                              :parameters => {
                                :uuid => @pi[:uuid]
                              },
-                             :body => {
-                               :pipeline_instance => @attributes_to_update.to_json
+                             :body_object => {
+                               :pipeline_instance => @attributes_to_update
                              },
                              :authenticated => false,
                              :headers => {
@@ -322,18 +322,13 @@ class JobCache
       []
     end
   end
-  def self.create(job, create_params)
+  def self.create(pipeline, component, job, create_params)
     @cache ||= {}
 
-    jsonified_create_params = {}
-    create_params.each do |k, v|
-      jsonified_create_params[k] = v.to_json
-    end
+    body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
 
     result = $client.execute(:api_method => $arvados.jobs.create,
-                             :body => {
-                               :job => job.to_json
-                             }.merge(jsonified_create_params),
+                             :body_object => body,
                              :authenticated => false,
                              :headers => {
                                authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
@@ -342,10 +337,36 @@ class JobCache
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
+      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+
+      msg = ""
+      j[:errors].each do |err|
+        msg += "Error creating job for component #{component}: #{err}\n"
+      end
+      msg += "Job submission was: #{body.to_json}"
+
+      $client.execute(:api_method => $arvados.logs.create,
+                      :body_object => {
+                        :log => {
+                          :object_uuid => pipeline[:uuid],
+                          :event_type => 'stderr',
+                          :owner_uuid => pipeline[:owner_uuid],
+                          :properties => {"text" => msg}
+                        }
+                      },
+                      :authenticated => false,
+                      :headers => {
+                        authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                      })
       nil
     end
   end
+
+  protected
+
+  def self.no_nil_values(hash)
+    hash.reject { |key, value| value.nil? }
+  end
 end
 
 class WhRunPipelineInstance
@@ -397,7 +418,7 @@ class WhRunPipelineInstance
         param = params_args.shift.sub /^--/, ''
         params[param] = params_args.shift
       else
-        abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
+        abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
       end
     end
 
@@ -426,23 +447,35 @@ class WhRunPipelineInstance
       end
     end
     if !errors.empty?
-      abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
     end
     debuglog "options=" + @options.pretty_inspect
     self
   end
 
   def setup_instance
-    if $options[:submit]
-      @instance ||= PipelineInstance.
-        create(:components => @components,
-              :pipeline_template_uuid => @template[:uuid],
-              :state => 'New')
+    if @instance
+      @instance[:properties][:run_options] ||= {}
+      if @options[:no_reuse]
+        # override properties of existing instance
+        @instance[:properties][:run_options][:enable_job_reuse] = false
+      else
+        # Default to "enable reuse" if not specified. (This code path
+        # can go away when old clients go away.)
+        if @instance[:properties][:run_options][:enable_job_reuse].nil?
+          @instance[:properties][:run_options][:enable_job_reuse] = true
+        end
+      end
     else
-      @instance ||= PipelineInstance.
-        create(:components => @components,
-             :pipeline_template_uuid => @template[:uuid],
-             :state => 'RunningOnClient')
+      @instance = PipelineInstance.
+        create(components: @components,
+               properties: {
+                 run_options: {
+                   enable_job_reuse: !@options[:no_reuse]
+                 }
+               },
+               pipeline_template_uuid: @template[:uuid],
+               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
     end
     self
   end
@@ -468,7 +501,7 @@ class WhRunPipelineInstance
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
-          job = JobCache.create({
+          job = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => c[:script_parameters],
             :script_version => c[:script_version],
@@ -482,7 +515,8 @@ class WhRunPipelineInstance
             # dealing with new API servers.
             :minimum_script_version => c[:minimum_script_version],
             :exclude_script_versions => c[:exclude_minimum_script_versions],
-            :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+            :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
+                                !c[:nondeterministic]),
             :filters => c[:filters]
           })
           if job