3373: arv-run-pipeline-instance: removed .to_json where objects are supplied to ...
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index f0ada5d12aedcbfbacc7b3a2103b667d800dd89b..0fdf27d96ff5ea9a3d56e462d9f6e9a5f6f646fd 100755 (executable)
@@ -187,7 +187,9 @@ if $options[:instance]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
   end
 elsif not $options[:template]
-  abort "#{$0}: syntax error: you must supply a --template or --instance."
+  puts "error: you must supply a --template or --instance."
+  p.educate
+  abort
 end
 
 if $options[:run_here] == $options[:submit]
@@ -226,10 +228,10 @@ class PipelineInstance
                              :parameters => {
                                :uuid => uuid
                              },
-                             :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN']
-                             },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
       debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
@@ -241,14 +243,16 @@ class PipelineInstance
   end
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
-                             :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
+                             :body_object => {
                                :pipeline_instance => attributes
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
-      abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
     end
     debuglog "Created pipeline instance: #{j[:uuid]}"
     self.new(j)
@@ -258,11 +262,13 @@ class PipelineInstance
                              :parameters => {
                                :uuid => @pi[:uuid]
                              },
-                             :body => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :pipeline_instance => @attributes_to_update.to_json
+                             :body_object => {
+                               :pipeline_instance => @attributes_to_update
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     unless j.is_a? Hash and j[:uuid]
       debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
@@ -291,20 +297,24 @@ class JobCache
     @cache ||= {}
     result = $client.execute(:api_method => $arvados.jobs.get,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :uuid => uuid
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     @cache[uuid] = JSON.parse result.body, :symbolize_names => true
   end
   def self.where(conditions)
     result = $client.execute(:api_method => $arvados.jobs.list,
                              :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
                                :limit => 10000,
                                :where => conditions.to_json
                              },
-                             :authenticated => false)
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     list = JSON.parse result.body, :symbolize_names => true
     if list and list[:items].is_a? Array
       list[:items]
@@ -312,22 +322,51 @@ class JobCache
       []
     end
   end
-  def self.create(job, create_params)
+  def self.create(pipeline, component, job, create_params)
     @cache ||= {}
+
+    body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
+
     result = $client.execute(:api_method => $arvados.jobs.create,
-                             :parameters => {
-                               :api_token => ENV['ARVADOS_API_TOKEN'],
-                               :job => job.to_json
-                             }.merge(create_params),
-                             :authenticated => false)
+                             :body_object => body,
+                             :authenticated => false,
+                             :headers => {
+                               authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                             })
     j = JSON.parse result.body, :symbolize_names => true
     if j.is_a? Hash and j[:uuid]
       @cache[j[:uuid]] = j
     else
-      debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
+      debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+
+      msg = ""
+      j[:errors].each do |err|
+        msg += "Error creating job for component #{component}: #{err}\n"
+      end
+      msg += "Job submission was: #{body.to_json}"
+
+      $client.execute(:api_method => $arvados.logs.create,
+                      :body_object => {
+                        :log => {
+                          :object_uuid => pipeline[:uuid],
+                          :event_type => 'stderr',
+                          :owner_uuid => pipeline[:owner_uuid],
+                          :properties => {"text" => msg}
+                        }
+                      },
+                      :authenticated => false,
+                      :headers => {
+                        authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                      })
       nil
     end
   end
+
+  protected  # NOTE(review): a bare `protected` does not change the visibility of `def self.` methods such as the one below
+  # Returns a copy of +hash+ without the entries whose value is nil.
+  def self.no_nil_values(hash)
+    hash.reject { |key, value| value.nil? }
+  end
 end
 
 class WhRunPipelineInstance
@@ -348,10 +387,12 @@ class WhRunPipelineInstance
     else
       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                                :parameters => {
-                                 :api_token => ENV['ARVADOS_API_TOKEN'],
                                  :uuid => template
                                },
-                               :authenticated => false)
+                               :authenticated => false,
+                               :headers => {
+                                 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+                               })
       @template = JSON.parse result.body, :symbolize_names => true
       if !@template[:uuid]
         abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
@@ -377,7 +418,7 @@ class WhRunPipelineInstance
         param = params_args.shift.sub /^--/, ''
         params[param] = params_args.shift
       else
-        abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
+        abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
       end
     end
 
@@ -406,7 +447,7 @@ class WhRunPipelineInstance
       end
     end
     if !errors.empty?
-      abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
     end
     debuglog "options=" + @options.pretty_inspect
     self
