blob: a566afa0bc27e2364cd84944ee0c996e3388d054 [file] [log] [blame]
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
require_relative '../grpc'
require_relative 'active_call'
require_relative 'service'
require 'thread'
# GRPC contains the General RPC module.
module GRPC
# Pool is a simple thread pool.
class Pool
# Default keep alive period is 1s
DEFAULT_KEEP_ALIVE = 1
def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE)
fail 'pool size must be positive' unless size > 0
@jobs = Queue.new
@size = size
@stopped = false
@stop_mutex = Mutex.new # needs to be held when accessing @stopped
@stop_cond = ConditionVariable.new
@workers = []
@keep_alive = keep_alive
# Each worker thread has its own queue to push and pull jobs
# these queues are put into @ready_queues when that worker is idle
@ready_workers = Queue.new
end
# Returns the number of jobs waiting
def jobs_waiting
@jobs.size
end
def ready_for_work?
# Busy worker threads are either doing work, or have a single job
# waiting on them. Workers that are idle with no jobs waiting
# have their "queues" in @ready_workers
!@ready_workers.empty?
end
# Runs the given block on the queue with the provided args.
#
# @param args the args passed blk when it is called
# @param blk the block to call
def schedule(*args, &blk)
return if blk.nil?
@stop_mutex.synchronize do
if @stopped
GRPC.logger.warn('did not schedule job, already stopped')
return
end
GRPC.logger.info('schedule another job')
fail 'No worker threads available' if @ready_workers.empty?
worker_queue = @ready_workers.pop
fail 'worker already has a task waiting' unless worker_queue.empty?
worker_queue << [blk, args]
end
end
# Starts running the jobs in the thread pool.
def start
@stop_mutex.synchronize do
fail 'already stopped' if @stopped
end
until @workers.size == @size.to_i
new_worker_queue = Queue.new
@ready_workers << new_worker_queue
next_thread = Thread.new(new_worker_queue) do |jobs|
catch(:exit) do # allows { throw :exit } to kill a thread
loop_execute_jobs(jobs)
end
remove_current_thread
end
@workers << next_thread
end
end
# Stops the jobs in the pool
def stop
GRPC.logger.info('stopping, will wait for all the workers to exit')
@stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop
@stopped = true
loop do
break unless ready_for_work?
worker_queue = @ready_workers.pop
worker_queue << [proc { throw :exit }, []]
end
@stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0
end
forcibly_stop_workers
GRPC.logger.info('stopped, all workers are shutdown')
end
protected
# Forcibly shutdown any threads that are still alive.
def forcibly_stop_workers
return unless @workers.size > 0
GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)")
@workers.each do |t|
next unless t.alive?
begin
t.exit
rescue StandardError => e
GRPC.logger.warn('error while terminating a worker')
GRPC.logger.warn(e)
end
end
end
# removes the threads from workers, and signal when all the
# threads are complete.
def remove_current_thread
@stop_mutex.synchronize do
@workers.delete(Thread.current)
@stop_cond.signal if @workers.size.zero?
end
end
def loop_execute_jobs(worker_queue)
loop do
begin
blk, args = worker_queue.pop
blk.call(*args)
rescue StandardError, GRPC::Core::CallError => e
GRPC.logger.warn('Error in worker thread')
GRPC.logger.warn(e)
end
# there shouldn't be any work given to this thread while its busy
fail('received a task while busy') unless worker_queue.empty?
@stop_mutex.synchronize do
return if @stopped
@ready_workers << worker_queue
end
end
end
end
# RpcServer hosts a number of services and makes them available on the
# network.
class RpcServer
include Core::CallOps
include Core::TimeConsts
extend ::Forwardable
def_delegators :@server, :add_http2_port
# Default thread pool size is 30
DEFAULT_POOL_SIZE = 30
# Deprecated due to internal changes to the thread pool
DEFAULT_MAX_WAITING_REQUESTS = 20
# Default poll period is 1s
DEFAULT_POLL_PERIOD = 1
# Signal check period is 0.25s
SIGNAL_CHECK_PERIOD = 0.25
# setup_connect_md_proc is used by #initialize to validate the
# connect_md_proc.
def self.setup_connect_md_proc(a_proc)
return nil if a_proc.nil?
fail(TypeError, '!Proc') unless a_proc.is_a? Proc
a_proc
end
# Creates a new RpcServer.
#
# The RPC server is configured using keyword arguments.
#
# There are some specific keyword args used to configure the RpcServer
# instance.
#
# * pool_size: the size of the thread pool the server uses to run its
# threads. No more concurrent requests can be made than the size
# of the thread pool
#
# * max_waiting_requests: Deprecated due to internal changes to the thread
# pool. This is still an argument for compatibility but is ignored.
#
# * poll_period: The amount of time in seconds to wait for
# currently-serviced RPC's to finish before cancelling them when shutting
# down the server.
#
# * pool_keep_alive: The amount of time in seconds to wait
# for currently busy thread-pool threads to finish before
# forcing an abrupt exit to each thread.
#
# * connect_md_proc:
# when non-nil is a proc for determining metadata to send back the client
# on receiving an invocation req. The proc signature is:
# {key: val, ..} func(method_name, {key: val, ...})
#
# * server_args:
# A server arguments hash to be passed down to the underlying core server
#
# * interceptors:
# An array of GRPC::ServerInterceptor objects that will be used for
# intercepting server handlers to provide extra functionality.
# Interceptors are an EXPERIMENTAL API.
#
def initialize(pool_size: DEFAULT_POOL_SIZE,
max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
poll_period: DEFAULT_POLL_PERIOD,
pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE,
connect_md_proc: nil,
server_args: {},
interceptors: [])
@connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
@max_waiting_requests = max_waiting_requests
@poll_period = poll_period
@pool_size = pool_size
@pool = Pool.new(@pool_size, keep_alive: pool_keep_alive)
@run_cond = ConditionVariable.new
@run_mutex = Mutex.new
# running_state can take 4 values: :not_started, :running, :stopping, and
# :stopped. State transitions can only proceed in that order.
@running_state = :not_started
@server = Core::Server.new(server_args)
@interceptors = InterceptorRegistry.new(interceptors)
end
# stops a running server
#
# the call has no impact if the server is already stopped, otherwise
# server's current call loop is it's last.
def stop
# if called via run_till_terminated_or_interrupted,
# signal stop_server_thread and dont do anything
if @stop_server.nil? == false && @stop_server == false
@stop_server = true
@stop_server_cv.broadcast
return
end
@run_mutex.synchronize do
fail 'Cannot stop before starting' if @running_state == :not_started
return if @running_state != :running
transition_running_state(:stopping)
deadline = from_relative_time(@poll_period)
@server.shutdown_and_notify(deadline)
end
@pool.stop
end
def running_state
@run_mutex.synchronize do
return @running_state
end
end
# Can only be called while holding @run_mutex
def transition_running_state(target_state)
state_transitions = {
not_started: :running,
running: :stopping,
stopping: :stopped
}
if state_transitions[@running_state] == target_state
@running_state = target_state
else
fail "Bad server state transition: #{@running_state}->#{target_state}"
end
end
def running?
running_state == :running
end
def stopped?
running_state == :stopped
end
# Is called from other threads to wait for #run to start up the server.
#
# If run has not been called, this returns immediately.
#
# @param timeout [Numeric] number of seconds to wait
# @return [true, false] true if the server is running, false otherwise
def wait_till_running(timeout = nil)
@run_mutex.synchronize do
@run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
return @running_state == :running
end
end
# handle registration of classes
#
# service is either a class that includes GRPC::GenericService and whose
# #new function can be called without argument or any instance of such a
# class.
#
# E.g, after
#
# class Divider
# include GRPC::GenericService
# rpc :div DivArgs, DivReply # single request, single response
# def initialize(optional_arg='default option') # no args
# ...
# end
#
# srv = GRPC::RpcServer.new(...)
#
# # Either of these works
#
# srv.handle(Divider)
#
# # or
#
# srv.handle(Divider.new('replace optional arg'))
#
# It raises RuntimeError:
# - if service is not valid service class or object
# - its handler methods are already registered
# - if the server is already running
#
# @param service [Object|Class] a service class or object as described
# above
def handle(service)
@run_mutex.synchronize do
unless @running_state == :not_started
fail 'cannot add services if the server has been started'
end
cls = service.is_a?(Class) ? service : service.class
assert_valid_service_class(cls)
add_rpc_descs_for(service)
end
end
# runs the server
#
# - if no rpc_descs are registered, this exits immediately, otherwise it
# continues running permanently and does not return until program exit.
#
# - #running? returns true after this is called, until #stop cause the
# the server to stop.
def run
@run_mutex.synchronize do
fail 'cannot run without registering services' if rpc_descs.size.zero?
@pool.start
@server.start
transition_running_state(:running)
@run_cond.broadcast
end
loop_handle_server_calls
end
alias_method :run_till_terminated, :run
# runs the server with signal handlers
# @param signals
# List of String, Integer or both representing signals that the user
# would like to send to the server for graceful shutdown
# @param wait_interval (optional)
# Integer seconds that user would like stop_server_thread to poll
# stop_server
def run_till_terminated_or_interrupted(signals, wait_interval = 60)
@stop_server = false
@stop_server_mu = Mutex.new
@stop_server_cv = ConditionVariable.new
@stop_server_thread = Thread.new do
loop do
break if @stop_server
@stop_server_mu.synchronize do
@stop_server_cv.wait(@stop_server_mu, wait_interval)
end
end
# stop is surrounded by mutex, should handle multiple calls to stop
# correctly
stop
end
valid_signals = Signal.list
# register signal handlers
signals.each do |sig|
# input validation
if sig.class == String
sig.upcase!
if sig.start_with?('SIG')
# cut out the SIG prefix to see if valid signal
sig = sig[3..-1]
end
end
# register signal traps for all valid signals
if valid_signals.value?(sig) || valid_signals.key?(sig)
Signal.trap(sig) do
@stop_server = true
@stop_server_cv.broadcast
end
else
fail "#{sig} not a valid signal"
end
end
run
@stop_server_thread.join
end
# Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs
def available?(an_rpc)
return an_rpc if @pool.ready_for_work?
GRPC.logger.warn('no free worker threads currently')
noop = proc { |x| x }
# Create a new active call that knows that metadata hasn't been
# sent yet
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true, started: false)
c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED,
'No free threads in thread pool')
nil
end
# Sends UNIMPLEMENTED if the method is not implemented by this server
def implemented?(an_rpc)
mth = an_rpc.method.to_sym
return an_rpc if rpc_descs.key?(mth)
GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
noop = proc { |x| x }
# Create a new active call that knows that
# metadata hasn't been sent yet
c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline,
metadata_received: true, started: false)
c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '')
nil
end
# handles calls to the server
def loop_handle_server_calls
fail 'not started' if running_state == :not_started
while running_state == :running
begin
an_rpc = @server.request_call
break if (!an_rpc.nil?) && an_rpc.call.nil?
active_call = new_active_server_call(an_rpc)
unless active_call.nil?
@pool.schedule(active_call) do |ac|
c, mth = ac
begin
rpc_descs[mth].run_server_method(
c,
rpc_handlers[mth],
@interceptors.build_context
)
rescue StandardError
c.send_status(GRPC::Core::StatusCodes::INTERNAL,
'Server handler failed')
end
end
end
rescue Core::CallError, RuntimeError => e
# these might happen for various reasons. The correct behavior of
# the server is to log them and continue, if it's not shutting down.
if running_state == :running
GRPC.logger.warn("server call failed: #{e}")
end
next
end
end
# @running_state should be :stopping here
@run_mutex.synchronize do
transition_running_state(:stopped)
GRPC.logger.info("stopped: #{self}")
@server.close
end
end
def new_active_server_call(an_rpc)
return nil if an_rpc.nil? || an_rpc.call.nil?
# allow the metadata to be accessed from the call
an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
connect_md = nil
unless @connect_md_proc.nil?
connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
end
return nil unless available?(an_rpc)
return nil unless implemented?(an_rpc)
# Create the ActiveCall. Indicate that metadata hasnt been sent yet.
GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
rpc_desc = rpc_descs[an_rpc.method.to_sym]
c = ActiveCall.new(an_rpc.call,
rpc_desc.marshal_proc,
rpc_desc.unmarshal_proc(:input),
an_rpc.deadline,
metadata_received: true,
started: false,
metadata_to_send: connect_md)
c.attach_peer_cert(an_rpc.call.peer_cert)
mth = an_rpc.method.to_sym
[c, mth]
end
protected
def rpc_descs
@rpc_descs ||= {}
end
def rpc_handlers
@rpc_handlers ||= {}
end
def assert_valid_service_class(cls)
unless cls.include?(GenericService)
fail "#{cls} must 'include GenericService'"
end
fail "#{cls} should specify some rpc descriptions" if
cls.rpc_descs.size.zero?
end
# This should be called while holding @run_mutex
def add_rpc_descs_for(service)
cls = service.is_a?(Class) ? service : service.class
specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
cls.rpc_descs.each_pair do |name, spec|
route = "/#{cls.service_name}/#{name}".to_sym
fail "already registered: rpc #{route} from #{spec}" if specs.key? route
specs[route] = spec
rpc_name = GenericService.underscore(name.to_s).to_sym
if service.is_a?(Class)
handlers[route] = cls.new.method(rpc_name)
else
handlers[route] = service.method(rpc_name)
end
GRPC.logger.info("handling #{route} with #{handlers[route]}")
end
end
end
end