# NOTE(review): this span reads as an excerpt of a unified diff of a Ruby
# script with hunk headers and context-line prefixes stripped; lines that
# begin with "+" or "-" are diff markers, not Ruby. Several enclosing heads
# (the token-check condition, the `rescue` clause, the client constructor
# call) are not visible here.
#
# Fatal exit when no API token is available. The guarding condition is not
# visible in this view — presumably a check on ENV['ARVADOS_API_TOKEN'];
# confirm against the full file.
abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
# Load runtime dependencies. If loading fails, abort with the load error
# message (`l.message`) plus a hint listing gems to install. The `rescue`
# clause that binds `l` is outside this view — TODO confirm.
# This change adds the `arvados` gem to both the require list and the hint.
begin
+ require 'arvados'
require 'rubygems'
require 'json'
require 'pp'
abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
-Try: gem install pp google-api-client json trollop
+Try: gem install arvados pp google-api-client json trollop
EOS
end
:application_name => File.split($0).last,
:application_version => $application_version.to_s)
# Discover the 'arvados' API description through $client (presumably the
# google-api-client instance whose constructor arguments end just above —
# verify), at the version held in $arvados_api_version.
$arvados = $client.discovered_api('arvados', $arvados_api_version)
# Second client, from the `arvados` gem, pinned to API version 'v1'. Later
# code in this file uses it as $arv for user.current and link.list /
# link.create calls.
+$arv = Arvados.new api_version: 'v1'
class PipelineInstance
moretodo = false
@components.each do |cname, c|
job = nil
-
+ # Is the job satisfying this component already known to be
+ # finished? (Already meaning "before we query API server about
+ # the job's current state")
+ c_already_finished = (c[:job] &&
+ c[:job][:uuid] &&
+ !c[:job][:success].nil?)
if !c[:job] and
c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
# No job yet associated with this component and is component inputs
job = JobCache.create({:script => c[:script],
:script_parameters => c[:script_parameters],
:script_version => c[:script_version],
+ :repository => c[:repository],
:minimum_script_version => c[:minimum_script_version],
:exclude_script_versions => c[:exclude_minimum_script_versions],
:nondeterministic => c[:nondeterministic],
- :no_reuse => @options[:no_reuse]})
+ :no_reuse => @options[:no_reuse],
+ :output_is_persistent => c[:output_is_persistent] || false})
if job
debuglog "component #{cname} new job #{job[:uuid]}"
c[:job] = job
if (c[:job][:running] or
not (c[:job][:finished_at] or c[:job][:cancelled_at]))
# Job is running so update copy of job record
- c[:job] = JobCache.get(c[:job][:uuid])
+ c[:job] = JobCache.get(c[:job][:uuid])
end
if c[:job][:success]
end
end
end
+ unless c_already_finished
+ # This is my first time discovering that the job
+ # succeeded. (At the top of this loop, I was still
+ # waiting for it to finish.)
+ if c[:output_is_persistent]
+ # I need to make sure a resources/wants link is in
+ # place to protect the output from garbage
+ # collection. (Normally Crunch does this for me, but
+ # here I might be reusing the output of someone else's
+ # job and I need to make sure it's understood that the
+ # output is valuable to me, too.)
+ wanted = c[:job][:output]
+ debuglog "checking for existing persistence link for #{wanted}"
+ @my_user_uuid ||= $arv.user.current[:uuid]
+ links = $arv.link.list(limit: 1,
+ filters:
+ [%w(link_class = resources),
+ %w(name = wants),
+ %w(tail_uuid =) + [@my_user_uuid],
+ %w(head_uuid =) + [wanted]
+ ])[:items]
+ if links.any?
+ debuglog "link already exists, uuid #{links.first[:uuid]}"
+ else
+ newlink = $arv.link.create link: \
+ {
+ link_class: 'resources',
+ name: 'wants',
+ tail_kind: 'arvados#user',
+ tail_uuid: @my_user_uuid,
+ head_kind: 'arvados#collection',
+ head_uuid: wanted
+ }
+ debuglog "added link, uuid #{newlink[:uuid]}"
+ end
+ end
+ end
elsif c[:job][:running] ||
(!c[:job][:started_at] && !c[:job][:cancelled_at])
# Job is still running
end
end
end
-
+
if ended == @components.length or failed > 0
@instance[:active] = false
@instance[:success] = (succeeded == @components.length)