@@ -416,7 +457,8 @@ class WhRunPipelineInstance
     if $options[:submit]
       @instance ||= PipelineInstance.
         create(:components => @components,
-              :pipeline_template_uuid => @template[:uuid])
+              :pipeline_template_uuid => @template[:uuid],
+              :state => 'New')
     else
       @instance ||= PipelineInstance.
         create(:components => @components,
@@ -430,10 +472,12 @@ class WhRunPipelineInstance
     moretodo = true
     interrupted = false
 
+    job_creation_failed = 0
     while moretodo
       moretodo = false
       @components.each do |cname, c|
         job = nil
+        owner_uuid = @instance[:owner_uuid]
         # Is the job satisfying this component already known to be
         # finished? (Already meaning "before we query API server about
         # the job's current state")
@@ -445,32 +489,29 @@ class WhRunPipelineInstance
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
-          job = JobCache.create({
+          job = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => c[:script_parameters],
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
             :output_is_persistent => c[:output_is_persistent] || false,
-            # TODO: Delete the following three attributes when
-            # supporting pre-20140418 API servers is no longer
-            # important. New API servers take these as flags that
-            # control behavior of create, rather than job attributes.
-            :minimum_script_version => c[:minimum_script_version],
-            :exclude_script_versions => c[:exclude_minimum_script_versions],
-            :no_reuse => @options[:no_reuse] || c[:nondeterministic],
+            :runtime_constraints => c[:runtime_constraints],
+            :owner_uuid => owner_uuid,
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
             :minimum_script_version => c[:minimum_script_version],
             :exclude_script_versions => c[:exclude_minimum_script_versions],
             :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+            :filters => c[:filters]
           })
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
           else
-            debuglog "component #{cname} new job failed"
+            debuglog "component #{cname} new job failed", 0
+            job_creation_failed += 1
           end
         end
 
@@ -524,7 +565,8 @@ class WhRunPipelineInstance
                     tail_kind: 'arvados#user',
                     tail_uuid: @my_user_uuid,
                     head_kind: 'arvados#collection',
-                    head_uuid: wanted
+                    head_uuid: wanted,
+                    owner_uuid: owner_uuid
                   }
                   debuglog "added link, uuid #{newlink[:uuid]}"
                 end
@@ -540,13 +582,17 @@ class WhRunPipelineInstance
         end
       end
       @instance[:components] = @components
-      @instance[:active] = moretodo
       report_status
 
       if @options[:no_wait]
         moretodo = false
       end
 
+      # If job creation fails, just give up on this pipeline instance.
+      if job_creation_failed > 0
+        moretodo = false
+      end
+
       if moretodo
         begin
           sleep 10
@@ -554,7 +600,6 @@ class WhRunPipelineInstance
           debuglog "interrupt", 0
           interrupted = true
           break
-          #abort
         end
       end
     end
@@ -564,11 +609,11 @@ class WhRunPipelineInstance
     failed = 0
     @components.each do |cname, c|
       if c[:job]
-        if c[:job][:finished_at]
+        if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
           ended += 1
           if c[:job][:success] == true
             succeeded += 1
-          elsif c[:job][:success] == false
+          elsif c[:job][:success] == false or c[:job][:cancelled_at]
             failed += 1
           end
         end
@@ -577,24 +622,27 @@ class WhRunPipelineInstance
 
     success = (succeeded == @components.length)
 
+    # A job create call failed. Just give up.
+    if job_creation_failed > 0
+      debuglog "job creation failed - giving up on this pipeline instance", 0
+      success = false
+      failed += 1
+    end
+
     if interrupted
      if success
-        @instance[:active] = false
-        @instance[:success] = success
-        @instance[:state] = "Complete"
+        @instance[:state] = 'Complete'
      else
-        @instance[:active] = nil
-        @instance[:success] = nil
         @instance[:state] = 'Paused'
       end
     else
       if ended == @components.length or failed > 0
-        @instance[:active] = false
-        @instance[:success] = success
-        @instance[:state] = success ? "Complete" : "Failed"
+        @instance[:state] = success ? 'Complete' : 'Failed'
       end
     end
 
+    debuglog "pipeline instance state is #{@instance[:state]}"
+
     # set components_summary
     components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
     @instance[:components_summary] = components_summary
@@ -603,8 +651,8 @@ class WhRunPipelineInstance
   end
 
   def cleanup
-    if @instance
-      @instance[:active] = false
+    if @instance and @instance[:state] == 'RunningOnClient'  # an instance in any other state is left untouched
+      @instance[:state] = 'Paused'  # persisted by the save call below
       @instance.save
     end
   end