5 # wh-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # wh-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--create-instance-only] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 # pipeline components. Always submit a new job
33 # or use an existing job which has not yet
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 # components. Submit a new job for every component.
39 # [--debug] Print extra debugging information on stderr.
41 # [--debug-level N] Increase amount of debugging information. Default
42 # 1, possible range 0..3.
44 # [--status-text path] Print plain text status report to a file or
45 # fifo. Default: /dev/stdout
47 # [--status-json path] Print JSON status report to a file or
48 # fifo. Default: /dev/null
52 # [param_name=param_value]
54 # [param_name param_value] Set (or override) the default value for
55 # every parameter with the given name.
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 # parameter for a single
64 class WhRunPipelineInstance
67 $application_version = 1.0
69 if RUBY_VERSION < '1.9.3' then
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
83 require 'google/api_client'
89 #{$0}: fatal: some runtime dependencies are missing.
90 Try: gem install pp google-api-client json trollop
94 def debuglog(message, verbosity=1)
95 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
100 original_verbosity = $VERBOSE
103 $VERBOSE = original_verbosity
108 if $arvados_api_host.match /local/
109 # You probably don't care about SSL certificate checks if you're
110 # testing with a dev server.
111 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
114 class Google::APIClient
115 def discovery_document(api, version)
117 return @discovery_documents["#{api}:#{version}"] ||=
119 response = self.execute!(
120 :http_method => :get,
121 :uri => self.discovery_uri(api, version),
122 :authenticated => false
124 response.body.class == String ? JSON.parse(response.body) : response.body
130 # Parse command line options (the kind that control the behavior of
131 # this program, that is, not the pipeline component parameters).
133 p = Trollop::Parser.new do
135 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
139 "Store plain text status in given file.",
142 :default => '/dev/stdout')
144 "Store json-formatted pipeline in given file.",
147 :default => '/dev/null')
149 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 opt(:no_reuse_finished,
153 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
157 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
161 "Print extra debugging information on stderr.",
164 "Set debug verbosity level.",
168 "UUID of pipeline template, or path to local pipeline template file.",
172 "UUID of pipeline instance.",
175 opt(:create_instance_only,
176 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
181 $options = Trollop::with_standard_exception_handling p do
184 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
186 if $options[:instance]
187 if $options[:template] or $options[:create_instance_only]
188 abort "#{$0}: syntax error: --instance cannot be combined with --template or --create-instance-only."
190 elsif not $options[:template]
191 abort "#{$0}: syntax error: you must supply a --template or --instance."
194 # Set up the API client.
196 $client ||= Google::APIClient.
197 new(:host => $arvados_api_host,
198 :application_name => File.split($0).last,
199 :application_version => $application_version.to_s)
200 $arvados = $client.discovered_api('arvados', $arvados_api_version)
203 class PipelineInstance
205 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
210 :api_token => ENV['ARVADOS_API_TOKEN']
212 :authenticated => false)
213 j = JSON.parse result.body, :symbolize_names => true
214 unless j.is_a? Hash and j[:uuid]
215 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
218 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
222 def self.create(attributes)
223 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
225 :api_token => ENV['ARVADOS_API_TOKEN'],
226 :pipeline_instance => attributes
228 :authenticated => false)
229 j = JSON.parse result.body, :symbolize_names => true
230 unless j.is_a? Hash and j[:uuid]
231 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
233 debuglog "Created pipeline instance: #{j[:uuid]}"
237 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
242 :api_token => ENV['ARVADOS_API_TOKEN'],
243 :pipeline_instance => @attributes_to_update.to_json
245 :authenticated => false)
246 j = JSON.parse result.body, :symbolize_names => true
247 unless j.is_a? Hash and j[:uuid]
248 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
251 @attributes_to_update = {}
256 @attributes_to_update[x] = y
264 @attributes_to_update = {}
272 result = $client.execute(:api_method => $arvados.jobs.get,
274 :api_token => ENV['ARVADOS_API_TOKEN'],
277 :authenticated => false)
278 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
280 def self.where(conditions)
281 result = $client.execute(:api_method => $arvados.jobs.list,
283 :api_token => ENV['ARVADOS_API_TOKEN'],
285 :where => conditions.to_json
287 :authenticated => false)
288 list = JSON.parse result.body, :symbolize_names => true
289 if list and list[:items].is_a? Array
295 def self.create(attributes)
297 result = $client.execute(:api_method => $arvados.jobs.create,
299 :api_token => ENV['ARVADOS_API_TOKEN'],
300 :job => attributes.to_json
302 :authenticated => false)
303 j = JSON.parse result.body, :symbolize_names => true
304 if j.is_a? Hash and j[:uuid]
307 debuglog "create job: #{j[:errors] rescue nil}", 0
313 class WhRunPipelineInstance
314 attr_reader :instance
316 def initialize(_options)
320 def fetch_template(template)
321 if template.match /[^-0-9a-z]/
322 # Doesn't look like a uuid -- use it as a filename.
323 @template = JSON.parse File.read(template), :symbolize_names => true
324 if !@template[:components]
325 abort ("#{$0}: Template loaded from #{template} " +
326 "does not have a \"components\" key")
329 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
331 :api_token => ENV['ARVADOS_API_TOKEN'],
334 :authenticated => false)
335 @template = JSON.parse result.body, :symbolize_names => true
337 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
343 def fetch_instance(instance_uuid)
344 @instance = PipelineInstance.find(instance_uuid)
345 @template = @instance
349 def apply_parameters(params_args)
350 params_args.shift if params_args[0] == '--'
352 while !params_args.empty?
353 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
354 params[re[2]] = re[3]
356 elsif params_args.size > 1
357 param = params_args.shift.sub /^--/, ''
358 params[param] = params_args.shift
360 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
364 @components = @template[:components].dup
367 @components.each do |componentname, component|
368 component[:script_parameters].each do |parametername, parameter|
369 parameter = { :value => parameter } unless parameter.is_a? Hash
371 (params["#{componentname}::#{parametername}"] ||
373 (parameter[:output_of].nil? &&
374 (params[parametername.to_s] ||
375 parameter[:default])) ||
378 ![false,'false',0,'0'].index parameter[:required]
379 if parameter[:output_of]
382 errors << [componentname, parametername, "required parameter is missing"]
384 debuglog "parameter #{componentname}::#{parametername} == #{value}"
385 component[:script_parameters][parametername] = value
389 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
391 debuglog "options=" + @options.pretty_inspect
396 @instance ||= PipelineInstance.
397 create(:components => @components,
398 :pipeline_template_uuid => @template[:uuid],
407 @components.each do |cname, c|
410 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
411 # Job is fully specified (all parameter values are present) but
412 # no particular job has been found.
414 debuglog "component #{cname} ready to satisfy."
417 second_place_job = nil # satisfies component, but not finished yet
419 (@options[:no_reuse] ? [] : JobCache.
420 where(script: c[:script],
421 script_parameters: c[:script_parameters],
422 script_version_descends_from: c[:script_version])
423 ).each do |candidate_job|
424 candidate_params_downcase = Hash[candidate_job[:script_parameters].
425 map { |k,v| [k.downcase,v] }]
426 c_params_downcase = Hash[c[:script_parameters].
427 map { |k,v| [k.downcase,v] }]
429 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
431 unless candidate_params_downcase == c_params_downcase
435 if c[:script_version] !=
436 candidate_job[:script_version][0,c[:script_version].length]
437 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
441 unless candidate_job[:success] || candidate_job[:running] ||
442 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
443 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
447 if candidate_job[:success]
448 unless @options[:no_reuse_finished]
450 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
454 second_place_job ||= candidate_job
458 if not c[:job] and second_place_job
459 job = second_place_job
460 debuglog "component #{cname} satisfied by job #{job[:uuid]} version #{job[:script_version]}"
464 debuglog "component #{cname} not satisfied by any existing job."
465 if !@options[:dry_run]
466 debuglog "component #{cname} new job."
467 job = JobCache.create(:script => c[:script],
468 :script_parameters => c[:script_parameters],
469 :runtime_constraints => c[:runtime_constraints] || {},
470 :script_version => c[:script_version] || 'master')
472 debuglog "component #{cname} new job #{job[:uuid]}"
475 debuglog "component #{cname} new job failed"
482 if c[:job] and c[:job][:uuid]
483 if not c[:job][:finished_at] and not c[:job][:cancelled_at]
484 c[:job] = JobCache.get(c[:job][:uuid])
487 # Populate script_parameters of other components waiting for
489 @components.each do |c2name, c2|
490 c2[:script_parameters].each do |pname, p|
491 if p.is_a? Hash and p[:output_of] == cname.to_s
492 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
493 c2[:script_parameters][pname] = c[:job][:output]
497 elsif c[:job][:running] ||
498 (!c[:job][:started_at] && !c[:job][:cancelled_at])
499 moretodo ||= !@options[:no_wait]
500 elsif c[:job][:cancelled_at]
501 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
505 @instance[:components] = @components
506 @instance[:active] = moretodo
512 debuglog "interrupt", 0
517 @instance[:success] = @components.reject { |cname,c| c[:job] and c[:job][:success] }.empty?
523 @instance[:active] = false
537 if @options[:status_json] != '/dev/null'
538 File.open(@options[:status_json], 'w') do |f|
539 f.puts @components.pretty_inspect
543 if @options[:status_text] != '/dev/null'
544 File.open(@options[:status_text], 'w') do |f|
545 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
546 namewidth = @components.collect { |cname, c| cname.size }.max
547 @components.each do |cname, c|
548 jstatus = if !c[:job]
550 elsif c[:job][:running]
551 "#{c[:job][:tasks_summary].inspect}"
552 elsif c[:job][:success]
554 elsif c[:job][:cancelled_at]
555 "cancelled #{c[:job][:cancelled_at]}"
556 elsif c[:job][:finished_at]
557 "failed #{c[:job][:finished_at]}"
558 elsif c[:job][:started_at]
559 "started #{c[:job][:started_at]}"
561 "queued #{c[:job][:created_at]}"
563 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
570 runner = WhRunPipelineInstance.new($options)
572 if $options[:template]
573 runner.fetch_template($options[:template])
575 runner.fetch_instance($options[:instance])
577 runner.apply_parameters(p.leftovers)
578 runner.setup_instance
579 if $options[:create_instance_only]
581 puts runner.instance[:uuid]
585 rescue Exception => e