5 # arv-run-pipeline-instance --template pipeline-template-uuid [options] [--] [parameters]
6 # arv-run-pipeline-instance --instance pipeline-instance-uuid [options]
8 # Satisfy a pipeline template by finding or submitting a mapreduce job
9 # for each pipeline component.
13 # [--template uuid] Use the specified pipeline template.
15 # [--template path] Load the pipeline template from the specified
18 # [--instance uuid] Use the specified pipeline instance.
20 # [-n, --dry-run] Do not start any new jobs or wait for existing jobs
21 # to finish. Just find out whether jobs are finished,
22 # queued, or running for each component
24 # [--submit] Do not try to satisfy any components. Just
25 # create an instance, print its UUID to
28 # [--no-wait] Make only as much progress as possible without entering
31 # [--no-reuse] Do not reuse existing jobs to satisfy pipeline
32 # components. Submit a new job for every component.
34 # [--debug] Print extra debugging information on stderr.
36 # [--debug-level N] Increase amount of debugging information. Default
37 # 1, possible range 0..3.
39 # [--status-text path] Print plain text status report to a file or
40 # fifo. Default: /dev/stdout
42 # [--status-json path] Print JSON status report to a file or
43 # fifo. Default: /dev/null
47 # [param_name=param_value]
49 # [param_name param_value] Set (or override) the default value for
50 # every parameter with the given name.
52 # [component_name::param_name=param_value]
53 # [component_name::param_name param_value]
54 # [--component_name::param_name=param_value]
55 # [--component_name::param_name param_value] Set the value of a
56 # parameter for a single
59 class WhRunPipelineInstance
62 $application_version = 1.0
64 if RUBY_VERSION < '1.9.3' then
66 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
# Connection settings come from the environment. The API version falls
# back to 'v1'; the host and token have no fallback.
70 $arvados_api_version = ENV['ARVADOS_API_VERSION'] || 'v1'
# `or` binds more loosely than `=`, so each global is assigned first and
# `abort` only runs when the environment variable is unset (nil).
71 $arvados_api_host = ENV['ARVADOS_API_HOST'] or
72 abort "#{$0}: fatal: ARVADOS_API_HOST environment variable not set."
73 $arvados_api_token = ENV['ARVADOS_API_TOKEN'] or
74 abort "#{$0}: fatal: ARVADOS_API_TOKEN environment variable not set."
82 require 'google/api_client'
86 #{$0}: fatal: #{l.message}
87 Some runtime dependencies may be missing.
88 Try: gem install arvados pp google-api-client json trollop
# Print a diagnostic line on stderr, prefixed with the program's base
# name and process id, when $debuglevel is at least +verbosity+.
#
# message   - text to print.
# verbosity - minimum $debuglevel at which the message is shown
#             (default 1).
92 def debuglog(message, verbosity=1)
93 $stderr.puts "#{File.split($0).last} #{$$}: #{message}" if $debuglevel >= verbosity
98 original_verbosity = $VERBOSE
101 $VERBOSE = original_verbosity
# NOTE(review): /local/ is unanchored, so this branch is taken for any
# API host whose name contains "local" anywhere, not only "localhost".
106 if $arvados_api_host.match /local/
107 # You probably don't care about SSL certificate checks if you're
108 # testing with a dev server.
# Reassigns the VERIFY_PEER constant itself, so every later reference to
# OpenSSL::SSL::VERIFY_PEER in this process evaluates to VERIFY_NONE.
109 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
# Reopens Google::APIClient to define discovery_document in this script.
112 class Google::APIClient
# Return the discovery document for +api+ at +version+.
#
# The result is memoized in @discovery_documents under the key
# "api:version", so the request below is only issued when no truthy
# value is cached for that key.
113 def discovery_document(api, version)
115 return @discovery_documents["#{api}:#{version}"] ||=
# Fetched with an unauthenticated GET against the client's discovery URI.
117 response = self.execute!(
118 :http_method => :get,
119 :uri => self.discovery_uri(api, version),
120 :authenticated => false
# A String body is parsed as JSON; any other body is returned unchanged.
122 response.body.class == String ? JSON.parse(response.body) : response.body
128 # Parse command line options (the kind that control the behavior of
129 # this program, that is, not the pipeline component parameters).
131 p = Trollop::Parser.new do
134 "Do not start any new jobs or wait for existing jobs to finish. Just find out whether jobs are finished, queued, or running for each component.",
138 "Store plain text status in given file.",
141 :default => '/dev/stdout')
143 "Store json-formatted pipeline in given file.",
146 :default => '/dev/null')
148 "Do not wait for jobs to finish. Just look up status, submit new jobs if needed, and exit.",
152 "Do not reuse existing jobs to satisfy pipeline components. Submit a new job for every component.",
156 "Print extra debugging information on stderr.",
159 "Set debug verbosity level.",
163 "UUID of pipeline template, or path to local pipeline template file.",
167 "UUID of pipeline instance.",
171 "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
175 "Manage the pipeline in process.",
180 $options = Trollop::with_standard_exception_handling p do
# Effective debug level: a non-nil --debug-level value wins (0 is truthy
# in Ruby, so an explicit 0 is honored); otherwise 1 when --debug is
# set; otherwise 0.
183 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
# --instance is mutually exclusive with --template and --submit, and one
# of --instance / --template must be present.
185 if $options[:instance]
186 if $options[:template] or $options[:submit]
187 abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
189 elsif not $options[:template]
# NOTE(review): this message goes to stdout via puts, whereas the
# neighbouring errors use abort (stderr). What follows it is not visible
# here -- confirm the script still exits non-zero.
190 puts "error: you must supply a --template or --instance."
# Rejects the case where the two options compare equal; assuming both are
# boolean flags, that means both given or both omitted.
195 if $options[:run_here] == $options[:submit]
196 abort "#{$0}: syntax error: you must supply either --run-here or --submit."
199 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
202 def suppress_warnings
203 original_verbosity = $VERBOSE
206 $VERBOSE = original_verbosity
211 if ENV['ARVADOS_API_HOST_INSECURE']
212 suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
215 # Set up the API client.
217 $client ||= Google::APIClient.
218 new(:host => $arvados_api_host,
219 :application_name => File.split($0).last,
220 :application_version => $application_version.to_s)
221 $arvados = $client.discovered_api('arvados', $arvados_api_version)
222 $arv = Arvados.new api_version: 'v1'
225 class PipelineInstance
227 result = $client.execute(:api_method => $arvados.pipeline_instances.get,
231 :authenticated => false,
233 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
235 j = JSON.parse result.body, :symbolize_names => true
236 unless j.is_a? Hash and j[:uuid]
237 debuglog "Failed to get pipeline_instance: #{j[:errors] rescue nil}", 0
240 debuglog "Retrieved pipeline_instance #{j[:uuid]}"
# Create a pipeline instance through the API.
#
# attributes - Hash of pipeline instance attributes; sent JSON-encoded
#              as the :pipeline_instance request parameter.
#
# The request is marked :authenticated => false and carries an explicit
# "OAuth2 <token>" authorization header built from ARVADOS_API_TOKEN.
# Aborts the program when the parsed response is not a Hash with a :uuid.
244 def self.create(attributes)
245 result = $client.execute(:api_method => $arvados.pipeline_instances.create,
247 :pipeline_instance => attributes.to_json
249 :authenticated => false,
251 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
253 j = JSON.parse result.body, :symbolize_names => true
254 unless j.is_a? Hash and j[:uuid]
# NOTE(review): this is a class method, so @template here is an instance
# variable of the PipelineInstance class object. No assignment to it is
# visible in this class; if it is nil, @template[:uuid] raises
# NoMethodError while building the message, masking the real API error.
# Confirm, and consider reporting attributes[:pipeline_template_uuid].
255 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nFailed to create pipeline_instance: #{j[:errors] rescue nil} #{j.inspect}"
257 debuglog "Created pipeline instance: #{j[:uuid]}"
261 result = $client.execute(:api_method => $arvados.pipeline_instances.update,
266 :pipeline_instance => @attributes_to_update.to_json
268 :authenticated => false,
270 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
272 j = JSON.parse result.body, :symbolize_names => true
273 unless j.is_a? Hash and j[:uuid]
274 debuglog "Failed to save pipeline_instance: #{j[:errors] rescue nil}", 0
277 @attributes_to_update = {}
282 @attributes_to_update[x] = y
290 @attributes_to_update = {}
298 result = $client.execute(:api_method => $arvados.jobs.get,
302 :authenticated => false,
304 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
306 @cache[uuid] = JSON.parse result.body, :symbolize_names => true
308 def self.where(conditions)
309 result = $client.execute(:api_method => $arvados.jobs.list,
312 :where => conditions.to_json
314 :authenticated => false,
316 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
318 list = JSON.parse result.body, :symbolize_names => true
319 if list and list[:items].is_a? Array
# Submit a job for one pipeline component.
#
# pipeline      - pipeline instance record; its :uuid and :owner_uuid are
#                 used when recording a failure.
# component     - component name, used in error messages.
# job           - Hash of job attributes.
# create_params - Hash of extra request parameters merged next to :job.
#
# The branch taken on success is not fully visible in this excerpt.
325 def self.create(pipeline, component, job, create_params)
# nil-valued entries are removed from both hashes before they are sent.
328 body = {job: no_nil_values(job)}.merge(no_nil_values(create_params))
330 result = $client.execute(:api_method => $arvados.jobs.create,
332 :authenticated => false,
334 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
336 j = JSON.parse result.body, :symbolize_names => true
337 if j.is_a? Hash and j[:uuid]
# Verbosity 0: printed whenever $debuglevel >= 0.
340 debuglog "create job: #{j[:errors] rescue nil} with attributes #{body}", 0
# NOTE(review): unlike the `rescue nil` used just above, no guard is
# visible here; if j is not a Hash or has no :errors entry this raises.
343 j[:errors].each do |err|
344 msg += "Error creating job for component #{component}: #{err}\n"
346 msg += "Job submission was: #{body.to_json}"
# Record the failure text as a log entry of event type 'stderr' attached
# to the pipeline instance.
348 $client.execute(:api_method => $arvados.logs.create,
351 :object_uuid => pipeline[:uuid],
352 :event_type => 'stderr',
353 :owner_uuid => pipeline[:owner_uuid],
354 :properties => {"text" => msg}
357 :authenticated => false,
359 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
# Return a new hash containing only the entries of +hash+ whose value is
# not nil. The argument itself is left unmodified (Hash#reject).
367 def self.no_nil_values(hash)
368 hash.reject { |key, value| value.nil? }
372 class WhRunPipelineInstance
373 attr_reader :instance
375 def initialize(_options)
# Load the pipeline template into @template.
#
# template - either a path to a local JSON template file or a template
#            UUID. The choice is made purely on the characters in the
#            string: anything containing a character other than "-",
#            digits or lowercase a-z is treated as a file path.
#
# Aborts when a file-based template has no :components key, or (per the
# message below) when the template cannot be retrieved from the API.
379 def fetch_template(template)
380 if template.match /[^-0-9a-z]/
381 # Doesn't look like a uuid -- use it as a filename.
382 @template = JSON.parse File.read(template), :symbolize_names => true
383 if !@template[:components]
384 abort ("#{$0}: Template loaded from #{template} " +
385 "does not have a \"components\" key")
# UUID path: fetch the template record from the API server.
388 result = $client.execute(:api_method => $arvados.pipeline_templates.get,
392 :authenticated => false,
394 authorization: 'OAuth2 '+ENV['ARVADOS_API_TOKEN']
396 @template = JSON.parse result.body, :symbolize_names => true
398 abort "#{$0}: fatal: failed to retrieve pipeline template #{template} #{@template[:errors].inspect rescue nil}"
404 def fetch_instance(instance_uuid)
405 @instance = PipelineInstance.find(instance_uuid)
406 @template = @instance
# Parse component parameters from the command line and resolve a value
# for every script parameter of every component in @template.
#
# params_args - Array of leftover command-line words. It is consumed
#               (shifted) as it is parsed, so the caller's array is
#               modified.
#
# Aborts on an argument that cannot be parsed and, per the message
# below, when parameter errors were collected.
410 def apply_parameters(params_args)
411 params_args.shift if params_args[0] == '--'
413 while !params_args.empty?
# Form 1: "[--]name=value". The name may not start with "-" and the
# value must be non-empty (.+), so "name=" does not match this form.
414 if (re = params_args[0].match /^(--)?([^-].*?)=(.+)/)
415 params[re[2]] = re[3]
# Form 2: "[--]name value" as two separate words.
417 elsif params_args.size > 1
418 param = params_args.shift.sub /^--/, ''
419 params[param] = params_args.shift
421 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nSyntax error: I do not know what to do with arg \"#{params_args[0]}\""
# NOTE(review): dup is a shallow copy, so the nested component and
# script_parameters hashes are shared with @template; the assignment
# further down therefore also changes @template's data.
425 @components = @template[:components].dup
428 @components.each do |componentname, component|
429 component[:script_parameters].each do |parametername, parameter|
# Bare (non-Hash) parameter specs are wrapped as { :value => spec }.
430 parameter = { :value => parameter } unless parameter.is_a? Hash
# Visible lookup order: a "component::name" argument first; further
# down, only for parameters without :output_of, a plain "name" argument
# and then the spec's :default. Some alternatives sit on lines not
# shown in this excerpt.
432 (params["#{componentname}::#{parametername}"] ||
434 (parameter[:output_of].nil? &&
435 (params[parametername.to_s] ||
436 parameter[:default])) ||
# True unless :required is one of false, 'false', 0 or '0'; a missing
# (nil) :required therefore counts as required.
439 ![false,'false',0,'0'].index parameter[:required]
440 if parameter[:output_of]
443 errors << [componentname, parametername, "required parameter is missing"]
445 debuglog "parameter #{componentname}::#{parametername} == #{value}"
446 component[:script_parameters][parametername] = value
450 abort "\n#{Time.now} -- pipeline_template #{@template[:uuid]}\nErrors:\n#{errors.collect { |c,p,e| "#{c}::#{p} - #{e}\n" }.join ""}"
452 debuglog "options=" + @options.pretty_inspect
458 @instance[:properties][:run_options] ||= {}
459 if @options[:no_reuse]
460 # override properties of existing instance
461 @instance[:properties][:run_options][:enable_job_reuse] = false
463 # Default to "enable reuse" if not specified. (This code path
464 # can go away when old clients go away.)
465 if @instance[:properties][:run_options][:enable_job_reuse].nil?
466 @instance[:properties][:run_options][:enable_job_reuse] = true
470 @instance = PipelineInstance.
471 create(components: @components,
474 enable_job_reuse: !@options[:no_reuse]
477 pipeline_template_uuid: @template[:uuid],
478 state: ($options[:submit] ? 'RunningOnServer' : 'RunningOnClient'))
487 job_creation_failed = 0
490 @components.each do |cname, c|
492 owner_uuid = @instance[:owner_uuid]
493 # Is the job satisfying this component already known to be
494 # finished? (Already meaning "before we query API server about
495 # the job's current state")
496 c_already_finished = (c[:job] &&
498 !c[:job][:success].nil?)
500 c[:script_parameters].select { |pname, p| p.is_a? Hash and p[:output_of]}.empty?
501 # No job yet associated with this component and the component's inputs
502 # are fully specified (any output_of script_parameters are resolved
504 job = JobCache.create(@instance, cname, {
505 :script => c[:script],
506 :script_parameters => c[:script_parameters],
507 :script_version => c[:script_version],
508 :repository => c[:repository],
509 :nondeterministic => c[:nondeterministic],
510 :output_is_persistent => c[:output_is_persistent] || false,
511 :runtime_constraints => c[:runtime_constraints],
512 :owner_uuid => owner_uuid,
514 # This is the right place to put these attributes when
515 # dealing with new API servers.
516 :minimum_script_version => c[:minimum_script_version],
517 :exclude_script_versions => c[:exclude_minimum_script_versions],
518 :find_or_create => (@instance[:properties][:run_options].andand[:enable_job_reuse] &&
519 !c[:nondeterministic]),
520 :filters => c[:filters]
523 debuglog "component #{cname} new job #{job[:uuid]}"
526 debuglog "component #{cname} new job failed", 0
527 job_creation_failed += 1
531 if c[:job] and c[:job][:uuid]
532 if (c[:job][:running] or
533 not (c[:job][:finished_at] or c[:job][:cancelled_at]))
534 # Job is running so update copy of job record
535 c[:job] = JobCache.get(c[:job][:uuid])
539 # Populate script_parameters of other components waiting for
541 @components.each do |c2name, c2|
542 c2[:script_parameters].each do |pname, p|
543 if p.is_a? Hash and p[:output_of] == cname.to_s
544 debuglog "parameter #{c2name}::#{pname} == #{c[:job][:output]}"
545 c2[:script_parameters][pname] = c[:job][:output]
550 unless c_already_finished
551 # This is my first time discovering that the job
552 # succeeded. (At the top of this loop, I was still
553 # waiting for it to finish.)
554 if c[:output_is_persistent]
555 # I need to make sure a resources/wants link is in
556 # place to protect the output from garbage
557 # collection. (Normally Crunch does this for me, but
558 # here I might be reusing the output of someone else's
559 # job and I need to make sure it's understood that the
560 # output is valuable to me, too.)
561 wanted = c[:job][:output]
562 debuglog "checking for existing persistence link for #{wanted}"
563 @my_user_uuid ||= $arv.user.current[:uuid]
564 links = $arv.link.list(limit: 1,
566 [%w(link_class = resources),
568 %w(tail_uuid =) + [@my_user_uuid],
569 %w(head_uuid =) + [wanted]
572 debuglog "link already exists, uuid #{links.first[:uuid]}"
574 newlink = $arv.link.create link: \
576 link_class: 'resources',
578 tail_kind: 'arvados#user',
579 tail_uuid: @my_user_uuid,
580 head_kind: 'arvados#collection',
582 owner_uuid: owner_uuid
584 debuglog "added link, uuid #{newlink[:uuid]}"
588 elsif c[:job][:running] ||
589 (!c[:job][:started_at] && !c[:job][:cancelled_at])
590 # Job is still running
592 elsif c[:job][:cancelled_at]
593 debuglog "component #{cname} job #{c[:job][:uuid]} cancelled."
597 @instance[:components] = @components
600 if @options[:no_wait]
604 # If job creation fails, just give up on this pipeline instance.
605 if job_creation_failed > 0
613 debuglog "interrupt", 0
623 @components.each do |cname, c|
625 if c[:job][:finished_at] or c[:job][:cancelled_at] or (c[:job][:running] == false and c[:job][:success] == false)
627 if c[:job][:success] == true
629 elsif c[:job][:success] == false or c[:job][:cancelled_at]
636 success = (succeeded == @components.length)
638 # A job create call failed. Just give up.
639 if job_creation_failed > 0
640 debuglog "job creation failed - giving up on this pipeline instance", 0
647 @instance[:state] = 'Complete'
649 @instance[:state] = 'Paused'
652 if ended == @components.length or failed > 0
653 @instance[:state] = success ? 'Complete' : 'Failed'
657 debuglog "pipeline instance state is #{@instance[:state]}"
659 # set components_summary
660 components_summary = {"todo" => @components.length - ended, "done" => succeeded, "failed" => failed}
661 @instance[:components_summary] = components_summary
667 if @instance and @instance[:state] == 'RunningOnClient'
668 @instance[:state] = 'Paused'
# Write the status reports. Each report is skipped when its target path
# is exactly '/dev/null'.
682 if @options[:status_json] != '/dev/null'
683 File.open(@options[:status_json], 'w') do |f|
# NOTE(review): pretty_inspect emits Ruby inspect syntax, not JSON, even
# though the option is named status_json -- confirm whether consumers
# expect real JSON here.
684 f.puts @components.pretty_inspect
688 if @options[:status_text] != '/dev/null'
689 File.open(@options[:status_text], 'w') do |f|
691 f.puts "#{Time.now} -- pipeline_instance #{@instance[:uuid]}"
# Width of the longest component name, used to align the name column.
692 namewidth = @components.collect { |cname, c| cname.size }.max
693 @components.each do |cname, c|
# Status text is chosen by the first matching condition, in this order:
# no job, running, success, cancelled, finished (reported as "failed"
# because success was already ruled out), started, otherwise queued.
694 jstatus = if !c[:job]
696 elsif c[:job][:running]
697 "#{c[:job][:tasks_summary].inspect}"
698 elsif c[:job][:success]
700 elsif c[:job][:cancelled_at]
701 "cancelled #{c[:job][:cancelled_at]}"
702 elsif c[:job][:finished_at]
703 "failed #{c[:job][:finished_at]}"
704 elsif c[:job][:started_at]
705 "started #{c[:job][:started_at]}"
707 "queued #{c[:job][:created_at]}"
# One line per component: padded name, job uuid (or "-" padded to 27
# characters when there is no job), then the status text.
709 f.puts "#{cname.to_s.ljust namewidth} #{c[:job] ? c[:job][:uuid] : '-'.ljust(27)} #{jstatus}"
# Script entry point: build the runner, load either a template or an
# existing instance, apply command-line parameters, set up the instance
# and print its UUID on stdout.
716 runner = WhRunPipelineInstance.new($options)
718 if $options[:template]
719 runner.fetch_template($options[:template])
721 runner.fetch_instance($options[:instance])
# p.leftovers: the command-line words the option parser did not consume.
723 runner.apply_parameters(p.leftovers)
724 runner.setup_instance
727 puts runner.instance[:uuid]
# NOTE(review): rescuing Exception also catches SystemExit (raised by
# abort/exit) and Interrupt. The handler body is not visible here --
# confirm it re-raises or exits so those are not swallowed.
731 rescue Exception => e