abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
end
elsif not $options[:template]
- abort "#{$0}: syntax error: you must supply a --template or --instance."
+ puts "error: you must supply a --template or --instance."
+ p.educate
+ abort
end
if $options[:run_here] == $options[:submit]
:parameters => {
:uuid => uuid
},
- :body => {
- :api_token => ENV['ARVADOS_API_TOKEN']
- },
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
+ # Create a pipeline instance on the API server from +attributes+ (a Hash,
+ # sent JSON-encoded under :pipeline_instance) and wrap the parsed response
+ # in a new object. Aborts the program when the response is not a Hash
+ # carrying a :uuid.
def self.create(attributes)
result = $client.execute(:api_method => $arvados.pipeline_instances.create,
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :pipeline_instance => attributes
+ :pipeline_instance => attributes.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
- abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
+ # This is a class method, so an @template reference here would be the
+ # class-level instance variable rather than the runner's template; take
+ # the template uuid from the attributes being submitted instead.
+ abort "\n#{Time.now} -- pipeline_template #{attributes[:pipeline_template_uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
end
debuglog "Created pipeline instance: #{j[:uuid]}"
self.new(j)
:uuid => @pi[:uuid]
},
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:pipeline_instance => @attributes_to_update.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
@cache ||= {}
result = $client.execute(:api_method => $arvados.jobs.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@cache[uuid] = JSON.parse result.body, :symbolize_names => true
end
def self.where(conditions)
result = $client.execute(:api_method => $arvados.jobs.list,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:limit => 10000,
:where => conditions.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
list = JSON.parse result.body, :symbolize_names => true
if list and list[:items].is_a? Array
list[:items]
[]
end
end
- def self.create(job, create_params)
+ # Submit a job-create request and cache the resulting job by uuid.
+ #
+ # pipeline      - read for [:uuid] and [:owner_uuid] when logging a failure.
+ # component     - component name, used only in the failure message.
+ # job           - Hash of job attributes; nil-valued entries are dropped and
+ #                 the rest is sent under the :job key.
+ # create_params - Hash merged into the top level of the request body after
+ #                 dropping nil-valued entries.
+ #
+ # Returns the parsed response Hash when it carries a :uuid. Otherwise writes
+ # a debug message, posts a 'stderr' log entry against the pipeline, and
+ # returns nil.
+ def self.create(pipeline, component, job, create_params)
@cache ||= {}
+
+ body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
+
result = $client.execute(:api_method => $arvados.jobs.create,
- :parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :job => job.to_json
- }.merge(create_params),
- :authenticated => false)
+ :body => body,
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
if j.is_a? Hash and j[:uuid]
@cache[j[:uuid]] = j
else
- debuglog "create job: #{j[:errors] rescue nil} with attributes #{job}", 0
+ debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
+
+ # This branch also runs when the response is not a Hash or has no
+ # :errors entry, so normalise to an Array before iterating instead of
+ # raising while trying to report the failure.
+ errors = j.is_a?(Hash) ? Array(j[:errors]) : []
+ msg = ""
+ errors.each do |err|
+ msg += "Error creating job for component #{component}: #{err}\n"
+ end
+ msg += "Job submission was: #{body.to_json}"
+
+ $client.execute(:api_method => $arvados.logs.create,
+ :body => {
+ :log => {
+ :object_uuid => pipeline[:uuid],
+ :event_type => 'stderr',
+ :owner_uuid => pipeline[:owner_uuid],
+ :properties => {"text" => msg}
+ }
+ },
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
nil
end
end
+
+ # A bare `protected`/`private` keyword does not affect methods defined with
+ # `def self.`, so the helper is defined inside the singleton class where
+ # `private` does take effect.
+ class << self
+ private
+
+ # Return a copy of +hash+ without the entries whose value is nil.
+ def no_nil_values(hash)
+ hash.reject { |_key, value| value.nil? }
+ end
+ end
end
class WhRunPipelineInstance
else
result = $client.execute(:api_method => $arvados.pipeline_templates.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => template
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@template = JSON.parse result.body, :symbolize_names => true
if !@template[:uuid]
abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
param = params_args.shift.sub /^--/, ''
params[param] = params_args.shift
else
- abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
end
end
end
end
if !errors.empty?
- abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
end
debuglog "options=" + @options.pretty_inspect
self
end
+ # Create the pipeline instance unless @instance is already set, then return
+ # self. The instance is created in state 'New' when $options[:submit] is
+ # set and in state 'RunningOnClient' otherwise.
def setup_instance
- @instance ||= PipelineInstance.
- create(:components => @components,
+ initial_state = $options[:submit] ? 'New' : 'RunningOnClient'
+ @instance ||= PipelineInstance.
+ create(:components => @components,
:pipeline_template_uuid => @template[:uuid],
- :active => true)
+ :state => initial_state)
self
end
def run
moretodo = true
+ interrupted = false
+
+ job_creation_failed = 0
while moretodo
moretodo = false
@components.each do |cname, c|
job = nil
+ owner_uuid = @instance[:owner_uuid]
# Is the job satisfying this component already known to be
# finished? (Already meaning "before we query API server about
# the job's current state")
# No job yet associated with this component and is component inputs
# are fully specified (any output_of script_parameters are resolved
# to real value)
- job = JobCache.create({
+ job = JobCache.create(@instance, cname, {
:script => c[:script],
:script_parameters => c[:script_parameters],
:script_version => c[:script_version],
:repository => c[:repository],
:nondeterministic => c[:nondeterministic],
:output_is_persistent => c[:output_is_persistent] || false,
- # TODO: Delete the following three attributes when
- # supporting pre-20140418 API servers is no longer
- # important. New API servers take these as flags that
- # control behavior of create, rather than job attributes.
- :minimum_script_version => c[:minimum_script_version],
- :exclude_script_versions => c[:exclude_minimum_script_versions],
- :no_reuse => @options[:no_reuse] || c[:nondeterministic],
+ :runtime_constraints => c[:runtime_constraints],
+ :owner_uuid => owner_uuid,
}, {
# This is the right place to put these attributes when
# dealing with new API servers.
:minimum_script_version => c[:minimum_script_version],
:exclude_script_versions => c[:exclude_minimum_script_versions],
:find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+ :filters => c[:filters]
})
if job
debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
else
- debuglog "component #{cname} new job failed"
+ debuglog "component #{cname} new job failed", 0
+ job_creation_failed += 1
end
end
tail_kind: 'arvados#user',
tail_uuid: @my_user_uuid,
head_kind: 'arvados#collection',
- head_uuid: wanted
+ head_uuid: wanted,
+ owner_uuid: owner_uuid
}
debuglog "added link, uuid #{newlink[:uuid]}"
end
end
end
@instance[:components] = @components
- @instance[:active] = moretodo
report_status
if @options[:no_wait]
moretodo = false
end
+ # If job creation fails, just give up on this pipeline instance.
+ if job_creation_failed > 0
+ moretodo = false
+ end
+
if moretodo
begin
sleep 10
rescue Interrupt
debuglog "interrupt", 0
- abort
+ interrupted = true
+ break
end
end
end
failed = 0
@components.each do |cname, c|
if c[:job]
- if c[:job][:finished_at]
+ if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
ended += 1
if c[:job][:success] == true
succeeded += 1
- elsif c[:job][:success] == false
+ elsif c[:job][:success] == false or c[:job][:cancelled_at]
failed += 1
end
end
end
end
- if ended == @components.length or failed > 0
- @instance[:active] = false
- @instance[:success] = (succeeded == @components.length)
+ success = (succeeded == @components.length)
+
+ # A job create call failed. Just give up.
+ if job_creation_failed > 0
+ debuglog "job creation failed - giving up on this pipeline instance", 0
+ success = false
+ failed += 1
end
+ if interrupted
+ if success
+ @instance[:state] = 'Complete'
+ else
+ @instance[:state] = 'Paused'
+ end
+ else
+ if ended == @components.length or failed > 0
+ @instance[:state] = success ? 'Complete' : 'Failed'
+ end
+ end
+
+ debuglog "pipeline instance state is #{@instance[:state]}"
+
+ # set components_summary
+ components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
+ @instance[:components_summary] = components_summary
+
@instance.save
end
+ # If an instance exists and is still in state 'RunningOnClient', mark it
+ # 'Paused' and save it; otherwise do nothing.
def cleanup
- if @instance
- @instance[:active] = false
- @instance.save
- end
+ return unless @instance && @instance[:state] == 'RunningOnClient'
+
+ @instance[:state] = 'Paused'
+ @instance.save
end