5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified local file.
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component.
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to stdout.
28 # [--no-wait] Make only as much progress as possible without waiting for jobs to finish.
31 # [--no-reuse-finished] Do not reuse existing outputs to satisfy
32 # pipeline components. Always submit a new job
33 # or use an existing job which has not yet finished.
36 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
37 # components. Submit a new job for every component.
39 # [--debug] Print extra debugging information on stderr.
41 # [--debug-level N] Increase amount of debugging information. Default
42 # 1, possible range 0..3.
44 # [--status-text path] Print plain text status report to a file or
45 # fifo. Default: /dev/stdout
47 # [--status-json path] Print JSON status report to a file or
48 # fifo. Default: /dev/null
52 # [param_name=param_value]
54 # [param_name param_value] Set (or override) the default value for
55 # every parameter with the given name.
57 # [component_name::param_name=param_value]
58 # [component_name::param_name param_value]
59 # [--component_name::param_name=param_value]
60 # [--component_name::param_name param_value] Set the value of a
61 # parameter for a single component.
# Bootstrap: introduces the runner class name (its methods are defined further
# down), the application version, the minimum Ruby version, and the API
# connection settings taken from the environment.
64 class WhRunPipelineInstance
# Passed to the API client as :application_version (after to_s).
67 $application_version = 1.0
# Plain string comparison against '1.9.3'; the message names that version as
# the minimum supported one.
69 if RUBY_VERSION < '1.9.3' then
71 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# API version falls back to 'v1' when ARVADOS_API_VERSION is unset.
75 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
# Host and token are mandatory: the script aborts with a fatal message when
# either environment variable is unset.
76 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
77 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
78 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
79 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
# Third-party API client. The text that follows is an error message suggesting
# which gems to install -- presumably emitted when a require fails (the
# enclosing rescue is not visible here; confirm).
86 require 'google/api_client'
90 #{$0}: fatal: #{l.message}
91 Some runtime dependencies may be missing.
92 Try: gem install pp google-api-client json trollop
# Write +message+ to stderr, prefixed with the script's base name and the
# process id. The message is printed only when $debuglevel is at least
# +verbosity+ (default 1). Failure messages elsewhere in this file pass
# verbosity 0.
96 def debuglog(message, verbosity=1)
97 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
# Remembers the current $VERBOSE setting and restores it afterwards. It is
# called with a block by the code that reassigns an OpenSSL constant.
# NOTE(review): presumably silences warnings (e.g. $VERBOSE = nil) around a
# yield -- those lines are not visible here; confirm.
101 def suppress_warnings
102 original_verbosity = $VERBOSE
105 $VERBOSE = original_verbosity
# When the API host name contains "local", redefine OpenSSL::SSL::VERIFY_PEER
# to VERIFY_NONE. A constant reassignment is process-wide, so every reader of
# VERIFY_PEER in this process sees the relaxed value.
# NOTE(review): /local/ is an unanchored substring match, so any host name
# containing "local" anywhere triggers this -- confirm that is intended.
110 if $arvados_api_host.match /local/
111 # You probably don't care about SSL certificate checks if you're
112 # testing with a dev server.
113 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens Google::APIClient to replace discovery_document.
116 class Google::APIClient
# Returns the discovery document for +api+ / +version+, memoized in
# @discovery_documents under the key "api:version". On a cache miss it issues
# an unauthenticated GET to discovery_uri(api, version).
117 def discovery_document(api, version)
119 return @discovery_documents["#{api}:#{version}"] ||=
121 response = self.execute!(
122 :http_method => :get,
123 :uri => self.discovery_uri(api, version),
124 :authenticated => false
# The body is JSON-parsed only when it is a String; any other body object is
# returned unchanged.
126 response.body.class == String ? JSON.parse(response.body) : response.body
132 # Parse command line options (the kind that control the behavior of
133 # this program, that is, not the pipeline component parameters).
# The parser object is kept in `p` because the arguments it leaves unparsed
# (p.leftovers) are later applied as pipeline parameters.
135 p = Trollop::Parser.new do
138 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
142 "Store plain text status in given file.",
145 :default => '/dev/stdout')
147 "Store json-formatted pipeline in given file.",
150 :default => '/dev/null')
152 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
155 opt(:no_reuse_finished,
156 "Do not reuse existing outputs to satisfy pipeline components. Always submit a new job or use an existing job which has not yet finished.",
160 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
164 "Print extra debugging information on stderr.",
167 "Set debug verbosity level.",
171 "UUID of pipeline template, or path to local pipeline template file.",
175 "UUID of pipeline instance.",
179 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
183 "Manage the pipeline in process.",
# Run the parser under Trollop's standard exception handling; the result is
# stored in the global $options.
188 $options = Trollop::with_standard_exception_handling p do
# Debug level: an explicit debug_level wins, otherwise --debug alone means
# level 1, otherwise 0.
# NOTE(review): if debug_level is declared with a non-nil default, the --debug
# branch can never be reached -- the declaration is not visible here; confirm.
191 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# --instance cannot be combined with --template or --submit, and at least one
# of --template / --instance must be given.
193 if $options[:instance]
194 if $options[:template] or $options[:submit]
195 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
197 elsif not $options[:template]
198 abort "#{$0}: syntax error: you must supply a --template or --instance."
# Aborts when --run-here and --submit have equal values (both given or both
# absent), i.e. exactly one of the two is required.
201 if $options[:run_here] == $options[:submit]
202 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
205 # Set up the API client.
# $client is created only if it is not already set (||=). The application
# name is the script's base name and the version is $application_version as a
# string.
207 $client ||= Google::APIClient.
208 new(:host => $arvados_api_host,
209 :application_name => File.split($0).last,
210 :application_version => $application_version.to_s)
# Handle to the discovered 'arvados' API at the configured version. Its
# pipeline_instances, jobs and pipeline_templates resources are used for the
# requests made further down.
211 $arvados = $client.discovered_api('arvados', $arvados_api_version)
# Wrapper around the pipeline_instances API resource. Attribute changes are
# collected in @attributes_to_update and sent by the update request. Every
# request passes the API token from ENV explicitly together with
# :authenticated => false.
214 class PipelineInstance
# Fetch a pipeline instance via pipeline_instances.get (method header not
# visible; presumably this is the PipelineInstance.find used by the runner).
216 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
221 :api_token => ENV['ARVADOS_API_TOKEN']
223 :authenticated => false)
# A response that is not a Hash carrying :uuid is reported as a failure at
# verbosity 0.
224 j = JSON.parse result.body, :symbolize_names => true
225 unless j.is_a? Hash and j[:uuid]
226 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
229 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance from +attributes+. Aborts the program when the
# response is not a Hash carrying :uuid.
# NOTE(review): +attributes+ is passed as-is here, while the update request
# below and the job-creation request send .to_json -- confirm which form the
# API expects.
233 def self.create(attributes)
234 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
236 :api_token => ENV['ARVADOS_API_TOKEN'],
237 :pipeline_instance => attributes
239 :authenticated => false)
240 j = JSON.parse result.body, :symbolize_names => true
241 unless j.is_a? Hash and j[:uuid]
242 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
244 debuglog "Created pipeline instance: #{j[:uuid]}"
# Send the collected attribute changes (JSON-encoded) via
# pipeline_instances.update. A failed save is only logged at verbosity 0; it
# does not abort.
248 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
253 :api_token => ENV['ARVADOS_API_TOKEN'],
254 :pipeline_instance => @attributes_to_update.to_json
256 :authenticated => false)
257 j = JSON.parse result.body, :symbolize_names => true
258 unless j.is_a? Hash and j[:uuid]
259 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
# The set of pending changes is reset after the update request.
262 @attributes_to_update = {}
# Records an attribute change for the next update (presumably the body of
# the []= writer -- header not visible).
267 @attributes_to_update[x] = y
# Starts with no pending changes (presumably the constructor -- header not
# visible).
275 @attributes_to_update = {}
# Fetch a job record through jobs.get and store the parsed response (with
# symbolized keys) in @cache[uuid]. The method header is not visible here;
# the runner calls this as JobCache.get(uuid).
283 result = $client.execute(:api_method => $arvados.jobs.get,
285 :api_token => ENV['ARVADOS_API_TOKEN'],
288 :authenticated => false)
289 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
# List jobs matching +conditions+, which are sent JSON-encoded as the :where
# parameter of jobs.list. The parsed response is used only when its :items
# entry is an Array. The runner iterates over the return value with each.
291 def self.where(conditions)
292 result = $client.execute(:api_method => $arvados.jobs.list,
294 :api_token => ENV['ARVADOS_API_TOKEN'],
296 :where => conditions.to_json
298 :authenticated => false)
299 list = JSON.parse result.body, :symbolize_names => true
300 if list and list[:items].is_a? Array
# Submit a new job described by +attributes+ (sent JSON-encoded as :job).
# A response Hash carrying :uuid counts as success; otherwise the API errors
# are logged at verbosity 0.
# NOTE(review): the return values of both branches are not visible here.
306 def self.create(attributes)
308 result = $client.execute(:api_method => $arvados.jobs.create,
310 :api_token => ENV['ARVADOS_API_TOKEN'],
311 :job => attributes.to_json
313 :authenticated => false)
314 j = JSON.parse result.body, :symbolize_names => true
315 if j.is_a? Hash and j[:uuid]
318 debuglog "create job: #{j[:errors] rescue nil}", 0
# Drives one pipeline run: loads a template or an existing instance, applies
# command-line parameters to its components, and finds or submits a job for
# each component.
324 class WhRunPipelineInstance
# The pipeline instance being worked on; the main script reads its :uuid.
325 attr_reader :instance
# NOTE(review): the constructor body is not visible here. Later code reads
# @options, so it presumably stores _options there -- confirm.
327 def initialize(_options)
# Load the pipeline template into @template.
#
# +template+ is treated as a local file path when it contains any character
# other than lowercase letters, digits, or '-'. Otherwise it is looked up as
# a uuid through pipeline_templates.get. Aborts when a template file has no
# :components key, and with a "failed to retrieve" message for the API path
# (the condition guarding that abort is not visible here).
331 def fetch_template(template)
332 if template.match /[^-0-9a-z]/
333 # Doesn't look like a uuid -- use it as a filename.
334 @template = JSON.parse File.read(template), :symbolize_names => true
335 if !@template[:components]
336 abort ("#{$0}: Template loaded from #{template} " +
337 "does not have a \"components\" key")
# uuid case: fetch the template from the API.
340 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
342 :api_token => ENV['ARVADOS_API_TOKEN'],
345 :authenticated => false)
346 @template = JSON.parse result.body, :symbolize_names => true
348 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
# Load an existing pipeline instance by uuid. The instance is also used as
# the template, so apply_parameters reads its :components the same way it
# would read a template's.
354 def fetch_instance(instance_uuid)
355 @instance = PipelineInstance.find(instance_uuid)
356 @template = @instance
# Apply command-line parameter settings to the template's components and
# store the result in @components.
#
# +params_args+ is the list of arguments left over after option parsing; a
# leading '--' is dropped. Each setting is either name=value (optionally
# prefixed with --) or a name followed by a separate value argument. Aborts
# on an argument it cannot interpret, and aborts with a list of errors when
# required parameters are missing.
360 def apply_parameters(params_args)
361 params_args.shift if params_args[0] == '--'
363 while !params_args.empty?
364 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
365 params[re[2]] = re[3]
367 elsif params_args.size > 1
368 param = params_args.shift.sub /^--/, ''
369 params[param] = params_args.shift
371 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is a shallow copy, so the nested :script_parameters
# hashes modified below are still the template's own objects -- confirm that
# mutating the template is acceptable.
375 @components = @template[:components].dup
378 @components.each do |componentname, component|
379 component[:script_parameters].each do |parametername, parameter|
# A bare (non-Hash) parameter is wrapped as { :value => ... }.
# Sources visible below, in order: the component-qualified setting
# ("component::param"), then -- only for parameters that are not the output
# of another component -- the unqualified setting, then the parameter's
# :default. A :required value of false, 'false', 0 or '0' marks the
# parameter as optional.
380 parameter = { :value => parameter } unless parameter.is_a? Hash
382 (params["#{componentname}::#{parametername}"] ||
384 (parameter[:output_of].nil? &&
385 (params[parametername.to_s] ||
386 parameter[:default])) ||
389 ![false,'false',0,'0'].index parameter[:required]
390 if parameter[:output_of]
393 errors << [componentname, parametername, "required parameter is missing"]
395 debuglog "parameter #{componentname}::#{parametername} == #{value}"
396 component[:script_parameters][parametername] = value
# All collected errors are reported together in a single abort message.
400 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
402 debuglog "options=" + @options.pretty_inspect
# Create a pipeline instance from the resolved components and the template's
# uuid, unless an instance is already present (||=). The method header is not
# visible here; the main script calls runner.setup_instance.
407 @instance ||= PipelineInstance.
408 create(:components => @components,
409 :pipeline_template_uuid => @template[:uuid],
# For each component whose script parameters are all resolved (no Hash-valued
# parameter remains), look for an existing job that can be reused; failing
# that, submit a new job unless --dry-run is set.
418 @components.each do |cname, c|
421 c[:script_parameters].select { |pname, p| p.is_a? Hash }.empty?
422 # Job is fully specified (all parameter values are present) but
423 # no particular job has been found.
425 debuglog "component #{cname} ready to satisfy."
428 second_place_job = nil # satisfies component, but not finished yet
# --no-reuse skips the lookup entirely by using an empty candidate list.
430 (@options[:no_reuse] ? [] : JobCache.
431 where(script: c[:script],
432 script_parameters: c[:script_parameters],
433 script_version_descends_from: c[:script_version])
434 ).each do |candidate_job|
# Parameters are compared with their keys downcased on both sides.
435 candidate_params_downcase = Hash[candidate_job[:script_parameters].
436 map { |k,v| [k.downcase,v] }]
437 c_params_downcase = Hash[c[:script_parameters].
438 map { |k,v| [k.downcase,v] }]
440 debuglog "component #{cname} considering job #{candidate_job[:uuid]} version #{candidate_job[:script_version]} parameters #{candidate_params_downcase.inspect}", 3
442 unless candidate_params_downcase == c_params_downcase
# The candidate's script_version must begin with the component's
# script_version (prefix comparison).
# NOTE(review): c[:script_version].length raises when the component has no
# :script_version, although job creation below tolerates that case by
# defaulting to 'master' -- confirm.
446 if c[:script_version] !=
447 candidate_job[:script_version][0,c[:script_version].length]
448 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if script_version matched.", 2
# Acceptable candidates are successful, running, or queued (neither started
# nor cancelled).
452 unless candidate_job[:success] || candidate_job[:running] ||
453 (!candidate_job[:started_at] && !candidate_job[:cancelled_at])
454 debuglog "component #{cname} would be satisfied by job #{candidate_job[:uuid]} if it were running or successful.", 2
# A successful job is used unless --no-reuse-finished is set. The first other
# acceptable candidate is remembered as a fallback (||= keeps the first).
458 if candidate_job[:success]
459 unless @options[:no_reuse_finished]
461 $stderr.puts "using #{job[:uuid]} (finished at #{job[:finished_at]}) for component #{cname}"
465 second_place_job ||= candidate_job
# No job chosen yet: fall back to the remembered unfinished job.
469 if not c[:job] and second_place_job
470 job = second_place_job
471 $stderr.puts "using #{job[:uuid]} (running since #{job[:started_at]}) for component #{cname}"
# Nothing reusable: submit a new job unless this is a dry run. Runtime
# constraints default to {} and script_version to 'master'.
475 debuglog "component #{cname} not satisfied by any existing job."
476 if !@options[:dry_run]
477 debuglog "component #{cname} new job."
478 job = JobCache.create(:script => c[:script],
479 :script_parameters => c[:script_parameters],
480 :runtime_constraints => c[:runtime_constraints] || {},
481 :script_version => c[:script_version] || 'master')
483 debuglog "component #{cname} new job #{job[:uuid]}"
486 debuglog "component #{cname} new job failed"
# Refresh the record of a job that is running, or that has neither finished
# nor been cancelled, by fetching it again through JobCache.get.
493 if c[:job] and c[:job][:uuid]
494 if (c[:job][:running] or
495 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
496 c[:job] = JobCache.get(c[:job][:uuid])
499 # Populate script_parameters of other components waiting for this job's output.
# A parameter is waiting on this component when it is still a Hash whose
# :output_of equals this component's name; it is replaced by the job's :output.
501 @components.each do |c2name, c2|
502 c2[:script_parameters].each do |pname, p|
503 if p.is_a? Hash and p[:output_of] == cname.to_s
504 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
505 c2[:script_parameters][pname] = c[:job][:output]
509 elsif c[:job][:running] ||
510 (!c[:job][:started_at] && !c[:job][:cancelled_at])
512 elsif c[:job][:cancelled_at]
513 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
# Record the updated components on the instance; :active mirrors the
# moretodo flag (where that flag is set is not visible here).
517 @instance[:components] = @components
518 @instance[:active] = moretodo
# --no-wait: per the option help, do not wait for jobs to finish.
521 if @options[:no_wait]
# Logged at verbosity 0 -- presumably from an Interrupt handler (the rescue
# clause is not visible here; confirm).
529 debuglog "interrupt", 0
# Final tally over all components, looking at each job's :finished_at and
# :success. `ended` and `succeeded` are presumably counters incremented in
# this loop (the increments are not visible here; confirm).
538 @components.each do |cname, c|
540 if c[:job][:finished_at]
542 if c[:job][:success] == true
# Once every component has ended, the instance is marked inactive; it counts
# as successful only when every component's job succeeded.
549 if ended == @components.length
550 @instance[:active] = false
551 @instance[:success] = (succeeded == @components.length)
# Marks the instance inactive again in a separate place -- context not
# visible here (possibly a cleanup path; confirm).
559 @instance[:active] = false
# Write the status reports. Each report is skipped when its configured path
# is '/dev/null'; otherwise the path is opened for writing ('w').
573 if @options[:status_json] != '/dev/null'
574 File.open(@options[:status_json], 'w') do |f|
# NOTE(review): the --status-json help text promises JSON, but this writes
# Ruby's pretty_inspect output, which is not JSON -- confirm whether JSON
# generation was intended.
575 f.puts @components.pretty_inspect
579 if @options[:status_text] != '/dev/null'
580 File.open(@options[:status_text], 'w') do |f|
582 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Width of the name column: the longest component name.
583 namewidth = @components.collect { |cname, c| cname.size }.max
584 @components.each do |cname, c|
# Status text precedence when a job exists: running (shows tasks_summary),
# success, cancelled, finished without success ("failed"), started, and
# otherwise queued.
585 jstatus = if !c[:job]
587 elsif c[:job][:running]
588 "#{c[:job][:tasks_summary].inspect}"
589 elsif c[:job][:success]
591 elsif c[:job][:cancelled_at]
592 "cancelled #{c[:job][:cancelled_at]}"
593 elsif c[:job][:finished_at]
594 "failed #{c[:job][:finished_at]}"
595 elsif c[:job][:started_at]
596 "started #{c[:job][:started_at]}"
598 "queued #{c[:job][:created_at]}"
# A '-' padded to 27 columns stands in for the uuid when there is no job.
600 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Entry point: build the runner, load either a template or an existing
# instance, apply the leftover command-line arguments as pipeline parameters,
# and make sure a pipeline instance exists.
607 runner = WhRunPipelineInstance.new($options)
609 if $options[:template]
610 runner.fetch_template($options[:template])
612 runner.fetch_instance($options[:instance])
614 runner.apply_parameters(p.leftovers)
615 runner.setup_instance
# Prints the instance uuid on stdout (presumably the --submit path; the
# guarding condition is not visible here).
618 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also catches SystemExit and Interrupt. The
# handler body is not visible here -- confirm that this breadth is needed
# (e.g. for cleanup) before narrowing it.
622 rescue Exception => e