3550: Merge branch 'master' into 3550-local-pipeline
authorTom Clegg <tom@curoverse.com>
Wed, 17 Sep 2014 22:00:25 +0000 (18:00 -0400)
committerTom Clegg <tom@curoverse.com>
Wed, 17 Sep 2014 22:00:25 +0000 (18:00 -0400)
Conflicts:
sdk/cli/bin/arv-run-pipeline-instance

1  2 
sdk/cli/bin/arv-run-pipeline-instance

index dee57b059ebdd231d77775025644d59940fa611c,05fce2c9da36d0aae52cb178131d96a0befd4a81..472c20bd73a283feb3149c44b6b3f534d3ed50bb
@@@ -42,6 -42,8 +42,8 @@@
  # [--status-json path] Print JSON status report to a file or
  #                      fifo. Default: /dev/null
  #
+ # [--description] Description for the pipeline instance.
+ #
  # == Parameters
  #
  # [param_name=param_value]
@@@ -59,6 -61,8 +61,6 @@@
  class WhRunPipelineInstance
  end
  
 -$application_version = 1.0
 -
  if RUBY_VERSION < '1.9.3' then
    abort <<-EOS
  #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
@@@ -107,6 -111,21 +109,6 @@@ if $arvados_api_host.match /local
    suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
  end
  
 -class Google::APIClient
 -  def discovery_document(api, version)
 -    api = api.to_s
 -    return @discovery_documents["#{api}:#{version}"] ||=
 -      begin
 -        response = self.execute!(
 -                                 :http_method => :get,
 -                                 :uri => self.discovery_uri(api, version),
 -                                 :authenticated => false
 -                                 )
 -        response.body.class == String ? JSON.parse(response.body) : response.body
 -      end
 -  end
 -end
 -
  
  # Parse command line options (the kind that control the behavior of
  # this program, that is, not the pipeline component parameters).
@@@ -151,21 -170,17 +153,25 @@@ p = Trollop::Parser.new d
        :short => :none,
        :type => :string)
    opt(:submit,
 -      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
 +      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service satisfy the components by finding/running jobs.",
 +      :short => :none,
 +      :type => :boolean)
 +  opt(:run_pipeline_here,
 +      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
 +      :short => :none,
 +      :type => :boolean)
 +  opt(:run_jobs_here,
 +      "Run jobs in the local terminal session instead of submitting them to Crunch. Implies --run-pipeline-here. Note: this results in a significantly different job execution environment, and some Crunch features are not supported. It can be necessary to modify a pipeline in order to make it run this way.",
        :short => :none,
        :type => :boolean)
    opt(:run_here,
 -      "Manage the pipeline in process.",
 +      "Synonym for --run-jobs-here.",
        :short => :none,
        :type => :boolean)
+   opt(:description,
+       "Description for the pipeline instance.",
+       :short => :none,
+       :type => :string)
    stop_on [:'--']
  end
  $options = Trollop::with_standard_exception_handling p do
  end
  $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
  
 +$options[:run_jobs_here] ||= $options[:run_here] # old flag name
 +$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
 +
  if $options[:instance]
    if $options[:template] or $options[:submit]
      abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
@@@ -186,8 -198,8 +192,8 @@@ elsif not $options[:template
    abort
  end
  
 -if $options[:run_here] == $options[:submit]
 -  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
 +if $options[:run_pipeline_here] == $options[:submit]
 +  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
  end
  
  # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
@@@ -208,9 -220,13 +214,9 @@@ en
  
  # Set up the API client.
  
 -$client ||= Google::APIClient.
 -  new(:host => $arvados_api_host,
 -      :application_name => File.split($0).last,
 -      :application_version => $application_version.to_s)
 -$arvados = $client.discovered_api('arvados', $arvados_api_version)
  $arv = Arvados.new api_version: 'v1'
 -
 +$client = $arv.client
 +$arvados = $arv.arvados_api
  
  class PipelineInstance
    def self.find(uuid)
@@@ -468,6 -484,8 +474,8 @@@ class WhRunPipelineInstanc
          end
        end
      else
+       description = $options[:description]
+       description = ("Created at #{Time.now.localtime}" + (@template[:name].andand.size.andand>0 ? " using the pipeline template *#{@template[:name]}*" : "")) if !description
        @instance = PipelineInstance.
          create(components: @components,
                 properties: {
                   }
                 },
                 pipeline_template_uuid: @template[:uuid],
+                description: description,
                 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
      end
      self
            # No job yet associated with this component and is component inputs
            # are fully specified (any output_of script_parameters are resolved
            # to real value)
 +          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
            job = JobCache.create(@instance, cname, {
              :script => c[:script],
              :script_parameters => c[:script_parameters],
              :nondeterministic => c[:nondeterministic],
              :runtime_constraints => c[:runtime_constraints],
              :owner_uuid => owner_uuid,
 +            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
 +            :submit_id => my_submit_id,
            }, {
              # This is the right place to put these attributes when
              # dealing with new API servers.
            if job
              debuglog "component #{cname} new job #{job[:uuid]}"
              c[:job] = job
 +            c[:run_in_process] = (@options[:run_jobs_here] and
 +                                  job[:submit_id] == my_submit_id)
            else
              debuglog "component #{cname} new job failed", 0
              job_creation_failed += 1
            end
          end
  
 +        if c[:job] and c[:run_in_process] and c[:job][:success].nil?
 +          report_status
 +          begin
 +            require 'open3'
 +            Open3.popen3("arv-crunch-job", "--force-unlock",
 +                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
 +              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
 +              stdin.close
 +              while true
 +                rready, wready, = IO.select([stdout, stderr], [])
 +                break if !rready[0]
 +                begin
 +                  buf = rready[0].read_nonblock(2**20)
 +                rescue EOFError
 +                  break
 +                end
 +                (rready[0] == stdout ? $stdout : $stderr).write(buf)
 +              end
 +              stdout.close
 +              stderr.close
 +              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
 +            end
 +            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
 +              raise Exception.new("arv-crunch-job did not set finished_at.")
 +            end
 +          rescue Exception => e
 +            debuglog "Interrupted (#{e}). Failing job.", 0
 +            $arv.job.update(uuid: c[:job][:uuid],
 +                            job: {
 +                              finished_at: Time.now,
 +                              running: false,
 +                              success: false
 +                            })
 +          end
 +        end
 +
          if c[:job] and c[:job][:uuid]
            if (c[:job][:running] or
                not (c[:job][:finished_at] or c[:job][:cancelled_at]))
                # succeeded. (At the top of this loop, I was still
                # waiting for it to finish.)
  
 -              debuglog "names: #{@instance[:name]} #{@template[:name]}", 0
 -              if (not @instance[:name].nil?) and (not @instance[:name].empty?)
 +              if @instance[:name].andand.length.andand > 0
                  pipeline_name = @instance[:name]
 -              elsif @instance[:pipeline_template_uuid]
 -                fetch_template(@instance[:pipeline_template_uuid])
 +              elsif @template.andand[:name].andand.length.andand > 0
                  pipeline_name = @template[:name]
                else
 -                pipeline_name = "pipeline started #{@instance[:started_at]}"
 +                pipeline_name = @instance[:uuid]
                end
                if c[:output_name] != false
                  # Create a collection located in the same project as the pipeline with the contents of the output.
                        "failed #{c[:job][:finished_at]}"
                      elsif c[:job][:started_at]
                        "started #{c[:job][:started_at]}"
 +                    elsif c[:job][:is_locked_by_uuid]
 +                      "starting #{c[:job][:started_at]}"
                      else
                        "queued #{c[:job][:created_at]}"
                      end