# == Synopsis
#
# wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
+# wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
#
# Satisfy a pipeline template by finding or submitting a mapreduce job
# for each pipeline component.
#
# [--template uuid] Use the specified pipeline template.
#
+# [--instance uuid] Use the specified pipeline instance.
+#
# [-n, --dry-run] Do not start any new jobs or wait for existing jobs
# to finish. Just find out whether jobs are finished,
# queued, or running for each component
#
+# [--create-only] Do not try to satisfy any components. Just create an
+# instance, print its UUID to stdout, and exit.
+#
# [--no-wait] Make only as much progress as possible without entering
# a sleep/poll loop.
#
opt(:template,
"UUID of pipeline template.",
:short => :none,
- :type => :string,
- :required => true)
+ :type => :string)
+ opt(:instance,
+ "UUID of pipeline instance.",
+ :short => :none,
+ :type => :string)
+ opt(:create_only,
+ "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+ :short => :none,
+ :type => :boolean)
stop_on [:'--']
end
$options = Trollop::with_standard_exception_handling p do
end
$debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
+if $options[:instance]
+ if $options[:template] or $options[:create_only]
+ abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-only."
+ end
+elsif not $options[:template]
+ abort "#{$0}: syntax error: you must supply a --template or --instance."
+end
# Set up the API client.
class PipelineInstance
- def initialize(attributes)
- @attributes_to_update = {}
+ def self.find(uuid)
+ result = $client.execute(:api_method => $orvos.pipeline_instances.get,
+ :parameters => {
+ :api_token => ENV['ORVOS_API_TOKEN'],
+ :uuid => uuid
+ },
+ :authenticated => false)
+ j = JSON.parse result.body, :symbolize_names => true
+ unless j.is_a? Hash and j[:uuid]
+ debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
+ nil
+ else
+ debuglog "Retrieved pipeline_instance #{j[:uuid]}"
+ self.new(j)
+ end
+ end
+ def self.create(attributes)
result = $client.execute(:api_method => $orvos.pipeline_instances.create,
:parameters => {
:api_token => ENV['ORVOS_API_TOKEN'],
:authenticated => false)
j = JSON.parse result.body, :symbolize_names => true
unless j.is_a? Hash and j[:uuid]
- abort "Failed to create pipeline_instance: #{j[:errors] rescue nil}"
+ abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
end
debuglog "Created pipeline instance: #{j[:uuid]}"
- @pi = j
+ self.new(j)
end
def save
result = $client.execute(:api_method => $orvos.pipeline_instances.update,
def [](x)
@pi[x]
end
+ protected
+ def initialize(j)
+ @attributes_to_update = {}
+ @pi = j
+ end
end
class JobCache
end
class WhRunPipelineInstance
+ attr_reader :instance
+
def initialize(_options)
@options = _options
end
self
end
+ def fetch_instance(instance_uuid)
+ @instance = PipelineInstance.find(instance_uuid)
+ @template = @instance
+ self
+ end
+
def apply_parameters(params_args)
params_args.shift if params_args[0] == '--'
params = {}
self
end
+ def setup_instance
+ @instance ||= PipelineInstance.
+ create(:components => @components,
+ :pipeline_template_uuid => @template[:uuid],
+ :active => true)
+ self
+ end
+
def run
moretodo = true
while moretodo
moretodo = false
-
- @instance ||= PipelineInstance.new(:components => @components,
- :pipeline_template_uuid => @template[:uuid],
- :active => true)
@components.each do |cname, c|
job = nil
if !c[:job] and
debuglog "component #{cname} ready to satisfy."
c.delete :wait
+ second_place_job = nil # satisfies component, but not finished yet
JobCache.where(:script => c[:script],
:script_parameters => c[:script_parameters],
:script_version_descends_from => c[:script_version_descends_from]).
next
end
- job = candidate_job
+ if candidate_job[:success]
+ job = candidate_job
+ debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
+ c[:job] = job
+ else
+ second_place_job ||= candidate_job
+ end
+ break
+ end
+ if not c[:job] and second_place_job
+ job = second_place_job
debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
c[:job] = job
- break
end
if not c[:job]
debuglog "component #{cname} not satisfied by any existing job."
end
def cleanup
- @instance[:active] = false
- @instance.save
+ if @instance
+ @instance[:active] = false
+ @instance.save
+ end
+ end
+
+ def uuid
+ @instance[:uuid]
end
protected
c[:job][:output]
elsif c[:job][:cancelled_at]
"cancelled #{c[:job][:cancelled_at]}"
+ elsif c[:job][:finished_at]
+ "failed #{c[:job][:finished_at]}"
elsif c[:job][:started_at]
"started #{c[:job][:started_at]}"
else
end
end
-instance = WhRunPipelineInstance.new($options)
+runner = WhRunPipelineInstance.new($options)
begin
- instance.fetch_template($options[:template]).
- apply_parameters(p.leftovers).
- run
+ if $options[:template]
+ runner.fetch_template($options[:template])
+ else
+ runner.fetch_instance($options[:instance])
+ end
+ runner.apply_parameters(p.leftovers)
+ runner.setup_instance
+ if $options[:create_only]
+ runner.instance.save
+ puts runner.instance[:uuid]
+ else
+ runner.run
+ end
rescue Exception => e
- instance.cleanup
+ runner.cleanup
raise e
end