abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
begin
+ require 'arvados'
require 'rubygems'
require 'json'
require 'pp'
abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
-Try: gem install pp google-api-client json trollop
+Try: gem install arvados pp google-api-client json trollop
EOS
end
abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
end
elsif not $options[:template]
- abort "#{$0}: syntax error: you must supply a --template or --instance."
+ puts "error: you must supply a --template or --instance."
+ p.educate
+ abort
end
if $options[:run_here] == $options[:submit]
:application_name => File.split($0).last,
:application_version => $application_version.to_s)
$arvados = $client.discovered_api('arvados', $arvados_api_version)
+$arv = Arvados.new api_version: 'v1'
class PipelineInstance
:parameters => {
:uuid => uuid
},
- :body => {
- :api_token => ENV['ARVADOS_API_TOKEN']
- },
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
def self.create(attributes)
result = $client.execute(:api_method => $arvados.pipeline_instances.create,
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
- :pipeline_instance => attributes
+ :pipeline_instance => attributes.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
- abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
end
debuglog "Created pipeline instance: #{j[:uuid]}"
self.new(j)
:uuid => @pi[:uuid]
},
:body => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:pipeline_instance => @attributes_to_update.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
@cache ||= {}
result = $client.execute(:api_method => $arvados.jobs.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => uuid
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@cache[uuid] = JSON.parse result.body, :symbolize_names => true
end
def self.where(conditions)
result = $client.execute(:api_method => $arvados.jobs.list,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:limit => 10000,
:where => conditions.to_json
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
list = JSON.parse result.body, :symbolize_names => true
if list and list[:items].is_a? Array
list[:items]
[]
end
end
# Submit a job to the API server on behalf of one pipeline component.
#
# pipeline      - record of the owning pipeline instance; its :uuid and
#                 :owner_uuid are copied into the log entry written when
#                 job creation fails.
# component     - component name, used only in the failure message.
# job           - Hash of job attributes, sent under the :job key.
# create_params - Hash of extra request parameters, merged into the top
#                 level of the request body.
#
# Entries whose value is nil are dropped from both hashes (via
# no_nil_values) before the request is sent.
#
# Returns the parsed job record (also stored in @cache under its uuid)
# when the response is a Hash carrying a :uuid. Otherwise reports the
# failure through debuglog, posts a 'stderr' log entry against the
# pipeline, and returns nil.
def self.create(pipeline, component, job, create_params)
  @cache ||= {}

  body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))

  result = $client.execute(:api_method => $arvados.jobs.create,
                           :body => body,
                           :authenticated => false,
                           :headers => {
                             authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                           })
  j = JSON.parse result.body, :symbolize_names => true
  if j.is_a?(Hash) && j[:uuid]
    @cache[j[:uuid]] = j
  else
    # The response may lack :errors, or may not be a Hash at all; never let
    # error reporting itself raise, or the failure log below is lost.
    raw_errors = j.is_a?(Hash) ? j[:errors] : nil
    debuglog "create job: #{raw_errors} with attributes #{body}", 0

    msg = Array(raw_errors).map do |err|
      "Error creating job for component #{component}: #{err}\n"
    end.join
    msg += "Job submission was: #{body.to_json}"

    # Record the failure against the pipeline instance.
    $client.execute(:api_method => $arvados.logs.create,
                    :body => {
                      :log => {
                        :object_uuid => pipeline[:uuid],
                        :event_type => 'stderr',
                        :owner_uuid => pipeline[:owner_uuid],
                        :properties => {"text" => msg}
                      }
                    },
                    :authenticated => false,
                    :headers => {
                      authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
                    })
    nil
  end
end
+
+ protected
+
# Return a new hash holding only the entries of +hash+ whose value is not
# nil (false and empty values are kept). The argument is left unmodified.
#
# NOTE(review): a bare `protected` does not change the visibility of
# methods defined with `def self.`, so this stays publicly callable.
def self.no_nil_values(hash)
  hash.select { |_key, value| !value.nil? }
end
end
class WhRunPipelineInstance
else
result = $client.execute(:api_method => $arvados.pipeline_templates.get,
:parameters => {
- :api_token => ENV['ARVADOS_API_TOKEN'],
:uuid => template
},
- :authenticated => false)
+ :authenticated => false,
+ :headers => {
+ authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
+ })
@template = JSON.parse result.body, :symbolize_names => true
if !@template[:uuid]
abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
param = params_args.shift.sub /^--/, ''
params[param] = params_args.shift
else
- abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
end
end
end
end
if !errors.empty?
- abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
+ abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
end
debuglog "options=" + @options.pretty_inspect
self
end
# Ensure @instance holds a pipeline instance, creating one from
# @components and the template's uuid if none has been set yet.
#
# A newly created instance starts in state 'New' when $options[:submit]
# is set, and in state 'RunningOnClient' otherwise. An existing @instance
# is left untouched.
#
# Returns self.
def setup_instance
  initial_state = $options[:submit] ? 'New' : 'RunningOnClient'
  @instance ||= PipelineInstance.create(:components => @components,
                                        :pipeline_template_uuid => @template[:uuid],
                                        :state => initial_state)
  self
