| # Copyright 2015 gRPC authors. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| require_relative '../grpc' |
| require_relative 'active_call' |
| require_relative 'service' |
| require 'thread' |
| |
| # GRPC contains the General RPC module. |
| module GRPC |
| # Pool is a simple thread pool. |
| class Pool |
| # Default keep alive period is 1s |
| DEFAULT_KEEP_ALIVE = 1 |
| |
| def initialize(size, keep_alive: DEFAULT_KEEP_ALIVE) |
| fail 'pool size must be positive' unless size > 0 |
| @jobs = Queue.new |
| @size = size |
| @stopped = false |
| @stop_mutex = Mutex.new # needs to be held when accessing @stopped |
| @stop_cond = ConditionVariable.new |
| @workers = [] |
| @keep_alive = keep_alive |
| |
| # Each worker thread has its own queue to push and pull jobs |
| # these queues are put into @ready_queues when that worker is idle |
| @ready_workers = Queue.new |
| end |
| |
| # Returns the number of jobs waiting |
| def jobs_waiting |
| @jobs.size |
| end |
| |
| def ready_for_work? |
| # Busy worker threads are either doing work, or have a single job |
| # waiting on them. Workers that are idle with no jobs waiting |
| # have their "queues" in @ready_workers |
| !@ready_workers.empty? |
| end |
| |
| # Runs the given block on the queue with the provided args. |
| # |
| # @param args the args passed blk when it is called |
| # @param blk the block to call |
| def schedule(*args, &blk) |
| return if blk.nil? |
| @stop_mutex.synchronize do |
| if @stopped |
| GRPC.logger.warn('did not schedule job, already stopped') |
| return |
| end |
| GRPC.logger.info('schedule another job') |
| fail 'No worker threads available' if @ready_workers.empty? |
| worker_queue = @ready_workers.pop |
| |
| fail 'worker already has a task waiting' unless worker_queue.empty? |
| worker_queue << [blk, args] |
| end |
| end |
| |
| # Starts running the jobs in the thread pool. |
| def start |
| @stop_mutex.synchronize do |
| fail 'already stopped' if @stopped |
| end |
| until @workers.size == @size.to_i |
| new_worker_queue = Queue.new |
| @ready_workers << new_worker_queue |
| next_thread = Thread.new(new_worker_queue) do |jobs| |
| catch(:exit) do # allows { throw :exit } to kill a thread |
| loop_execute_jobs(jobs) |
| end |
| remove_current_thread |
| end |
| @workers << next_thread |
| end |
| end |
| |
| # Stops the jobs in the pool |
| def stop |
| GRPC.logger.info('stopping, will wait for all the workers to exit') |
| @stop_mutex.synchronize do # wait @keep_alive seconds for workers to stop |
| @stopped = true |
| loop do |
| break unless ready_for_work? |
| worker_queue = @ready_workers.pop |
| worker_queue << [proc { throw :exit }, []] |
| end |
| @stop_cond.wait(@stop_mutex, @keep_alive) if @workers.size > 0 |
| end |
| forcibly_stop_workers |
| GRPC.logger.info('stopped, all workers are shutdown') |
| end |
| |
| protected |
| |
| # Forcibly shutdown any threads that are still alive. |
| def forcibly_stop_workers |
| return unless @workers.size > 0 |
| GRPC.logger.info("forcibly terminating #{@workers.size} worker(s)") |
| @workers.each do |t| |
| next unless t.alive? |
| begin |
| t.exit |
| rescue StandardError => e |
| GRPC.logger.warn('error while terminating a worker') |
| GRPC.logger.warn(e) |
| end |
| end |
| end |
| |
| # removes the threads from workers, and signal when all the |
| # threads are complete. |
| def remove_current_thread |
| @stop_mutex.synchronize do |
| @workers.delete(Thread.current) |
| @stop_cond.signal if @workers.size.zero? |
| end |
| end |
| |
| def loop_execute_jobs(worker_queue) |
| loop do |
| begin |
| blk, args = worker_queue.pop |
| blk.call(*args) |
| rescue StandardError, GRPC::Core::CallError => e |
| GRPC.logger.warn('Error in worker thread') |
| GRPC.logger.warn(e) |
| end |
| # there shouldn't be any work given to this thread while its busy |
| fail('received a task while busy') unless worker_queue.empty? |
| @stop_mutex.synchronize do |
| return if @stopped |
| @ready_workers << worker_queue |
| end |
| end |
| end |
| end |
| |
| # RpcServer hosts a number of services and makes them available on the |
| # network. |
| class RpcServer |
| include Core::CallOps |
| include Core::TimeConsts |
| extend ::Forwardable |
| |
| def_delegators :@server, :add_http2_port |
| |
| # Default thread pool size is 30 |
| DEFAULT_POOL_SIZE = 30 |
| |
| # Deprecated due to internal changes to the thread pool |
| DEFAULT_MAX_WAITING_REQUESTS = 20 |
| |
| # Default poll period is 1s |
| DEFAULT_POLL_PERIOD = 1 |
| |
| # Signal check period is 0.25s |
| SIGNAL_CHECK_PERIOD = 0.25 |
| |
| # setup_connect_md_proc is used by #initialize to validate the |
| # connect_md_proc. |
| def self.setup_connect_md_proc(a_proc) |
| return nil if a_proc.nil? |
| fail(TypeError, '!Proc') unless a_proc.is_a? Proc |
| a_proc |
| end |
| |
| # Creates a new RpcServer. |
| # |
| # The RPC server is configured using keyword arguments. |
| # |
| # There are some specific keyword args used to configure the RpcServer |
| # instance. |
| # |
| # * pool_size: the size of the thread pool the server uses to run its |
| # threads. No more concurrent requests can be made than the size |
| # of the thread pool |
| # |
| # * max_waiting_requests: Deprecated due to internal changes to the thread |
| # pool. This is still an argument for compatibility but is ignored. |
| # |
| # * poll_period: The amount of time in seconds to wait for |
| # currently-serviced RPC's to finish before cancelling them when shutting |
| # down the server. |
| # |
| # * pool_keep_alive: The amount of time in seconds to wait |
| # for currently busy thread-pool threads to finish before |
| # forcing an abrupt exit to each thread. |
| # |
| # * connect_md_proc: |
| # when non-nil is a proc for determining metadata to send back the client |
| # on receiving an invocation req. The proc signature is: |
| # {key: val, ..} func(method_name, {key: val, ...}) |
| # |
| # * server_args: |
| # A server arguments hash to be passed down to the underlying core server |
| # |
| # * interceptors: |
| # An array of GRPC::ServerInterceptor objects that will be used for |
| # intercepting server handlers to provide extra functionality. |
| # Interceptors are an EXPERIMENTAL API. |
| # |
| def initialize(pool_size: DEFAULT_POOL_SIZE, |
| max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS, |
| poll_period: DEFAULT_POLL_PERIOD, |
| pool_keep_alive: Pool::DEFAULT_KEEP_ALIVE, |
| connect_md_proc: nil, |
| server_args: {}, |
| interceptors: []) |
| @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc) |
| @max_waiting_requests = max_waiting_requests |
| @poll_period = poll_period |
| @pool_size = pool_size |
| @pool = Pool.new(@pool_size, keep_alive: pool_keep_alive) |
| @run_cond = ConditionVariable.new |
| @run_mutex = Mutex.new |
| # running_state can take 4 values: :not_started, :running, :stopping, and |
| # :stopped. State transitions can only proceed in that order. |
| @running_state = :not_started |
| @server = Core::Server.new(server_args) |
| @interceptors = InterceptorRegistry.new(interceptors) |
| end |
| |
| # stops a running server |
| # |
| # the call has no impact if the server is already stopped, otherwise |
| # server's current call loop is it's last. |
| def stop |
| # if called via run_till_terminated_or_interrupted, |
| # signal stop_server_thread and dont do anything |
| if @stop_server.nil? == false && @stop_server == false |
| @stop_server = true |
| @stop_server_cv.broadcast |
| return |
| end |
| @run_mutex.synchronize do |
| fail 'Cannot stop before starting' if @running_state == :not_started |
| return if @running_state != :running |
| transition_running_state(:stopping) |
| deadline = from_relative_time(@poll_period) |
| @server.shutdown_and_notify(deadline) |
| end |
| @pool.stop |
| end |
| |
| def running_state |
| @run_mutex.synchronize do |
| return @running_state |
| end |
| end |
| |
| # Can only be called while holding @run_mutex |
| def transition_running_state(target_state) |
| state_transitions = { |
| not_started: :running, |
| running: :stopping, |
| stopping: :stopped |
| } |
| if state_transitions[@running_state] == target_state |
| @running_state = target_state |
| else |
| fail "Bad server state transition: #{@running_state}->#{target_state}" |
| end |
| end |
| |
| def running? |
| running_state == :running |
| end |
| |
| def stopped? |
| running_state == :stopped |
| end |
| |
| # Is called from other threads to wait for #run to start up the server. |
| # |
| # If run has not been called, this returns immediately. |
| # |
| # @param timeout [Numeric] number of seconds to wait |
| # @return [true, false] true if the server is running, false otherwise |
| def wait_till_running(timeout = nil) |
| @run_mutex.synchronize do |
| @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started |
| return @running_state == :running |
| end |
| end |
| |
| # handle registration of classes |
| # |
| # service is either a class that includes GRPC::GenericService and whose |
| # #new function can be called without argument or any instance of such a |
| # class. |
| # |
| # E.g, after |
| # |
| # class Divider |
| # include GRPC::GenericService |
| # rpc :div DivArgs, DivReply # single request, single response |
| # def initialize(optional_arg='default option') # no args |
| # ... |
| # end |
| # |
| # srv = GRPC::RpcServer.new(...) |
| # |
| # # Either of these works |
| # |
| # srv.handle(Divider) |
| # |
| # # or |
| # |
| # srv.handle(Divider.new('replace optional arg')) |
| # |
| # It raises RuntimeError: |
| # - if service is not valid service class or object |
| # - its handler methods are already registered |
| # - if the server is already running |
| # |
| # @param service [Object|Class] a service class or object as described |
| # above |
| def handle(service) |
| @run_mutex.synchronize do |
| unless @running_state == :not_started |
| fail 'cannot add services if the server has been started' |
| end |
| cls = service.is_a?(Class) ? service : service.class |
| assert_valid_service_class(cls) |
| add_rpc_descs_for(service) |
| end |
| end |
| |
| # runs the server |
| # |
| # - if no rpc_descs are registered, this exits immediately, otherwise it |
| # continues running permanently and does not return until program exit. |
| # |
| # - #running? returns true after this is called, until #stop cause the |
| # the server to stop. |
| def run |
| @run_mutex.synchronize do |
| fail 'cannot run without registering services' if rpc_descs.size.zero? |
| @pool.start |
| @server.start |
| transition_running_state(:running) |
| @run_cond.broadcast |
| end |
| loop_handle_server_calls |
| end |
| |
| alias_method :run_till_terminated, :run |
| |
| # runs the server with signal handlers |
| # @param signals |
| # List of String, Integer or both representing signals that the user |
| # would like to send to the server for graceful shutdown |
| # @param wait_interval (optional) |
| # Integer seconds that user would like stop_server_thread to poll |
| # stop_server |
| def run_till_terminated_or_interrupted(signals, wait_interval = 60) |
| @stop_server = false |
| @stop_server_mu = Mutex.new |
| @stop_server_cv = ConditionVariable.new |
| |
| @stop_server_thread = Thread.new do |
| loop do |
| break if @stop_server |
| @stop_server_mu.synchronize do |
| @stop_server_cv.wait(@stop_server_mu, wait_interval) |
| end |
| end |
| |
| # stop is surrounded by mutex, should handle multiple calls to stop |
| # correctly |
| stop |
| end |
| |
| valid_signals = Signal.list |
| |
| # register signal handlers |
| signals.each do |sig| |
| # input validation |
| if sig.class == String |
| sig.upcase! |
| if sig.start_with?('SIG') |
| # cut out the SIG prefix to see if valid signal |
| sig = sig[3..-1] |
| end |
| end |
| |
| # register signal traps for all valid signals |
| if valid_signals.value?(sig) || valid_signals.key?(sig) |
| Signal.trap(sig) do |
| @stop_server = true |
| @stop_server_cv.broadcast |
| end |
| else |
| fail "#{sig} not a valid signal" |
| end |
| end |
| |
| run |
| |
| @stop_server_thread.join |
| end |
| |
| # Sends RESOURCE_EXHAUSTED if there are too many unprocessed jobs |
| def available?(an_rpc) |
| return an_rpc if @pool.ready_for_work? |
| GRPC.logger.warn('no free worker threads currently') |
| noop = proc { |x| x } |
| |
| # Create a new active call that knows that metadata hasn't been |
| # sent yet |
| c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, |
| metadata_received: true, started: false) |
| c.send_status(GRPC::Core::StatusCodes::RESOURCE_EXHAUSTED, |
| 'No free threads in thread pool') |
| nil |
| end |
| |
| # Sends UNIMPLEMENTED if the method is not implemented by this server |
| def implemented?(an_rpc) |
| mth = an_rpc.method.to_sym |
| return an_rpc if rpc_descs.key?(mth) |
| GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}") |
| noop = proc { |x| x } |
| |
| # Create a new active call that knows that |
| # metadata hasn't been sent yet |
| c = ActiveCall.new(an_rpc.call, noop, noop, an_rpc.deadline, |
| metadata_received: true, started: false) |
| c.send_status(GRPC::Core::StatusCodes::UNIMPLEMENTED, '') |
| nil |
| end |
| |
| # handles calls to the server |
| def loop_handle_server_calls |
| fail 'not started' if running_state == :not_started |
| while running_state == :running |
| begin |
| an_rpc = @server.request_call |
| break if (!an_rpc.nil?) && an_rpc.call.nil? |
| active_call = new_active_server_call(an_rpc) |
| unless active_call.nil? |
| @pool.schedule(active_call) do |ac| |
| c, mth = ac |
| begin |
| rpc_descs[mth].run_server_method( |
| c, |
| rpc_handlers[mth], |
| @interceptors.build_context |
| ) |
| rescue StandardError |
| c.send_status(GRPC::Core::StatusCodes::INTERNAL, |
| 'Server handler failed') |
| end |
| end |
| end |
| rescue Core::CallError, RuntimeError => e |
| # these might happen for various reasons. The correct behavior of |
| # the server is to log them and continue, if it's not shutting down. |
| if running_state == :running |
| GRPC.logger.warn("server call failed: #{e}") |
| end |
| next |
| end |
| end |
| # @running_state should be :stopping here |
| @run_mutex.synchronize do |
| transition_running_state(:stopped) |
| GRPC.logger.info("stopped: #{self}") |
| @server.close |
| end |
| end |
| |
| def new_active_server_call(an_rpc) |
| return nil if an_rpc.nil? || an_rpc.call.nil? |
| |
| # allow the metadata to be accessed from the call |
| an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers |
| connect_md = nil |
| unless @connect_md_proc.nil? |
| connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata) |
| end |
| |
| return nil unless available?(an_rpc) |
| return nil unless implemented?(an_rpc) |
| |
| # Create the ActiveCall. Indicate that metadata hasnt been sent yet. |
| GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})") |
| rpc_desc = rpc_descs[an_rpc.method.to_sym] |
| c = ActiveCall.new(an_rpc.call, |
| rpc_desc.marshal_proc, |
| rpc_desc.unmarshal_proc(:input), |
| an_rpc.deadline, |
| metadata_received: true, |
| started: false, |
| metadata_to_send: connect_md) |
| c.attach_peer_cert(an_rpc.call.peer_cert) |
| mth = an_rpc.method.to_sym |
| [c, mth] |
| end |
| |
| protected |
| |
| def rpc_descs |
| @rpc_descs ||= {} |
| end |
| |
| def rpc_handlers |
| @rpc_handlers ||= {} |
| end |
| |
| def assert_valid_service_class(cls) |
| unless cls.include?(GenericService) |
| fail "#{cls} must 'include GenericService'" |
| end |
| fail "#{cls} should specify some rpc descriptions" if |
| cls.rpc_descs.size.zero? |
| end |
| |
| # This should be called while holding @run_mutex |
| def add_rpc_descs_for(service) |
| cls = service.is_a?(Class) ? service : service.class |
| specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {}) |
| cls.rpc_descs.each_pair do |name, spec| |
| route = "/#{cls.service_name}/#{name}".to_sym |
| fail "already registered: rpc #{route} from #{spec}" if specs.key? route |
| specs[route] = spec |
| rpc_name = GenericService.underscore(name.to_s).to_sym |
| if service.is_a?(Class) |
| handlers[route] = cls.new.method(rpc_name) |
| else |
| handlers[route] = service.method(rpc_name) |
| end |
| GRPC.logger.info("handling #{route} with #{handlers[route]}") |
| end |
| end |
| end |
| end |