X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/cedd2b046ddf4d2f819a4d1dedbbe82d4e70e72d..6386f2aca83b477210bda19a3284ea31463d2efc:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index 265d634d58..b19bf04dd6 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -243,8 +243,8 @@ class PipelineInstance end def self.create(attributes) result = $client.execute(:api_method => $arvados.pipeline_instances.create, - :body => { - :pipeline_instance => attributes.to_json + :body_object => { + :pipeline_instance => attributes }, :authenticated => false, :headers => { @@ -262,8 +262,8 @@ class PipelineInstance :parameters => { :uuid => @pi[:uuid] }, - :body => { - :pipeline_instance => @attributes_to_update.to_json + :body_object => { + :pipeline_instance => @attributes_to_update }, :authenticated => false, :headers => { @@ -285,6 +285,16 @@ class PipelineInstance def [](x) @pi[x] end + + def log_stderr(msg) + $arv.log.create log: { + event_type: 'stderr', + object_uuid: self[:uuid], + owner_uuid: self[:owner_uuid], + properties: {"text" => msg}, + } + end + protected def initialize(j) @attributes_to_update = {} @@ -328,7 +338,7 @@ class JobCache body = {job: no_nil_values(job)}.merge(no_nil_values(create_params)) result = $client.execute(:api_method => $arvados.jobs.create, - :body => body, + :body_object => body, :authenticated => false, :headers => { authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] @@ -345,19 +355,7 @@ class JobCache end msg += "Job submission was: #{body.to_json}" - $client.execute(:api_method => $arvados.logs.create, - :body => { - :log => { - :object_uuid => pipeline[:uuid], - :event_type => 'stderr', - :owner_uuid => pipeline[:owner_uuid], - :properties => {"text" => msg} - } - }, - :authenticated => false, - :headers => { - authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] - }) + pipeline.log_stderr(msg) nil end end @@ -380,10 +378,6 @@ class WhRunPipelineInstance if template.match /[^-0-9a-z]/ # Doesn't look like a uuid -- use it as a filename. @template = JSON.parse File.read(template), :symbolize_names => true - if !@template[:components] - abort ("#{$0}: Template loaded from #{template} " + - "does not have a \"components\" key") - end else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { @@ -422,8 +416,25 @@ class WhRunPipelineInstance end end + if not @template[:components].is_a?(Hash) + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash" + end @components = @template[:components].dup + bad_components = @components.each_pair.select do |cname, cspec| + not cspec.is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}" + end + + bad_components = @components.each_pair.select do |cname, cspec| + not cspec[:script_parameters].is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}" + end + errors = [] @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| @@ -475,7 +486,7 @@ class WhRunPipelineInstance } }, pipeline_template_uuid: @template[:uuid], - state: ($options[:submit] ? 'New' : 'RunningOnClient')) + state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self end @@ -484,6 +495,10 @@ class WhRunPipelineInstance moretodo = true interrupted = false + if @instance[:started_at].nil? + @instance[:started_at] = Time.now + end + job_creation_failed = 0 while moretodo moretodo = false @@ -507,7 +522,6 @@ class WhRunPipelineInstance :script_version => c[:script_version], :repository => c[:repository], :nondeterministic => c[:nondeterministic], - :output_is_persistent => c[:output_is_persistent] || false, :runtime_constraints => c[:runtime_constraints], :owner_uuid => owner_uuid, }, { @@ -551,37 +565,52 @@ class WhRunPipelineInstance # This is my first time discovering that the job # succeeded. (At the top of this loop, I was still # waiting for it to finish.) - if c[:output_is_persistent] - # I need to make sure a resources/wants link is in - # place to protect the output from garbage - # collection. (Normally Crunch does this for me, but - # here I might be reusing the output of someone else's - # job and I need to make sure it's understood that the - # output is valuable to me, too.) - wanted = c[:job][:output] - debuglog "checking for existing persistence link for #{wanted}" - @my_user_uuid ||= $arv.user.current[:uuid] - links = $arv.link.list(limit: 1, - filters: - [%w(link_class = resources), - %w(name = wants), - %w(tail_uuid =) + [@my_user_uuid], - %w(head_uuid =) + [wanted] - ])[:items] - if links.any? - debuglog "link already exists, uuid #{links.first[:uuid]}" + + debuglog "names: #{@instance[:name]} #{@template[:name]}", 0 + if (not @instance[:name].nil?) and (not @instance[:name].empty?) + pipeline_name = @instance[:name] + else + fetch_template(@instance[:pipeline_template_uuid]) + pipeline_name = @template[:name] + end + if c[:output_name] != false + # Create a collection located in the same project as the pipeline with the contents of the output. + portable_data_hash = c[:job][:output] + collections = $arv.collection.list(limit: 1, + filters: [['portable_data_hash', '=', portable_data_hash]], + select: ["portable_data_hash", "manifest_text"] + )[:items] + if collections.any? + name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}" + + # check if there is a name collision. + name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid], + ["name", "=", name]])[:items] + + newcollection_actual = nil + if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash + # There is already a collection with the same name and the + # same contents, so just point to that. + newcollection_actual = name_collisions.first + end + + if newcollection_actual.nil? + # Did not find a collection with the same name (or the + # collection has a different portable data hash) so create + # a new collection with ensure_unique_name: true. + newcollection = { + owner_uuid: owner_uuid, + name: name, + portable_data_hash: collections.first[:portable_data_hash], + manifest_text: collections.first[:manifest_text] + } + debuglog "Creating collection #{newcollection}", 0 + newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true + end + + c[:output_uuid] = newcollection_actual[:uuid] else - newlink = $arv.link.create link: \ - { - link_class: 'resources', - name: 'wants', - tail_kind: 'arvados#user', - tail_uuid: @my_user_uuid, - head_kind: 'arvados#collection', - head_uuid: wanted, - owner_uuid: owner_uuid - } - debuglog "added link, uuid #{newlink[:uuid]}" + debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0 end end end @@ -654,6 +683,10 @@ class WhRunPipelineInstance end end + if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state] + @instance[:finished_at] = Time.now + end + debuglog "pipeline instance state is #{@instance[:state]}" # set components_summary @@ -711,6 +744,19 @@ class WhRunPipelineInstance end end end + + def abort(msg) + if @instance + if ["New", "Ready", "RunningOnClient", + "RunningOnServer"].include?(@instance[:state]) + @instance[:state] = "Failed" + @instance[:finished_at] = Time.now + @instance.save + end + @instance.log_stderr(msg) + end + Kernel::abort(msg) + end end runner = WhRunPipelineInstance.new($options)