end
def run
moretodo = true
+ interrupted = false
+
+ job_creation_failed = 0
while moretodo
moretodo = false
@components.each do |cname, c|
job = nil
-
+ owner_uuid = @instance[:owner_uuid]
+ # Is the job satisfying this component already known to be
+ # finished? (Already meaning "before we query API server about
+ # the job's current state")
+ c_already_finished = (c[:job] &&
+ c[:job][:uuid] &&
+ !c[:job][:success].nil?)
if !c[:job] and
c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
# No job is associated with this component yet, and the component's
# inputs are fully specified (any output_of script_parameters have been
# resolved to real values)
- job = JobCache.create({:script => c[:script],
- :script_parameters => c[:script_parameters],
- :script_version => c[:script_version],
- :repository => c[:repository],
- :minimum_script_version => c[:minimum_script_version],
- :exclude_script_versions => c[:exclude_minimum_script_versions],
- :nondeterministic => c[:nondeterministic],
- :no_reuse => @options[:no_reuse]})
+ job = JobCache.create(@instance, cname, {
+ :script => c[:script],
+ :script_parameters => c[:script_parameters],
+ :script_version => c[:script_version],
+ :repository => c[:repository],
+ :nondeterministic => c[:nondeterministic],
+ :output_is_persistent => c[:output_is_persistent] || false,
+ :runtime_constraints => c[:runtime_constraints],
+ :owner_uuid => owner_uuid,
+ }, {
+ # This is the right place to put these attributes when
+ # dealing with new API servers.
+ :minimum_script_version => c[:minimum_script_version],
+ :exclude_script_versions => c[:exclude_minimum_script_versions],
+ :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+ :filters => c[:filters]
+ })
if job
debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
else
- debuglog "component #{cname} new job failed"
+ debuglog "component #{cname} new job failed", 0
+ job_creation_failed += 1
end
end
if (c[:job][:running] or
not (c[:job][:finished_at] or c[:job][:cancelled_at]))
# Job is running so update copy of job record
- c[:job] = JobCache.get(c[:job][:uuid])
+ c[:job] = JobCache.get(c[:job][:uuid])
end
if c[:job][:success]
end
end
end
+ unless c_already_finished
+ # This is my first time discovering that the job
+ # succeeded. (At the top of this loop, I was still
+ # waiting for it to finish.)
+ if c[:output_is_persistent]
+ # I need to make sure a resources/wants link is in
+ # place to protect the output from garbage
+ # collection. (Normally Crunch does this for me, but
+ # here I might be reusing the output of someone else's
+ # job and I need to make sure it's understood that the
+ # output is valuable to me, too.)
+ wanted = c[:job][:output]
+ debuglog "checking for existing persistence link for #{wanted}"
+ @my_user_uuid ||= $arv.user.current[:uuid]
+ links = $arv.link.list(limit: 1,
+ filters:
+ [%w(link_class = resources),
+ %w(name = wants),
+ %w(tail_uuid =) + [@my_user_uuid],
+ %w(head_uuid =) + [wanted]
+ ])[:items]
+ if links.any?
+ debuglog "link already exists, uuid #{links.first[:uuid]}"
+ else
+ newlink = $arv.link.create link: \
+ {
+ link_class: 'resources',
+ name: 'wants',
+ tail_kind: 'arvados#user',
+ tail_uuid: @my_user_uuid,
+ head_kind: 'arvados#collection',
+ head_uuid: wanted,
+ owner_uuid: owner_uuid
+ }
+ debuglog "added link, uuid #{newlink[:uuid]}"
+ end
+ end
+ end
elsif c[:job][:running] ||
(!c[:job][:started_at] && !c[:job][:cancelled_at])
# Job is still running
end
end
@instance[:components] = @components
- @instance[:active] = moretodo
report_status
if @options[:no_wait]
moretodo = false
end
+ # If job creation fails, just give up on this pipeline instance.
+ if job_creation_failed > 0
+ moretodo = false
+ end
+
if moretodo
begin
sleep 10
rescue Interrupt
debuglog "interrupt", 0
- abort
+ interrupted = true
+ break
end
end
end
failed = 0
@components.each do |cname, c|
if c[:job]
- if c[:job][:finished_at]
+ if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
ended += 1
if c[:job][:success] == true
succeeded += 1
- elsif c[:job][:success] == false
+ elsif c[:job][:success] == false or c[:job][:cancelled_at]
failed += 1
end
end
end
end
-
- if ended == @components.length or failed > 0
- @instance[:active] = false
- @instance[:success] = (succeeded == @components.length)
+
+ success = (succeeded == @components.length)
+
+ # A job create call failed. Just give up.
+ if job_creation_failed > 0
+ debuglog "job creation failed - giving up on this pipeline instance", 0
+ success = false
+ failed += 1
+ end
+
+ if interrupted
+ if success
+ @instance[:state] = 'Complete'
+ else
+ @instance[:state] = 'Paused'
+ end
+ else
+ if ended == @components.length or failed > 0
+ @instance[:state] = success ? 'Complete' : 'Failed'
+ end
end
+ debuglog "pipeline instance state is #{@instance[:state]}"
+
+ # set components_summary
+ components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
+ @instance[:components_summary] = components_summary
+
@instance.save
end
# If there is an instance and it is still marked 'RunningOnClient', mark
# it 'Paused' and save it. Does nothing when there is no instance or when
# it is in any other state.
def cleanup
  return unless @instance
  return unless @instance[:state] == 'RunningOnClient'

  @instance[:state] = 'Paused'
  @instance.save
end