5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
81 require 'google/api_client'
85 #{$0}: fatal: #{l.message}
86 Some runtime dependencies may be missing.
87 Try: gem install pp google-api-client json trollop
91 def debuglog(message, verbosity=1)
92 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
97 original_verbosity = $VERBOSE
100 $VERBOSE = original_verbosity
105 if $arvados_api_host.match /local/
106 # You probably don't care about SSL certificate checks if you're
107 # testing with a dev server.
108 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
111 class Google::APIClient
112 def discovery_document(api, version)
114 return @discovery_documents["#{api}:#{version}"] ||=
116 response = self.execute!(
117 :http_method => :get,
118 :uri => self.discovery_uri(api, version),
119 :authenticated => false
121 response.body.class == String ? JSON.parse(response.body) : response.body
127 # Parse command line options (the kind that control the behavior of
128 # this program, that is, not the pipeline component parameters).
130 p = Trollop::Parser.new do
133 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
137 "Store plain text status in given file.",
140 :default => '/dev/stdout')
142 "Store json-formatted pipeline in given file.",
145 :default => '/dev/null')
147 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
151 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
155 "Print extra debugging information on stderr.",
158 "Set debug verbosity level.",
162 "UUID of pipeline template, or path to local pipeline template file.",
166 "UUID of pipeline instance.",
170 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
174 "Manage the pipeline in process.",
179 $options = Trollop::with_standard_exception_handling p do
182 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
184 if $options[:instance]
185 if $options[:template] or $options[:submit]
186 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
188 elsif not $options[:template]
189 abort "#{$0}: syntax error: you must supply a --template or --instance."
192 if $options[:run_here] == $options[:submit]
193 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
196 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
199 def suppress_warnings
200 original_verbosity = $VERBOSE
203 $VERBOSE = original_verbosity
208 if ENV['ARVADOS_API_HOST_INSECURE']
209 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
212 # Set up the API client.
214 $client ||= Google::APIClient.
215 new(:host => $arvados_api_host,
216 :application_name => File.split($0).last,
217 :application_version => $application_version.to_s)
218 $arvados = $client.discovered_api('arvados', $arvados_api_version)
221 class PipelineInstance
223 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
228 :api_token => ENV['ARVADOS_API_TOKEN']
230 :authenticated => false)
231 j = JSON.parse result.body, :symbolize_names => true
232 unless j.is_a? Hash and j[:uuid]
233 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
236 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
240 def self.create(attributes)
241 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
243 :api_token => ENV['ARVADOS_API_TOKEN'],
244 :pipeline_instance => attributes
246 :authenticated => false)
247 j = JSON.parse result.body, :symbolize_names => true
248 unless j.is_a? Hash and j[:uuid]
249 abort "Failed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
251 debuglog "Created pipeline instance: #{j[:uuid]}"
255 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
260 :api_token => ENV['ARVADOS_API_TOKEN'],
261 :pipeline_instance => @attributes_to_update.to_json
263 :authenticated => false)
264 j = JSON.parse result.body, :symbolize_names => true
265 unless j.is_a? Hash and j[:uuid]
266 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
269 @attributes_to_update = {}
274 @attributes_to_update[x] = y
282 @attributes_to_update = {}
290 result = $client.execute(:api_method => $arvados.jobs.get,
292 :api_token => ENV['ARVADOS_API_TOKEN'],
295 :authenticated => false)
296 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
298 def self.where(conditions)
299 result = $client.execute(:api_method => $arvados.jobs.list,
301 :api_token => ENV['ARVADOS_API_TOKEN'],
303 :where => conditions.to_json
305 :authenticated => false)
306 list = JSON.parse result.body, :symbolize_names => true
307 if list and list[:items].is_a? Array
313 def self.create(attributes)
315 result = $client.execute(:api_method => $arvados.jobs.create,
317 :api_token => ENV['ARVADOS_API_TOKEN'],
318 :job => attributes.to_json
320 :authenticated => false)
321 j = JSON.parse result.body, :symbolize_names => true
322 if j.is_a? Hash and j[:uuid]
325 debuglog "create job: #{j[:errors] rescue nil} with attribute #{attributes}", 0
331 class WhRunPipelineInstance
332 attr_reader :instance
334 def initialize(_options)
338 def fetch_template(template)
339 if template.match /[^-0-9a-z]/
340 # Doesn't look like a uuid -- use it as a filename.
341 @template = JSON.parse File.read(template), :symbolize_names => true
342 if !@template[:components]
343 abort ("#{$0}: Template loaded from #{template} " +
344 "does not have a \"components\" key")
347 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
349 :api_token => ENV['ARVADOS_API_TOKEN'],
352 :authenticated => false)
353 @template = JSON.parse result.body, :symbolize_names => true
355 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
361 def fetch_instance(instance_uuid)
362 @instance = PipelineInstance.find(instance_uuid)
363 @template = @instance
367 def apply_parameters(params_args)
368 params_args.shift if params_args[0] == '--'
370 while !params_args.empty?
371 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
372 params[re[2]] = re[3]
374 elsif params_args.size > 1
375 param = params_args.shift.sub /^--/, ''
376 params[param] = params_args.shift
378 abort "Syntax error: I do not know what to do with arg \"#{params_args[0]}\""
382 @components = @template[:components].dup
385 @components.each do |componentname, component|
386 component[:script_parameters].each do |parametername, parameter|
387 parameter = { :value => parameter } unless parameter.is_a? Hash
389 (params["#{componentname}::#{parametername}"] ||
391 (parameter[:output_of].nil? &&
392 (params[parametername.to_s] ||
393 parameter[:default])) ||
396 ![false,'false',0,'0'].index parameter[:required]
397 if parameter[:output_of]
400 errors << [componentname, parametername, "required parameter is missing"]
402 debuglog "parameter #{componentname}::#{parametername} == #{value}"
403 component[:script_parameters][parametername] = value
407 abort "Errors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
409 debuglog "options=" + @options.pretty_inspect
414 @instance ||= PipelineInstance.
415 create(:components => @components,
416 :pipeline_template_uuid => @template[:uuid],
425 @components.each do |cname, c|
429 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
430 # No job yet associated with this component and its component inputs
431 # are fully specified (any output_of script_parameters are resolved
433 job = JobCache.create({:script => c[:script],
434 :script_parameters => c[:script_parameters],
435 :script_version => c[:script_version],
436 :minimum_script_version => c[:minimum_script_version],
437 :exclude_script_versions => c[:exclude_minimum_script_versions],
438 :nondeterministic => c[:nondeterministic],
439 :no_reuse => @options[:no_reuse]})
441 debuglog "component #{cname} new job #{job[:uuid]}"
444 debuglog "component #{cname} new job failed"
448 if c[:job] and c[:job][:uuid]
449 if (c[:job][:running] or
450 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
451 # Job is running, or has not yet finished or been cancelled, so update our copy of the job record
452 c[:job] = JobCache.get(c[:job][:uuid])
456 # Populate script_parameters of other components waiting for
458 @components.each do |c2name, c2|
459 c2[:script_parameters].each do |pname, p|
460 if p.is_a? Hash and p[:output_of] == cname.to_s
461 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
462 c2[:script_parameters][pname] = c[:job][:output]
467 elsif c[:job][:running] ||
468 (!c[:job][:started_at] && !c[:job][:cancelled_at])
469 # Job is still running, or has not started yet and has not been cancelled
471 elsif c[:job][:cancelled_at]
472 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
476 @instance[:components] = @components
477 @instance[:active] = moretodo
480 if @options[:no_wait]
488 debuglog "interrupt", 0
497 @components.each do |cname, c|
499 if c[:job][:finished_at]
501 if c[:job][:success] == true
504 if c[:job][:success] == false
511 if ended == @components.length or failed > 0
512 @instance[:active] = false
513 @instance[:success] = (succeeded == @components.length)
521 @instance[:active] = false
535 if @options[:status_json] != '/dev/null'
536 File.open(@options[:status_json], 'w') do |f|
537 f.puts @components.pretty_inspect
541 if @options[:status_text] != '/dev/null'
542 File.open(@options[:status_text], 'w') do |f|
544 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
545 namewidth = @components.collect { |cname, c| cname.size }.max
546 @components.each do |cname, c|
547 jstatus = if !c[:job]
549 elsif c[:job][:running]
550 "#{c[:job][:tasks_summary].inspect}"
551 elsif c[:job][:success]
553 elsif c[:job][:cancelled_at]
554 "cancelled #{c[:job][:cancelled_at]}"
555 elsif c[:job][:finished_at]
556 "failed #{c[:job][:finished_at]}"
557 elsif c[:job][:started_at]
558 "started #{c[:job][:started_at]}"
560 "queued #{c[:job][:created_at]}"
562 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
569 runner = WhRunPipelineInstance.new($options)
571 if $options[:template]
572 runner.fetch_template($options[:template])
574 runner.fetch_instance($options[:instance])
576 runner.apply_parameters(p.leftovers)
577 runner.setup_instance
580 puts runner.instance[:uuid]
584 rescue Exception => e