X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/922da69f42998b29355796e20e4dee0079d4113e..452baf1ec1b55c6c4613972ee5f6b5ebf28e8ed7:/sdk/cli/bin/arv-run-pipeline-instance diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance index f20acebbd0..c6fd61f9e3 100755 --- a/sdk/cli/bin/arv-run-pipeline-instance +++ b/sdk/cli/bin/arv-run-pipeline-instance @@ -42,6 +42,8 @@ # [--status-json path] Print JSON status report to a file or # fifo. Default: /dev/null # +# [--description] Description for the pipeline instance. +# # == Parameters # # [param_name=param_value] @@ -175,6 +177,10 @@ p = Trollop::Parser.new do "Manage the pipeline in process.", :short => :none, :type => :boolean) + opt(:description, + "Description for the pipeline instance.", + :short => :none, + :type => :string) stop_on [:'--'] end $options = Trollop::with_standard_exception_handling p do @@ -285,6 +291,16 @@ class PipelineInstance def [](x) @pi[x] end + + def log_stderr(msg) + $arv.log.create log: { + event_type: 'stderr', + object_uuid: self[:uuid], + owner_uuid: self[:owner_uuid], + properties: {"text" => msg}, + } + end + protected def initialize(j) @attributes_to_update = {} @@ -345,19 +361,7 @@ class JobCache end msg += "Job submission was: #{body.to_json}" - $client.execute(:api_method => $arvados.logs.create, - :body_object => { - :log => { - :object_uuid => pipeline[:uuid], - :event_type => 'stderr', - :owner_uuid => pipeline[:owner_uuid], - :properties => {"text" => msg} - } - }, - :authenticated => false, - :headers => { - authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN'] - }) + pipeline.log_stderr(msg) nil end end @@ -380,10 +384,6 @@ class WhRunPipelineInstance if template.match /[^-0-9a-z]/ # Doesn't look like a uuid -- use it as a filename. @template = JSON.parse File.read(template), :symbolize_names => true - if !@template[:components] - abort ("#{$0}: Template loaded from #{template} " + - "does not have a \"components\" key") - end else result = $client.execute(:api_method => $arvados.pipeline_templates.get, :parameters => { @@ -422,8 +422,25 @@ class WhRunPipelineInstance end end + if not @template[:components].is_a?(Hash) + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash" + end @components = @template[:components].dup + bad_components = @components.each_pair.select do |cname, cspec| + not cspec.is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}" + end + + bad_components = @components.each_pair.select do |cname, cspec| + not cspec[:script_parameters].is_a?(Hash) + end + if bad_components.any? + abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}" + end + errors = [] @components.each do |componentname, component| component[:script_parameters].each do |parametername, parameter| @@ -467,6 +484,8 @@ class WhRunPipelineInstance end end else + description = $options[:description] + description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description @instance = PipelineInstance. create(components: @components, properties: { @@ -475,6 +494,7 @@ class WhRunPipelineInstance } }, pipeline_template_uuid: @template[:uuid], + description: description, state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient')) end self @@ -484,6 +504,10 @@ class WhRunPipelineInstance moretodo = true interrupted = false + if @instance[:started_at].nil? + @instance[:started_at] = Time.now + end + job_creation_failed = 0 while moretodo moretodo = false @@ -507,7 +531,6 @@ class WhRunPipelineInstance :script_version => c[:script_version], :repository => c[:repository], :nondeterministic => c[:nondeterministic], - :output_is_persistent => c[:output_is_persistent] || false, :runtime_constraints => c[:runtime_constraints], :owner_uuid => owner_uuid, }, { @@ -551,37 +574,54 @@ class WhRunPipelineInstance # This is my first time discovering that the job # succeeded. (At the top of this loop, I was still # waiting for it to finish.) - if c[:output_is_persistent] - # I need to make sure a resources/wants link is in - # place to protect the output from garbage - # collection. (Normally Crunch does this for me, but - # here I might be reusing the output of someone else's - # job and I need to make sure it's understood that the - # output is valuable to me, too.) - wanted = c[:job][:output] - debuglog "checking for existing persistence link for #{wanted}" - @my_user_uuid ||= $arv.user.current[:uuid] - links = $arv.link.list(limit: 1, - filters: - [%w(link_class = resources), - %w(name = wants), - %w(tail_uuid =) + [@my_user_uuid], - %w(head_uuid =) + [wanted] - ])[:items] - if links.any? - debuglog "link already exists, uuid #{links.first[:uuid]}" + + debuglog "names: #{@instance[:name]} #{@template[:name]}", 0 + if (not @instance[:name].nil?) and (not @instance[:name].empty?) + pipeline_name = @instance[:name] + elsif @instance[:pipeline_template_uuid] + fetch_template(@instance[:pipeline_template_uuid]) + pipeline_name = @template[:name] + else + pipeline_name = "pipeline started #{@instance[:started_at]}" + end + if c[:output_name] != false + # Create a collection located in the same project as the pipeline with the contents of the output. + portable_data_hash = c[:job][:output] + collections = $arv.collection.list(limit: 1, + filters: [['portable_data_hash', '=', portable_data_hash]], + select: ["portable_data_hash", "manifest_text"] + )[:items] + if collections.any? + name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}" + + # check if there is a name collision. + name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid], + ["name", "=", name]])[:items] + + newcollection_actual = nil + if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash + # There is already a collection with the same name and the + # same contents, so just point to that. + newcollection_actual = name_collisions.first + end + + if newcollection_actual.nil? + # Did not find a collection with the same name (or the + # collection has a different portable data hash) so create + # a new collection with ensure_unique_name: true. + newcollection = { + owner_uuid: owner_uuid, + name: name, + portable_data_hash: collections.first[:portable_data_hash], + manifest_text: collections.first[:manifest_text] + } + debuglog "Creating collection #{newcollection}", 0 + newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true + end + + c[:output_uuid] = newcollection_actual[:uuid] else - newlink = $arv.link.create link: \ - { - link_class: 'resources', - name: 'wants', - tail_kind: 'arvados#user', - tail_uuid: @my_user_uuid, - head_kind: 'arvados#collection', - head_uuid: wanted, - owner_uuid: owner_uuid - } - debuglog "added link, uuid #{newlink[:uuid]}" + debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0 end end end @@ -654,6 +694,10 @@ class WhRunPipelineInstance end end + if @instance[:finished_at].nil? and ['Complete', 'Failed'].include? @instance[:state] + @instance[:finished_at] = Time.now + end + debuglog "pipeline instance state is #{@instance[:state]}" # set components_summary @@ -711,6 +755,19 @@ class WhRunPipelineInstance end end end + + def abort(msg) + if @instance + if ["New", "Ready", "RunningOnClient", + "RunningOnServer"].include?(@instance[:state]) + @instance[:state] = "Failed" + @instance[:finished_at] = Time.now + @instance.save + end + @instance.log_stderr(msg) + end + Kernel::abort(msg) + end end runner = WhRunPipelineInstance.new($options)