Merge branch 'master' into 3654-combine-selections
[arvados.git] / sdk / cli / bin / arv-run-pipeline-instance
index ab3702cafc02b86825c3739ff63901828def793d..7ce1fa95146dbfdb57069ea94cf364fb1a999d90 100755 (executable)
@@ -244,7 +244,7 @@ class PipelineInstance
   def self.create(attributes)
     result = $client.execute(:api_method => $arvados.pipeline_instances.create,
                              :body_object => {
-                               :pipeline_instance => attributes.to_json
+                               :pipeline_instance => attributes
                              },
                              :authenticated => false,
                              :headers => {
@@ -263,7 +263,7 @@ class PipelineInstance
                                :uuid => @pi[:uuid]
                              },
                              :body_object => {
-                               :pipeline_instance => @attributes_to_update.to_json
+                               :pipeline_instance => @attributes_to_update
                              },
                              :authenticated => false,
                              :headers => {
@@ -285,6 +285,16 @@ class PipelineInstance
   def [](x)  # Read-only lookup: returns whatever @pi holds under key x.
     @pi[x]
   end
+
+  def log_stderr(msg)  # Create a log record of event_type 'stderr' carrying msg, via $arv.log.create.
+    $arv.log.create log: {
+      event_type: 'stderr',
+      object_uuid: self[:uuid],  # the log record points at this pipeline instance
+      owner_uuid: self[:owner_uuid],  # and is given the same owner as the instance
+      properties: {"text" => msg},  # the message text is stored under the "text" property
+    }
+  end
+
   protected
   def initialize(j)
     @attributes_to_update = {}
@@ -345,19 +355,7 @@ class JobCache
       end
       msg += "Job submission was: #{body.to_json}"
 
-      $client.execute(:api_method => $arvados.logs.create,
-                      :body_object => {
-                        :log => {
-                          :object_uuid => pipeline[:uuid],
-                          :event_type => 'stderr',
-                          :owner_uuid => pipeline[:owner_uuid],
-                          :properties => {"text" => msg}
-                        }
-                      },
-                      :authenticated => false,
-                      :headers => {
-                        authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
-                      })
+      pipeline.log_stderr(msg)
       nil
     end
   end
@@ -380,10 +378,6 @@ class WhRunPipelineInstance
     if template.match /[^-0-9a-z]/
       # Doesn't look like a uuid -- use it as a filename.
       @template = JSON.parse File.read(template), :symbolize_names => true
-      if !@template[:components]
-        abort ("#{$0}: Template loaded from #{template} " +
-               "does not have a \"components\" key")
-      end
     else
       result = $client.execute(:api_method => $arvados.pipeline_templates.get,
                                :parameters => {
@@ -422,8 +416,25 @@ class WhRunPipelineInstance
       end
     end
 
+    if not @template[:components].is_a?(Hash)
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Template missing \"components\" hash"
+    end
     @components = @template[:components].dup
 
+    bad_components = @components.each_pair.select do |cname, cspec|
+      not cspec.is_a?(Hash)
+    end
+    if bad_components.any?
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components not specified with hashes: #{bad_components.map(&:first).join(', ')}"
+    end
+
+    bad_components = @components.each_pair.select do |cname, cspec|
+      not cspec[:script_parameters].is_a?(Hash)
+    end
+    if bad_components.any?
+      abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: Components missing \"script_parameters\" hashes: #{bad_components.map(&:first).join(', ')}"
+    end
+
     errors = []
     @components.each do |componentname, component|
       component[:script_parameters].each do |parametername, parameter|
@@ -454,16 +465,28 @@ class WhRunPipelineInstance
   end
 
   def setup_instance  # Make sure @instance exists and has run_options[:enable_job_reuse] set; returns self.
-    if $options[:submit]
-      @instance ||= PipelineInstance.
-        create(:components => @components,
-              :pipeline_template_uuid => @template[:uuid],
-              :state => 'New')
+    if @instance  # an instance is already present: adjust its run options in place
+      @instance[:properties][:run_options] ||= {}  # ensure the hash exists before writing into it
+      if @options[:no_reuse]
+        # override properties of existing instance
+        @instance[:properties][:run_options][:enable_job_reuse] = false
+      else
+        # Default to "enable reuse" if not specified. (This code path
+        # can go away when old clients go away.)
+        if @instance[:properties][:run_options][:enable_job_reuse].nil?
+          @instance[:properties][:run_options][:enable_job_reuse] = true
+        end
+      end
     else  # no instance yet: create one from the template's components
-      @instance ||= PipelineInstance.
-        create(:components => @components,
-             :pipeline_template_uuid => @template[:uuid],
-             :state => 'RunningOnClient')
+      @instance = PipelineInstance.
+        create(components: @components,
+               properties: {
+                 run_options: {
+                   enable_job_reuse: !@options[:no_reuse]  # reuse is on unless no_reuse was requested
+                 }
+               },
+               pipeline_template_uuid: @template[:uuid],
+               state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
     end
     self  # return the receiver
   end
@@ -495,7 +518,6 @@ class WhRunPipelineInstance
             :script_version => c[:script_version],
             :repository => c[:repository],
             :nondeterministic => c[:nondeterministic],
-            :output_is_persistent => c[:output_is_persistent] || false,
             :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
           }, {
@@ -503,7 +525,8 @@ class WhRunPipelineInstance
             # dealing with new API servers.
             :minimum_script_version => c[:minimum_script_version],
             :exclude_script_versions => c[:exclude_minimum_script_versions],
-            :find_or_create => !(@options[:no_reuse] || c[:nondeterministic]),
+            :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
+                                !c[:nondeterministic]),
             :filters => c[:filters]
           })
           if job
@@ -538,37 +561,52 @@ class WhRunPipelineInstance
               # This is my first time discovering that the job
               # succeeded. (At the top of this loop, I was still
               # waiting for it to finish.)
-              if c[:output_is_persistent]
-                # I need to make sure a resources/wants link is in
-                # place to protect the output from garbage
-                # collection. (Normally Crunch does this for me, but
-                # here I might be reusing the output of someone else's
-                # job and I need to make sure it's understood that the
-                # output is valuable to me, too.)
-                wanted = c[:job][:output]
-                debuglog "checking for existing persistence link for #{wanted}"
-                @my_user_uuid ||= $arv.user.current[:uuid]
-                links = $arv.link.list(limit: 1,
-                                       filters:
-                                       [%w(link_class = resources),
-                                        %w(name = wants),
-                                        %w(tail_uuid =) + [@my_user_uuid],
-                                        %w(head_uuid =) + [wanted]
-                                       ])[:items]
-                if links.any?
-                  debuglog "link already exists, uuid #{links.first[:uuid]}"
+
+              debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
+              if (not @instance[:name].nil?) and (not @instance[:name].empty?)
+                pipeline_name = @instance[:name]
+              else
+                fetch_template(@instance[:pipeline_template_uuid])
+                pipeline_name = @template[:name]
+              end
+              if c[:output_name] != false
+                # Create a collection located in the same project as the pipeline with the contents of the output.
+                portable_data_hash = c[:job][:output]
+                collections = $arv.collection.list(limit: 1,
+                                                   filters: [['portable_data_hash', '=', portable_data_hash]],
+                                                   select: ["portable_data_hash", "manifest_text"]
+                                                   )[:items]
+                if collections.any?
+                  name = c[:output_name] || "Output #{portable_data_hash[0..7]} of #{cname} of #{pipeline_name}"
+
+                  # check if there is a name collision.
+                  name_collisions = $arv.collection.list(filters: [["owner_uuid", "=", owner_uuid],
+                                                                   ["name", "=", name]])[:items]
+
+                  newcollection_actual = nil
+                  if name_collisions.any? and name_collisions.first[:portable_data_hash] == portable_data_hash
+                    # There is already a collection with the same name and the
+                    # same contents, so just point to that.
+                    newcollection_actual = name_collisions.first
+                  end
+
+                  if newcollection_actual.nil?
+                    # Did not find a collection with the same name (or the
+                    # collection has a different portable data hash) so create
+                    # a new collection with ensure_unique_name: true.
+                    newcollection = {
+                      owner_uuid: owner_uuid,
+                      name: name,
+                      portable_data_hash: collections.first[:portable_data_hash],
+                      manifest_text: collections.first[:manifest_text]                      
+                    }
+                    debuglog "Creating collection #{newcollection}", 0
+                    newcollection_actual = $arv.collection.create collection: newcollection, ensure_unique_name: true
+                  end
+
+                  c[:output_uuid] = newcollection_actual[:uuid]
                 else
-                  newlink = $arv.link.create link: \
-                  {
-                    link_class: 'resources',
-                    name: 'wants',
-                    tail_kind: 'arvados#user',
-                    tail_uuid: @my_user_uuid,
-                    head_kind: 'arvados#collection',
-                    head_uuid: wanted,
-                    owner_uuid: owner_uuid
-                  }
-                  debuglog "added link, uuid #{newlink[:uuid]}"
+                  debuglog "Could not find a collection with portable data hash #{portable_data_hash}", 0
                 end
               end
             end
@@ -698,6 +736,18 @@ class WhRunPipelineInstance
       end
     end
   end
+
+  def abort(msg)  # Record the failure on @instance (when there is one), then terminate via Kernel::abort(msg).
+    if @instance  # with no instance there is nothing to mark failed or log against
+      if ["New", "Ready", "RunningOnClient",
+          "RunningOnServer"].include?(@instance[:state])
+        @instance[:state] = "Failed"  # only these four states are changed to "Failed"
+        @instance.save
+      end
+      @instance.log_stderr(msg)  # logged regardless of which state the instance was in
+    end
+    Kernel::abort(msg)  # explicit receiver: a bare `abort` here would call this method again
+  end
 end
 
 runner = WhRunPipelineInstance.new($options)