module Net::HTTP::Pipeline

An HTTP/1.1 pipelining implementation atop Net::HTTP. This library is compliant with RFC 2616 8.1.2.2.

Pipeline allows you to create a bunch of requests then send them all to an HTTP/1.1 server without waiting for responses. The server will return HTTP responses in-order.

Net::HTTP::Pipeline does not assume the server supports pipelining. If you know the server supports pipelining you can set #pipelining to true.

Example

require 'net/http/pipeline'

Net::HTTP.start 'localhost' do |http|
  requests = []
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')
  requests << Net::HTTP::Get.new('/')

  http.pipeline requests do |res|
    puts res.code
    puts res.body[0..60].inspect
    puts
  end
end

Constants

VERSION

The version of net-http-pipeline you are using

Attributes

pipelining[RW]

Pipelining capability accessor.

Pipeline assumes servers do not support pipelining by default. The first request is not pipelined while Pipeline ensures that the server is HTTP/1.1 or newer and defaults to persistent connections.

If you know the server is HTTP/1.1 and defaults to persistent connections you can set this to true when you create the Net::HTTP object.

Public Instance Methods

idempotent?(req) click to toggle source

Is req idempotent according to RFC 2616?

# File lib/net/http/pipeline.rb, line 146
def idempotent? req
  case req
  when Net::HTTP::Delete, Net::HTTP::Get, Net::HTTP::Head,
       Net::HTTP::Options, Net::HTTP::Put, Net::HTTP::Trace then
    true
  end
end
pipeline(requests) { |response| ... } click to toggle source

Pipelines requests to the HTTP server yielding responses if a block is given. Returns all responses recieved.

The Net::HTTP connection must be started before calling pipeline.

Raises an exception if the connection is not pipeline-capable or if the HTTP session has not been started.

# File lib/net/http/pipeline.rb, line 163
def pipeline requests, &block # :yields: response
  responses = []

  raise Error.new('Net::HTTP not started', requests, responses) unless
    started?

  raise VersionError.new(requests, responses) if '1.1' > @curr_http_version

  pipeline_check requests, responses, &block

  retried = responses.length

  until requests.empty? do
    begin
      in_flight = pipeline_send requests

      pipeline_receive in_flight, responses, &block
    rescue Net::HTTP::Pipeline::ResponseError => e
      e.requests.reverse_each do |request|
        requests.unshift request
      end

      raise if responses.length == retried or not idempotent? requests.first

      retried = responses.length

      pipeline_reset requests, responses

      retry
    end
  end

  responses
end
pipeline_check(requests, responses) { |res| ... } click to toggle source

Ensures the connection supports pipelining.

If the server has not been tested for pipelining support one of the requests will be consumed and placed in responses.

A VersionError will be raised if the server is not HTTP/1.1 or newer.

A PersistenceError will be raised if the server does not support persistent connections.

A PipelineError will be raised if the it was previously determined that the server does not support pipelining.

# File lib/net/http/pipeline.rb, line 212
def pipeline_check requests, responses
  if instance_variable_defined? :@pipelining then
    return if @pipelining
    raise PipelineError.new(requests, responses) unless @pipelining
  else
    @pipelining = false
  end

  req = requests.shift
  retried = false

  begin
    res = request req
  rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
         Errno::EPIPE, Net::HTTPBadResponse, IOError => e
    if retried then
      requests.unshift req
      raise ResponseError.new(e, requests, responses)
    end

    retried = true

    pipeline_reset requests, responses

    retry
  end

  responses << res

  yield res if block_given?

  @pipelining = pipeline_keep_alive? res

  if '1.1' > @curr_http_version then
    @pipelining = false
    raise VersionError.new(requests, responses)
  elsif not @pipelining then
    raise PersistenceError.new(requests, responses)
  end
end
pipeline_end_transport(res) click to toggle source

Updates the HTTP version and ensures the connection has keep-alive.

# File lib/net/http/pipeline.rb, line 256
def pipeline_end_transport res
  @curr_http_version = res.http_version

  if @socket.closed? then
    D 'Conn socket closed on pipeline'
  elsif pipeline_keep_alive? res then
    D 'Conn pipeline keep-alive'
  else
    D 'Conn close on pipeline'
    @socket.close
  end
end
pipeline_finish() click to toggle source

Closes the connection and rescues any IOErrors this may cause

# File lib/net/http/pipeline.rb, line 272
def pipeline_finish
  finish
rescue IOError
end
pipeline_keep_alive?(res) click to toggle source

Checks for an connection close header

# File lib/net/http/pipeline.rb, line 281
def pipeline_keep_alive? res
  not res.connection_close?
end
pipeline_receive(in_flight, responses) { |res| ... } click to toggle source

Receives HTTP responses for in_flight requests and adds them to responses

# File lib/net/http/pipeline.rb, line 294
def pipeline_receive in_flight, responses
  while req = in_flight.shift do
    begin
      begin
        res = Net::HTTPResponse.read_new @socket
      end while res.kind_of? Net::HTTPContinue

      res.reading_body @socket, req.response_body_permitted? do
        responses << res
        yield res if block_given?
      end

      pipeline_end_transport res
    rescue StandardError, Timeout::Error
      in_flight.unshift req
      raise
    end
  end

  responses
rescue Timeout::Error, EOFError, Errno::ECONNABORTED, Errno::ECONNRESET,
       Errno::EPIPE, Net::HTTPBadResponse, IOError => e
  pipeline_finish

  raise ResponseError.new(e, in_flight, responses)
end
pipeline_reset(requests, responses) click to toggle source

Resets this connection

# File lib/net/http/pipeline.rb, line 324
def pipeline_reset requests, responses
  pipeline_finish

  start
rescue Errno::ECONNREFUSED
  raise Error.new("connection refused: #{address}:#{port}", requests,
                  responses)
rescue Errno::EHOSTDOWN
  raise Error.new("host down: #{address}:#{port}", requests, responses)
end
pipeline_send(requests) click to toggle source

Sends requests to the HTTP server and removes them from the requests list. Returns the requests that have been pipelined and are in-flight.

If a non-idempotent request is first in requests it will be sent and no further requests will be pipelined.

If a non-idempotent request is encountered after an idempotent request it will not be sent.

# File lib/net/http/pipeline.rb, line 345
def pipeline_send requests
  in_flight = []

  while req = requests.shift do
    idempotent = idempotent? req

    unless idempotent or in_flight.empty? then
      requests.unshift req
      break
    end

    begin_transport req
    req.exec @socket, @curr_http_version, edit_path(req.path)
    in_flight << req

    break unless idempotent
  end

  in_flight
end