Unify processing of api/resumable/batch requests
[arvados.git] / lib / google / api_client / batch.rb
1 # Copyright 2012 Google Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 require 'addressable/uri'
16 require 'google/api_client/reference'
17 require 'uuidtools'
18
19 module Google
20   class APIClient
21
22     # Helper class to contain a response to an individual batched call.
23     class BatchedCallResponse
24       attr_reader :call_id
25       attr_accessor :status, :headers, :body
26
27       def initialize(call_id, status = nil, headers = nil, body = nil)
28         @call_id, @status, @headers, @body = call_id, status, headers, body
29       end
30     end
31     
32     ##
33     # Wraps multiple API calls into a single over-the-wire HTTP request.
34     class BatchRequest < Request
35       BATCH_BOUNDARY = "-----------RubyApiBatchRequest".freeze
36
37       attr_reader :calls, :callbacks
38
39       ##
40       # Creates a new batch request.
41       #
42       # @param [Hash] options
43       #   Set of options for this request, the only important one being
44       #   :connection, which specifies an HTTP connection to use.
45       # @param [Proc] block
46       #   Callback for every call's response. Won't be called if a call defined
47       #   a callback of its own.
48       #
49       # @return [Google::APIClient::BatchRequest] The constructed object.
50       def initialize(options = {}, &block)
51         @calls = []
52         @global_callback = block if block_given?
53         @last_auto_id = 0
54         
55         # TODO(sgomes): Use SecureRandom.uuid, drop UUIDTools when we drop 1.8
56         @base_id = UUIDTools::UUID.random_create.to_s
57
58         options[:uri] ||= 'https://www.googleapis.com/batch'
59         options[:http_method] ||= 'POST'
60
61         super options
62       end
63
64       ##
65       # Add a new call to the batch request.
66       # Each call must have its own call ID; if not provided, one will
67       # automatically be generated, avoiding collisions. If duplicate call IDs
68       # are provided, an error will be thrown.
69       #
70       # @param [Hash, Google::APIClient::Reference] call: the call to be added.
71       # @param [String] call_id: the ID to be used for this call. Must be unique
72       # @param [Proc] block: callback for this call's response.
73       #
74       # @return [Google::APIClient::BatchRequest] The BatchRequest, for chaining
75       def add(call, call_id = nil, &block)
76         unless call.kind_of?(Google::APIClient::Reference)
77           call = Google::APIClient::Reference.new(call)
78         end
79         call_id ||= new_id
80         if @calls.assoc(call_id)
81           raise BatchError,
82               'A call with this ID already exists: %s' % call_id
83         end
84         callback = block_given? ? block : @global_callback
85         @calls << [call_id, call, callback]        
86         return self
87       end
88
89       ##
90       # Processes the HTTP response to the batch request, issuing callbacks.
91       #
92       # @param [Faraday::Response] response: the HTTP response.
93       def process_response(response)
94         content_type = find_header('Content-Type', response.headers)
95         boundary = /.*boundary=(.+)/.match(content_type)[1]
96         parts = response.body.split(/--#{Regexp.escape(boundary)}/)
97         parts = parts[1...-1]
98         parts.each do |part|
99           call_response = deserialize_call_response(part)
100           _, call, callback = @calls.assoc(call_response.call_id)
101           result = Google::APIClient::Result.new(call, call_response)
102           callback.call(result) if callback
103         end
104       end
105
106       ##
107       # Return the request body for the BatchRequest's HTTP request.
108       #
109       # @return [String] The request body.
110       def to_http_request
111         if @calls.nil? || @calls.empty?
112           raise BatchError, 'Cannot make an empty batch request'
113         end
114         parts = @calls.map {|(call_id, call, callback)| serialize_call(call_id, call)}
115         build_multipart(parts, 'multipart/mixed', BATCH_BOUNDARY)
116         super
117       end
118       
119       
120       protected
121
122       ##
123       # Helper method to find a header from its name, regardless of case.
124       #
125       # @param [String] name: The name of the header to find.
126       # @param [Hash] headers: The hash of headers and their values.
127       #
128       # @return [String] The value of the desired header.
129       def find_header(name, headers)
130         _, header = headers.detect do |h, v|
131           h.downcase == name.downcase
132         end
133         return header
134       end
135
136       ##
137       # Create a new call ID. Uses an auto-incrementing, conflict-avoiding ID.
138       #
139       # @return [String] the new, unique ID.
140       def new_id
141         @last_auto_id += 1
142         while @calls.assoc(@last_auto_id)
143           @last_auto_id += 1
144         end
145         return @last_auto_id.to_s
146       end
147
148   
149
150       ##
151       # Convert a Content-ID header value to an id. Presumes the Content-ID
152       # header conforms to the format that id_to_header() returns.
153       #
154       # @param [String] header: Content-ID header value.
155       #
156       # @return [String] The extracted ID value.
157       def header_to_id(header)
158         if !header.start_with?('<') || !header.end_with?('>') ||
159             !header.include?('+')
160           raise BatchError, 'Invalid value for Content-ID: "%s"' % header
161         end
162
163         base, call_id = header[1...-1].split('+')
164         return Addressable::URI.unencode(call_id)
165       end
166
167       ##
168       # Auxiliary method to split the headers from the body in an HTTP response.
169       #
170       # @param [String] response: the response to parse.
171       #
172       # @return [Array<Hash>, String] The headers and the body, separately.
173       def split_headers_and_body(response)
174         headers = {}
175         payload = response.lstrip
176         while payload
177           line, payload = payload.split("\n", 2)
178           line.sub!(/\s+\z/, '')
179           break if line.empty?
180           match = /\A([^:]+):\s*/.match(line)
181           if match
182             headers[match[1]] = match.post_match
183           else
184             raise BatchError, 'Invalid header line in response: %s' % line
185           end
186         end
187         return headers, payload
188       end
189
190       ##
191       # Convert a single batched response into a BatchedCallResponse object.
192       #
193       # @param [Google::APIClient::Reference] response:
194       #   the request to deserialize.
195       #
196       # @return [BatchedCallResponse] The parsed and converted response.
197       def deserialize_call_response(call_response)
198         outer_headers, outer_body = split_headers_and_body(call_response)
199         status_line, payload = outer_body.split("\n", 2)
200         protocol, status, reason = status_line.split(' ', 3)
201
202         headers, body = split_headers_and_body(payload)
203         content_id = find_header('Content-ID', outer_headers)
204         call_id = header_to_id(content_id)
205         return BatchedCallResponse.new(call_id, status.to_i, headers, body)
206       end
207
208       ##
209       # Convert a single batched call into a string.
210       #
211       # @param [Google::APIClient::Reference] call: the call to serialize.
212       #
213       # @return [StringIO] The request as a string in application/http format.
214       def serialize_call(call_id, call)
215         http_request = call.to_http_request
216         body = "#{http_request.method.to_s.upcase} #{http_request.path} HTTP/1.1"
217         http_request.headers.each do |header, value|
218           body << "\r\n%s: %s" % [header, value]
219         end
220         if http_request.body
221           # TODO - CompositeIO if body is a stream 
222           body << "\r\n\r\n"
223           if http_request.body.respond_to?(:read)
224             body << http_request.body.read
225           else
226             body << http_request.body.to_s
227           end
228         end
229         Faraday::UploadIO.new(StringIO.new(body), 'application/http', 'ruby-api-request', 'Content-ID' => id_to_header(call_id))
230       end
231       
232       ##
233       # Convert an id to a Content-ID header value.
234       #
235       # @param [String] call_id: identifier of individual call.
236       #
237       # @return [String]
238       #   A Content-ID header with the call_id encoded into it. A UUID is
239       #   prepended to the value because Content-ID headers are supposed to be
240       #   universally unique.
241       def id_to_header(call_id)
242         return '<%s+%s>' % [@base_id, Addressable::URI.encode(call_id)]
243       end
244       
245     end
246   end
247 end