/*
* vim:noexpandtab:shiftwidth=8:tabstop=8:
*
* Copyright CEA/DAM/DIF (2008)
* contributeur : Philippe DENIEL philippe.deniel@cea.fr
* Thomas LEIBOVICI thomas.leibovici@cea.fr
*
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 3 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*
* ---------------------------------------
*/
/**
* @file nfs_rpc_dispatcher_thread.c
* @brief Contains the @c rpc_dispatcher_thread routine and support code
*/
#include "config.h"
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <fcntl.h>
#include <sys/file.h> /* for FNDELAY */
#include <sys/select.h>
#include <poll.h>
#ifdef RPC_VSOCK
#include <sys/types.h>
#include <sys/socket.h>
#include <linux/vm_sockets.h>
#endif
#include <assert.h>
#include "hashtable.h"
#include "log.h"
#include "gsh_rpc.h"
#include "abstract_atomic.h"
#include "nfs23.h"
#include "nfs4.h"
#include "mount.h"
#include "nlm4.h"
#include "rquota.h"
#include "nfs_init.h"
#include "nfs_core.h"
#include "nfs_convert.h"
#include "nfs_exports.h"
#include "nfs_proto_functions.h"
#include "nfs_req_queue.h"
#include "nfs_dupreq.h"
#include "nfs_file_handle.h"
#include "fridgethr.h"
/**
* TI-RPC event channels. Each channel is a thread servicing an event
* demultiplexer.
*/
struct rpc_evchan {
uint32_t chan_id; /*< Channel ID */
pthread_t thread_id; /*< POSIX thread ID */
};
#define N_TCP_EVENT_CHAN 3 /*< We don't really want to have too many,
relative to the number of available cores. */
#define UDP_EVENT_CHAN 0 /*< Put UDP on a dedicated channel */
#define TCP_RDVS_CHAN 1 /*< Accepts new tcp connections */
#define TCP_EVCHAN_0 2
#define N_EVENT_CHAN (N_TCP_EVENT_CHAN + 2)
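/* Channel layout: chan 0 services UDP xprts, chan 1 services TCP
* rendezvous (listener) xprts, and chans 2..N_EVENT_CHAN-1 service
* connected TCP xprts. */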
static struct rpc_evchan rpc_evchan[N_EVENT_CHAN];
struct fridgethr *req_fridge; /*< Decoder thread pool */
struct nfs_req_st nfs_req_st; /*< Shared request queues */
const char *req_q_s[N_REQ_QUEUES] = {
"REQ_Q_MOUNT",
"REQ_Q_CALL",
"REQ_Q_LOW_LATENCY",
"REQ_Q_HIGH_LATENCY"
};
static u_int nfs_rpc_recv_user_data(SVCXPRT *xprt, SVCXPRT *newxprt,
const u_int flags, void *u_data);
static bool nfs_rpc_getreq_ng(SVCXPRT *xprt /*, int chan_id */);
static void nfs_rpc_free_user_data(SVCXPRT *xprt);
const char *xprt_stat_s[4] = {
"XPRT_DIED",
"XPRT_MOREREQS",
"XPRT_IDLE",
"XPRT_DESTROYED"
};
/**
* @brief Function never called, but the symbol is needed for svc_register.
*
* @param[in] req Unused
* @param[in] xprt Unused
*/
void nfs_rpc_dispatch_dummy(struct svc_req *req, SVCXPRT *xprt)
{
LogMajor(COMPONENT_DISPATCH,
"NFS DISPATCH DUMMY: Possible error, function nfs_rpc_dispatch_dummy should never be called");
}
const char *tags[] = {
"NFS",
"MNT",
"NLM",
"RQUOTA",
"NFS_VSOCK",
};
typedef struct proto_data {
struct sockaddr_in sinaddr_udp;
struct sockaddr_in sinaddr_tcp;
struct sockaddr_in6 sinaddr_udp6;
struct sockaddr_in6 sinaddr_tcp6;
struct netbuf netbuf_udp6;
struct netbuf netbuf_tcp6;
struct t_bind bindaddr_udp6;
struct t_bind bindaddr_tcp6;
struct __rpc_sockinfo si_udp6;
struct __rpc_sockinfo si_tcp6;
} proto_data;
proto_data pdata[P_COUNT];
struct netconfig *netconfig_udpv4;
struct netconfig *netconfig_tcpv4;
struct netconfig *netconfig_udpv6;
struct netconfig *netconfig_tcpv6;
/* RPC Service Sockets and Transports */
int udp_socket[P_COUNT];
int tcp_socket[P_COUNT];
SVCXPRT *udp_xprt[P_COUNT];
SVCXPRT *tcp_xprt[P_COUNT];
/* Flag to indicate if V6 interfaces on the host are disabled */
bool v6disabled;
bool vsock;
/**
* @brief Unregister an RPC program.
*
* @param[in] prog Program to unregister
* @param[in] vers1 Lowest version
* @param[in] vers2 Highest version
*/
static void unregister(const rpcprog_t prog, const rpcvers_t vers1,
const rpcvers_t vers2)
{
rpcvers_t vers;
for (vers = vers1; vers <= vers2; vers++) {
rpcb_unset(prog, vers, netconfig_udpv4);
rpcb_unset(prog, vers, netconfig_tcpv4);
if (netconfig_udpv6)
rpcb_unset(prog, vers, netconfig_udpv6);
if (netconfig_tcpv6)
rpcb_unset(prog, vers, netconfig_tcpv6);
}
}
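/**
* @brief Unregister all configured RPC programs from rpcbind.
*
* NFS is unregistered for V2..V4 (V4 only when NFSv3 is disabled);
* MOUNT, NLM, and RQUOTA are unregistered when enabled.
*/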
static void unregister_rpc(void)
{
if ((nfs_param.core_param.core_options & CORE_OPTION_NFSV3) != 0) {
unregister(nfs_param.core_param.program[P_NFS], NFS_V2, NFS_V4);
unregister(nfs_param.core_param.program[P_MNT], MOUNT_V1,
MOUNT_V3);
} else {
unregister(nfs_param.core_param.program[P_NFS], NFS_V4, NFS_V4);
}
#ifdef _USE_NLM
if (nfs_param.core_param.enable_NLM)
unregister(nfs_param.core_param.program[P_NLM], 1, NLM4_VERS);
#endif /* _USE_NLM */
if (nfs_param.core_param.enable_RQUOTA) {
unregister(nfs_param.core_param.program[P_RQUOTA], RQUOTAVERS,
EXT_RQUOTAVERS);
}
}
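/**
* @brief Test whether a protocol is enabled by the configuration.
*
* @param[in] p Protocol index
*
* @return true if the protocol should be served, false otherwise.
*/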
static inline bool nfs_protocol_enabled(protos p)
{
bool nfsv3 = nfs_param.core_param.core_options & CORE_OPTION_NFSV3;
switch (p) {
case P_NFS:
return true;
case P_MNT: /* valid only for NFSv3 environments */
if (nfsv3)
return true;
break;
#ifdef _USE_NLM
case P_NLM: /* valid only for NFSv3 environments */
if (nfsv3 && nfs_param.core_param.enable_NLM)
return true;
break;
#endif
case P_RQUOTA:
if (nfs_param.core_param.enable_RQUOTA)
return true;
break;
default:
break;
}
return false;
}
/**
 * @brief Close file descriptors used for RPC services.
 *
 * Closing these lets a restarted NFS server avoid "Address Already
 * In Use" errors. Those have occurred even with the SO_REUSEADDR
 * option set, when restarting the server with a single export
 * (i.e.: a small config) and no logging at all, making the restart
 * very fast. When a listening socket with no pending connection is
 * closed, it is closed immediately, drastically reducing the
 * probability of trouble.
 */
static void close_rpc_fd(void)
{
protos p;
for (p = P_NFS; p < P_COUNT; p++) {
if (udp_socket[p] != -1)
close(udp_socket[p]);
if (tcp_socket[p] != -1)
close(tcp_socket[p]);
}
if (vsock)
close(tcp_socket[P_NFS_VSOCK]);
}
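/**
* @brief Create the UDP SVCXPRT for a protocol, hook its callouts,
* and register it on the UDP event channel.
*
* @param[in] prot Protocol index
*/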
void Create_udp(protos prot)
{
udp_xprt[prot] =
svc_dg_create(udp_socket[prot],
nfs_param.core_param.rpc.max_send_buffer_size,
nfs_param.core_param.rpc.max_recv_buffer_size);
if (udp_xprt[prot] == NULL)
LogFatal(COMPONENT_DISPATCH, "Cannot allocate %s/UDP SVCXPRT",
tags[prot]);
/* Hook xp_getreq */
(void)SVC_CONTROL(udp_xprt[prot], SVCSET_XP_GETREQ, nfs_rpc_getreq_ng);
/* Hook xp_free_user_data (finalize/free private data) */
(void)SVC_CONTROL(udp_xprt[prot], SVCSET_XP_FREE_USER_DATA,
nfs_rpc_free_user_data);
/* Setup private data */
(udp_xprt[prot])->xp_u1 =
alloc_gsh_xprt_private(udp_xprt[prot],
XPRT_PRIVATE_FLAG_NONE);
/* bind xprt to channel--unregister it from the global event
* channel (if applicable) */
(void)svc_rqst_evchan_reg(rpc_evchan[UDP_EVENT_CHAN].chan_id,
udp_xprt[prot], SVC_RQST_FLAG_XPRT_UREG);
}
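/**
* @brief Create the TCP rendezvous SVCXPRT for a protocol, hook its
* callouts, and register it on the TCP rendezvous event channel.
*
* @param[in] prot Protocol index
*/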
void Create_tcp(protos prot)
{
tcp_xprt[prot] =
svc_vc_create2(tcp_socket[prot],
nfs_param.core_param.rpc.max_send_buffer_size,
nfs_param.core_param.rpc.max_recv_buffer_size,
SVC_VC_CREATE_LISTEN);
if (tcp_xprt[prot] == NULL)
LogFatal(COMPONENT_DISPATCH, "Cannot allocate %s/TCP SVCXPRT",
tags[prot]);
/* bind xprt to channel--unregister it from the global event
* channel (if applicable) */
(void)svc_rqst_evchan_reg(rpc_evchan[TCP_RDVS_CHAN].chan_id,
tcp_xprt[prot], SVC_RQST_FLAG_XPRT_UREG);
/* Hook xp_getreq */
(void)SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_GETREQ, nfs_rpc_getreq_ng);
/* Hook xp_recv_user_data -- allocate new xprts to event channels */
(void)SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_RECV_USER_DATA,
nfs_rpc_recv_user_data);
/* Hook xp_free_user_data (finalize/free private data) */
(void)SVC_CONTROL(tcp_xprt[prot], SVCSET_XP_FREE_USER_DATA,
nfs_rpc_free_user_data);
/* Setup private data */
(tcp_xprt[prot])->xp_u1 =
alloc_gsh_xprt_private(tcp_xprt[prot],
XPRT_PRIVATE_FLAG_NONE);
}
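/**
* @brief Create the AF_VSOCK rendezvous SVCXPRT, hook its callouts,
* and register it on the TCP rendezvous event channel.
*/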
void create_vsock(void)
{
tcp_xprt[P_NFS_VSOCK] =
svc_vc_create2(tcp_socket[P_NFS_VSOCK],
nfs_param.core_param.rpc.max_send_buffer_size,
nfs_param.core_param.rpc.max_recv_buffer_size,
SVC_VC_CREATE_LISTEN);
if (tcp_xprt[P_NFS_VSOCK] == NULL)
LogFatal(COMPONENT_DISPATCH,
"Cannot allocate %s/TCP VSOCK SVCXPRT",
tags[P_NFS_VSOCK]);
/* bind xprt to channel--unregister it from the global event
* channel (if applicable) */
(void)svc_rqst_evchan_reg(rpc_evchan[TCP_RDVS_CHAN].chan_id,
tcp_xprt[P_NFS_VSOCK],
SVC_RQST_FLAG_XPRT_UREG);
/* Hook xp_getreq */
(void)SVC_CONTROL(tcp_xprt[P_NFS_VSOCK], SVCSET_XP_GETREQ,
nfs_rpc_getreq_ng);
/* Hook xp_recv_user_data -- allocate new xprts to event channels */
(void)SVC_CONTROL(tcp_xprt[P_NFS_VSOCK], SVCSET_XP_RECV_USER_DATA,
nfs_rpc_recv_user_data);
/* Hook xp_free_user_data (finalize/free private data) */
(void)SVC_CONTROL(tcp_xprt[P_NFS_VSOCK], SVCSET_XP_FREE_USER_DATA,
nfs_rpc_free_user_data);
/* Setup private data */
(tcp_xprt[P_NFS_VSOCK])->xp_u1 =
alloc_gsh_xprt_private(tcp_xprt[P_NFS_VSOCK],
XPRT_PRIVATE_FLAG_NONE);
}
/**
* @brief Create the SVCXPRT for each protocol in use
*/
void Create_SVCXPRTs(void)
{
protos p;
LogFullDebug(COMPONENT_DISPATCH, "Allocation of the SVCXPRT");
for (p = P_NFS; p < P_COUNT; p++)
if (nfs_protocol_enabled(p)) {
Create_udp(p);
Create_tcp(p);
}
#ifdef RPC_VSOCK
if (vsock)
create_vsock();
#endif /* RPC_VSOCK */
}
/**
* @brief Bind the udp and tcp sockets for V6 Interfaces
*/
static int Bind_sockets_V6(void)
{
protos p;
int rc = 0;
for (p = P_NFS; p < P_COUNT; p++) {
if (nfs_protocol_enabled(p)) {
proto_data *pdatap = &pdata[p];
memset(&pdatap->sinaddr_udp6, 0,
sizeof(pdatap->sinaddr_udp6));
pdatap->sinaddr_udp6.sin6_family = AF_INET6;
/* all interfaces */
pdatap->sinaddr_udp6.sin6_addr = in6addr_any;
pdatap->sinaddr_udp6.sin6_port =
htons(nfs_param.core_param.port[p]);
pdatap->netbuf_udp6.maxlen =
sizeof(pdatap->sinaddr_udp6);
pdatap->netbuf_udp6.len = sizeof(pdatap->sinaddr_udp6);
pdatap->netbuf_udp6.buf = &pdatap->sinaddr_udp6;
pdatap->bindaddr_udp6.qlen = SOMAXCONN;
pdatap->bindaddr_udp6.addr = pdatap->netbuf_udp6;
if (!__rpc_fd2sockinfo(udp_socket[p],
&pdatap->si_udp6)) {
LogWarn(COMPONENT_DISPATCH,
"Cannot get %s socket info for udp6 socket errno=%d (%s)",
tags[p], errno, strerror(errno));
return -1;
}
rc = bind(udp_socket[p],
(struct sockaddr *)pdatap->bindaddr_udp6.addr.buf,
(socklen_t) pdatap->si_udp6.si_alen);
if (rc == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot bind %s udp6 socket, error %d (%s)",
tags[p], errno, strerror(errno));
goto exit;
}
memset(&pdatap->sinaddr_tcp6, 0,
sizeof(pdatap->sinaddr_tcp6));
pdatap->sinaddr_tcp6.sin6_family = AF_INET6;
/* all interfaces */
pdatap->sinaddr_tcp6.sin6_addr = in6addr_any;
pdatap->sinaddr_tcp6.sin6_port =
htons(nfs_param.core_param.port[p]);
pdatap->netbuf_tcp6.maxlen =
sizeof(pdatap->sinaddr_tcp6);
pdatap->netbuf_tcp6.len = sizeof(pdatap->sinaddr_tcp6);
pdatap->netbuf_tcp6.buf = &pdatap->sinaddr_tcp6;
pdatap->bindaddr_tcp6.qlen = SOMAXCONN;
pdatap->bindaddr_tcp6.addr = pdatap->netbuf_tcp6;
if (!__rpc_fd2sockinfo(tcp_socket[p],
&pdatap->si_tcp6)) {
LogWarn(COMPONENT_DISPATCH,
"Cannot get %s socket info for tcp6 socket errno=%d (%s)",
tags[p], errno, strerror(errno));
return -1;
}
rc = bind(tcp_socket[p],
(struct sockaddr *)
pdatap->bindaddr_tcp6.addr.buf,
(socklen_t) pdatap->si_tcp6.si_alen);
if (rc == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot bind %s tcp6 socket, error %d (%s)",
tags[p], errno, strerror(errno));
goto exit;
}
}
}
exit:
return rc;
}
/**
* @brief Bind the udp and tcp sockets for V4 Interfaces
*/
static int Bind_sockets_V4(void)
{
protos p;
int rc = 0;
for (p = P_NFS; p < P_COUNT; p++) {
if (nfs_protocol_enabled(p)) {
proto_data *pdatap = &pdata[p];
memset(&pdatap->sinaddr_udp, 0,
sizeof(pdatap->sinaddr_udp));
pdatap->sinaddr_udp.sin_family = AF_INET;
/* all interfaces */
pdatap->sinaddr_udp.sin_addr.s_addr = htonl(INADDR_ANY);
pdatap->sinaddr_udp.sin_port =
htons(nfs_param.core_param.port[p]);
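/* Note: the *_udp6/*_tcp6 netbuf and bindaddr fields are
* reused below to carry the V4 addresses; only the
* sockaddr contents differ. */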
pdatap->netbuf_udp6.maxlen =
sizeof(pdatap->sinaddr_udp);
pdatap->netbuf_udp6.len = sizeof(pdatap->sinaddr_udp);
pdatap->netbuf_udp6.buf = &pdatap->sinaddr_udp;
pdatap->bindaddr_udp6.qlen = SOMAXCONN;
pdatap->bindaddr_udp6.addr = pdatap->netbuf_udp6;
if (!__rpc_fd2sockinfo(udp_socket[p],
&pdatap->si_udp6)) {
LogWarn(COMPONENT_DISPATCH,
"Cannot get %s socket info for udp6 socket errno=%d (%s)",
tags[p], errno, strerror(errno));
return -1;
}
rc = bind(udp_socket[p],
(struct sockaddr *)
pdatap->bindaddr_udp6.addr.buf,
(socklen_t) pdatap->si_udp6.si_alen);
if (rc == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot bind %s udp6 socket, error %d (%s)",
tags[p], errno, strerror(errno));
return -1;
}
memset(&pdatap->sinaddr_tcp, 0,
sizeof(pdatap->sinaddr_tcp));
pdatap->sinaddr_tcp.sin_family = AF_INET;
/* all interfaces */
pdatap->sinaddr_tcp.sin_addr.s_addr = htonl(INADDR_ANY);
pdatap->sinaddr_tcp.sin_port =
htons(nfs_param.core_param.port[p]);
pdatap->netbuf_tcp6.maxlen =
sizeof(pdatap->sinaddr_tcp);
pdatap->netbuf_tcp6.len = sizeof(pdatap->sinaddr_tcp);
pdatap->netbuf_tcp6.buf = &pdatap->sinaddr_tcp;
pdatap->bindaddr_tcp6.qlen = SOMAXCONN;
pdatap->bindaddr_tcp6.addr = pdatap->netbuf_tcp6;
if (!__rpc_fd2sockinfo(tcp_socket[p],
&pdatap->si_tcp6)) {
LogWarn(COMPONENT_DISPATCH,
"V4 : Cannot get %s socket info for tcp socket error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
rc = bind(tcp_socket[p],
(struct sockaddr *)
pdatap->bindaddr_tcp6.addr.buf,
(socklen_t) pdatap->si_tcp6.si_alen);
if (rc == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot bind %s tcp socket, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
}
}
return rc;
}
#ifdef RPC_VSOCK
int bind_sockets_vsock(void)
{
int rc = 0;
struct sockaddr_vm sa_listen = {
.svm_family = AF_VSOCK,
.svm_cid = VMADDR_CID_ANY,
.svm_port = nfs_param.core_param.port[P_NFS],
};
rc = bind(tcp_socket[P_NFS_VSOCK],
(struct sockaddr *)&sa_listen, sizeof(sa_listen));
if (rc == -1) {
LogWarn(COMPONENT_DISPATCH,
"cannot bind %s stream socket, error %d(%s)",
tags[P_NFS_VSOCK], errno, strerror(errno));
}
return rc;
}
#endif /* RPC_VSOCK */
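/**
* @brief Bind all RPC sockets, choosing the V4 or V6 path based on
* the v6disabled flag set during socket allocation.
*/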
void Bind_sockets(void)
{
int rc = 0;
/*
* See Allocate_sockets(), which should already
* have set the global v6disabled accordingly
*/
if (v6disabled) {
rc = Bind_sockets_V4();
if (rc)
LogFatal(COMPONENT_DISPATCH,
"Error binding to V4 interface. Cannot continue.");
} else {
rc = Bind_sockets_V6();
if (rc)
LogFatal(COMPONENT_DISPATCH,
"Error binding to V6 interface. Cannot continue.");
}
#ifdef RPC_VSOCK
if (vsock) {
rc = bind_sockets_vsock();
if (rc)
LogMajor(COMPONENT_DISPATCH,
"AF_VSOCK bind failed (continuing startup)");
}
#endif /* RPC_VSOCK */
LogInfo(COMPONENT_DISPATCH,
"Bind_sockets() successful, v6disabled = %d, vsock = %d",
v6disabled, vsock);
}
/**
* @brief Set the socket options on the allocated udp and tcp sockets
*
* @param[in] p Protocol index
*
* @return 0 on success, -1 on failure.
*/
static int alloc_socket_setopts(int p)
{
int one = 1;
/* Use SO_REUSEADDR to avoid waiting out the
* 2MSL TIME_WAIT timeout */
if (setsockopt(udp_socket[p],
SOL_SOCKET, SO_REUSEADDR,
&one, sizeof(one))) {
LogWarn(COMPONENT_DISPATCH,
"Bad udp socket options for %s, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
if (setsockopt(tcp_socket[p],
SOL_SOCKET, SO_REUSEADDR,
&one, sizeof(one))) {
LogWarn(COMPONENT_DISPATCH,
"Bad tcp socket options for %s, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
/* We prefer a non-blocking socket
* in this specific case */
if (fcntl(udp_socket[p], F_SETFL, FNDELAY) == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot set udp socket for %s as non blocking, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
return 0;
}
/**
* @brief Allocate the tcp and udp sockets for the nfs daemon
* using V4 interfaces
*/
static int Allocate_sockets_V4(int p)
{
udp_socket[p] = socket(AF_INET,
SOCK_DGRAM,
IPPROTO_UDP);
if (udp_socket[p] == -1) {
if (errno == EAFNOSUPPORT) {
LogInfo(COMPONENT_DISPATCH,
"No V6 and V4 intfs configured?!");
}
LogWarn(COMPONENT_DISPATCH,
"Cannot allocate a udp socket for %s, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
tcp_socket[p] = socket(AF_INET,
SOCK_STREAM,
IPPROTO_TCP);
if (tcp_socket[p] == -1) {
LogWarn(COMPONENT_DISPATCH,
"Cannot allocate a tcp socket for %s, error %d(%s)",
tags[p], errno, strerror(errno));
return -1;
}
return 0;
}
#ifdef RPC_VSOCK
/**
* @brief Create vmci stream socket
*/
static int allocate_socket_vsock(void)
{
int one = 1;
tcp_socket[P_NFS_VSOCK] = socket(AF_VSOCK, SOCK_STREAM, 0);
if (tcp_socket[P_NFS_VSOCK] == -1) {
LogWarn(COMPONENT_DISPATCH,
"socket create failed for %s, error %d(%s)",
tags[P_NFS_VSOCK], errno, strerror(errno));
return -1;
}
if (setsockopt(tcp_socket[P_NFS_VSOCK],
SOL_SOCKET, SO_REUSEADDR,
&one, sizeof(one))) {
LogWarn(COMPONENT_DISPATCH,
"bad tcp socket options for %s, error %d(%s)",
tags[P_NFS_VSOCK], errno, strerror(errno));
return -1;
}
return 0;
}
#endif /* RPC_VSOCK */
/**
* @brief Allocate the tcp and udp sockets for the nfs daemon
*/
static void Allocate_sockets(void)
{
protos p;
int rc = 0;
LogFullDebug(COMPONENT_DISPATCH, "Allocation of the sockets");
for (p = P_NFS; p < P_COUNT; p++) {
if (nfs_protocol_enabled(p)) {
/* Initialize all the sockets to -1 because
* it makes some code later easier */
udp_socket[p] = -1;
tcp_socket[p] = -1;
if (v6disabled)
goto try_V4;
udp_socket[p] = socket(AF_INET6,
SOCK_DGRAM,
IPPROTO_UDP);
if (udp_socket[p] == -1) {
/*
* We assume that EAFNOSUPPORT points
* to the likely case when the host has
* V6 interfaces disabled. So we will
* try to use the existing V4 interfaces
* instead
*/
if (errno == EAFNOSUPPORT) {
v6disabled = true;
LogWarn(COMPONENT_DISPATCH,
"System may not have V6 intfs configured error %d(%s)",
errno, strerror(errno));
goto try_V4;
}
LogFatal(COMPONENT_DISPATCH,
"Cannot allocate a udp socket for %s, error %d(%s)",
tags[p], errno, strerror(errno));
}
tcp_socket[p] = socket(AF_INET6,
SOCK_STREAM,
IPPROTO_TCP);
/* We fail with LogFatal here on error because it
* shouldn't be that we have managed to create a
* V6 based udp socket and have failed for the tcp
* sock. If it were a case of V6 being disabled,
* then we would have encountered that case with
* the first udp sock create and would have moved
* on to create the V4 sockets.
*/
if (tcp_socket[p] == -1)
LogFatal(COMPONENT_DISPATCH,
"Cannot allocate a tcp socket for %s, error %d(%s)",
tags[p], errno, strerror(errno));
try_V4:
if (v6disabled) {
rc = Allocate_sockets_V4(p);
if (rc) {
LogFatal(COMPONENT_DISPATCH,
"Error allocating V4 socket for proto %d, %s",
p, tags[p]);
}
}
rc = alloc_socket_setopts(p);
if (rc) {
LogFatal(COMPONENT_DISPATCH,
"Error setting socket option for proto %d, %s",
p, tags[p]);
}
}
}
#ifdef RPC_VSOCK
if (vsock)
allocate_socket_vsock();
#endif /* RPC_VSOCK */
}
/* The following routine must ONLY be called from the shutdown
* thread */
void Clean_RPC(void)
{
/**
* @todo Consider the need to call Svc_dg_destroy for UDP & ?? for
* TCP based services
*/
unregister_rpc();
close_rpc_fd();
}
#define UDP_REGISTER(prot, vers, netconfig) \
svc_reg(udp_xprt[prot], nfs_param.core_param.program[prot], \
(u_long) vers, \
nfs_rpc_dispatch_dummy, netconfig)
#define TCP_REGISTER(prot, vers, netconfig) \
svc_reg(tcp_xprt[prot], nfs_param.core_param.program[prot], \
(u_long) vers, \
nfs_rpc_dispatch_dummy, netconfig)
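/**
* @brief Register an RPC program with rpcbind, if its core option is set
*
* @param[in] prot Protocol index
* @param[in] flag Core option that must be enabled for registration
* @param[in] vers Program version to register
*/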
void Register_program(protos prot, int flag, int vers)
{
if ((nfs_param.core_param.core_options & flag) != 0) {
LogInfo(COMPONENT_DISPATCH, "Registering %s V%d/UDP",
tags[prot], (int)vers);
/* XXXX fix svc_register! */
if (!UDP_REGISTER(prot, vers, netconfig_udpv4))
LogCrit(COMPONENT_DISPATCH,
"Cannot register %s V%d on UDP", tags[prot],
(int)vers);
if (netconfig_udpv6) {
LogInfo(COMPONENT_DISPATCH, "Registering %s V%d/UDPv6",
tags[prot], (int)vers);
if (!UDP_REGISTER(prot, vers, netconfig_udpv6))
LogCrit(COMPONENT_DISPATCH,
"Cannot register %s V%d on UDPv6",
tags[prot], (int)vers);
}
#ifndef _NO_TCP_REGISTER
LogInfo(COMPONENT_DISPATCH, "Registering %s V%d/TCP",
tags[prot], (int)vers);
if (!TCP_REGISTER(prot, vers, netconfig_tcpv4))
LogCrit(COMPONENT_DISPATCH,
"Cannot register %s V%d on TCP", tags[prot],
(int)vers);
if (netconfig_tcpv6) {
LogInfo(COMPONENT_DISPATCH, "Registering %s V%d/TCPv6",
tags[prot], (int)vers);
if (!TCP_REGISTER(prot, vers, netconfig_tcpv6))
LogCrit(COMPONENT_DISPATCH,
"Cannot register %s V%d on TCPv6",
tags[prot], (int)vers);
}
#endif /* _NO_TCP_REGISTER */
}
}
tirpc_pkg_params ntirpc_pp = {
0,
0,
(mem_format_t)rpc_warnx,
gsh_free_size,
gsh_malloc__,
gsh_malloc_aligned__,
gsh_calloc__,
gsh_realloc__,
};
/**
* @brief Init the svc descriptors for the nfs daemon
*
* Perform all the required initialization for the RPC subsystem and event
* channels.
*/
void nfs_Init_svc(void)
{
svc_init_params svc_params;
int ix, code __attribute__ ((unused)) = 0;
LogDebug(COMPONENT_DISPATCH, "NFS INIT: Core options = %d",
nfs_param.core_param.core_options);
/* Init request queue before RPC stack */
nfs_rpc_queue_init();
LogInfo(COMPONENT_DISPATCH, "NFS INIT: using TIRPC");
memset(&svc_params, 0, sizeof(svc_params));
#ifdef __FreeBSD__
v6disabled = true;
#else
v6disabled = false;
#endif
/* Set TIRPC debug flags */
ntirpc_pp.debug_flags = nfs_param.core_param.rpc.debug_flags;
/* Redirect TI-RPC allocators, log channel */
if (!tirpc_control(TIRPC_PUT_PARAMETERS, &ntirpc_pp))
LogCrit(COMPONENT_INIT, "Setting nTI-RPC parameters failed");
#ifdef RPC_VSOCK
vsock = nfs_param.core_param.core_options & CORE_OPTION_NFS_VSOCK;
#endif
/* New TI-RPC package init function */
svc_params.flags = SVC_INIT_EPOLL; /* use EPOLL event mgmt */
svc_params.flags |= SVC_INIT_NOREG_XPRTS; /* don't call xprt_register */
svc_params.max_connections = nfs_param.core_param.rpc.max_connections;
svc_params.max_events = 1024; /* length of epoll event queue */
svc_params.svc_ioq_maxbuf =
nfs_param.core_param.rpc.max_send_buffer_size;
svc_params.idle_timeout = nfs_param.core_param.rpc.idle_timeout_s;
svc_params.ioq_thrd_max = /* max ioq worker threads */
nfs_param.core_param.rpc.ioq_thrd_max;
/* GSS ctx cache tuning, expiration */
svc_params.gss_ctx_hash_partitions =
nfs_param.core_param.rpc.gss.ctx_hash_partitions;
svc_params.gss_max_ctx =
nfs_param.core_param.rpc.gss.max_ctx;
svc_params.gss_max_gc =
nfs_param.core_param.rpc.gss.max_gc;
/* Only after TI-RPC allocators, log channel are setup */
if (!svc_init(&svc_params))
LogFatal(COMPONENT_INIT, "SVC initialization failed");
for (ix = 0; ix < N_EVENT_CHAN; ++ix) {
rpc_evchan[ix].chan_id = 0;
code = svc_rqst_new_evchan(&rpc_evchan[ix].chan_id,
NULL /* u_data */,
SVC_RQST_FLAG_NONE);
if (code)
LogFatal(COMPONENT_DISPATCH,
"Cannot create TI-RPC event channel (%d, %d)",
ix, code);
/* XXX bail?? */
}
/* Get the netconfig entries from /etc/netconfig */
netconfig_udpv4 = (struct netconfig *)getnetconfigent("udp");
if (netconfig_udpv4 == NULL)
LogFatal(COMPONENT_DISPATCH,
"Cannot get udp netconfig, cannot get an entry for udp in netconfig file. Check file /etc/netconfig...");
/* Get the netconfig entries from /etc/netconfig */
netconfig_tcpv4 = (struct netconfig *)getnetconfigent("tcp");
if (netconfig_tcpv4 == NULL)
LogFatal(COMPONENT_DISPATCH,
"Cannot get tcp netconfig, cannot get an entry for tcp in netconfig file. Check file /etc/netconfig...");
/* A short message to show that /etc/netconfig parsing was a success */
LogFullDebug(COMPONENT_DISPATCH, "netconfig found for UDPv4 and TCPv4");
LogInfo(COMPONENT_DISPATCH, "NFS INIT: Using IPv6");
/* Get the netconfig entries from /etc/netconfig */
netconfig_udpv6 = (struct netconfig *)getnetconfigent("udp6");
if (netconfig_udpv6 == NULL)
LogInfo(COMPONENT_DISPATCH,
"Cannot get udp6 netconfig, cannot get an entry for udp6 in netconfig file. Check file /etc/netconfig...");
/* Get the netconfig entries from /etc/netconfig */
netconfig_tcpv6 = (struct netconfig *)getnetconfigent("tcp6");
if (netconfig_tcpv6 == NULL)
LogInfo(COMPONENT_DISPATCH,
"Cannot get tcp6 netconfig, cannot get an entry for tcp in netconfig file. Check file /etc/netconfig...");
/* A short message to show that /etc/netconfig parsing was a success
* for ipv6
*/
if (netconfig_udpv6 && netconfig_tcpv6)
LogFullDebug(COMPONENT_DISPATCH,
"netconfig found for UDPv6 and TCPv6");
/* Allocate the UDP and TCP sockets for the RPC */
Allocate_sockets();
if ((nfs_param.core_param.core_options & CORE_OPTION_NFSV3) != 0) {
/* Some log output that can be useful when debugging ONC/RPC
* and RPCSEC_GSS matters */
LogDebug(COMPONENT_DISPATCH,
"Socket numbers are: nfs_udp=%u nfs_tcp=%u nfs_vsock=%u mnt_udp=%u mnt_tcp=%u nlm_tcp=%u nlm_udp=%u",
udp_socket[P_NFS],
tcp_socket[P_NFS],
tcp_socket[P_NFS_VSOCK],
udp_socket[P_MNT],
tcp_socket[P_MNT],
udp_socket[P_NLM],
tcp_socket[P_NLM]);
} else {
/* Some log output that can be useful when debugging ONC/RPC
* and RPCSEC_GSS matters */
LogDebug(COMPONENT_DISPATCH,
"Socket numbers are: nfs_udp=%u nfs_tcp=%u nfs_vsock=%u",
udp_socket[P_NFS],
tcp_socket[P_NFS],
tcp_socket[P_NFS_VSOCK]);
}
/* Some log output that can be useful when debugging ONC/RPC
* and RPCSEC_GSS matters */
LogDebug(COMPONENT_DISPATCH,
"Socket numbers are: rquota_udp=%u rquota_tcp=%u",
udp_socket[P_RQUOTA], tcp_socket[P_RQUOTA]);
if ((nfs_param.core_param.core_options &
CORE_OPTION_ALL_NFS_VERS) != 0) {
/* Bind the tcp and udp sockets */
Bind_sockets();
/* Unregister from portmapper/rpcbind */
unregister_rpc();
/* Set up well-known xprt handles */
Create_SVCXPRTs();
}
#ifdef _HAVE_GSSAPI
/* Acquire RPCSEC_GSS basis if needed */
if (nfs_param.krb5_param.active_krb5) {
if (!svcauth_gss_import_name
(nfs_param.krb5_param.svc.principal)) {
LogFatal(COMPONENT_DISPATCH,
"Could not import principal name %s into GSSAPI",
nfs_param.krb5_param.svc.principal);
} else {
LogInfo(COMPONENT_DISPATCH,
"Successfully imported principal %s into GSSAPI",
nfs_param.krb5_param.svc.principal);
/* Try to acquire credentials to check
* the principal's validity */
if (!svcauth_gss_acquire_cred())
LogCrit(COMPONENT_DISPATCH,
"Cannot acquire credentials for principal %s",
nfs_param.krb5_param.svc.principal);
else
LogDebug(COMPONENT_DISPATCH,
"Principal %s is suitable for acquiring credentials",
nfs_param.krb5_param.svc.principal);
}
}
#endif /* _HAVE_GSSAPI */
#ifndef _NO_PORTMAPPER
/* Perform all the RPC registrations, for UDP and TCP,
* for NFS_V3, NFS_V4, MOUNT, NLM and RQUOTA */
#ifdef _USE_NFS3
Register_program(P_NFS, CORE_OPTION_NFSV3, NFS_V3);
#endif /* _USE_NFS3 */
Register_program(P_NFS, CORE_OPTION_NFSV4, NFS_V4);
Register_program(P_MNT, CORE_OPTION_NFSV3, MOUNT_V1);
Register_program(P_MNT, CORE_OPTION_NFSV3, MOUNT_V3);
#ifdef _USE_NLM
if (nfs_param.core_param.enable_NLM)
Register_program(P_NLM, CORE_OPTION_NFSV3, NLM4_VERS);
#endif /* _USE_NLM */
if (nfs_param.core_param.enable_RQUOTA &&
(nfs_param.core_param.core_options & (CORE_OPTION_NFSV3 |
CORE_OPTION_NFSV4))) {
Register_program(P_RQUOTA, CORE_OPTION_ALL_VERS, RQUOTAVERS);
Register_program(P_RQUOTA, CORE_OPTION_ALL_VERS,
EXT_RQUOTAVERS);
}
#endif /* _NO_PORTMAPPER */
}
/* forward declaration in lieu of moving code {WAS} */
static void *rpc_dispatcher_thread(void *arg);
/**
* @brief Start service threads
*
* @param[in] attr_thr Attributes for the started threads
*/
void nfs_rpc_dispatch_threads(pthread_attr_t *attr_thr)
{
int ix, code = 0;
/* Start event channel service threads */
for (ix = 0; ix < N_EVENT_CHAN; ++ix) {
code = pthread_create(&rpc_evchan[ix].thread_id, attr_thr,
rpc_dispatcher_thread,
(void *)&rpc_evchan[ix].chan_id);
if (code != 0)
LogFatal(COMPONENT_THREAD,
"Could not create rpc_dispatcher_thread #%u, error = %d (%s)",
ix, errno, strerror(errno));
}
LogInfo(COMPONENT_THREAD,
"%d rpc dispatcher threads were started successfully",
N_EVENT_CHAN);
}
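/**
* @brief Signal shutdown to all event channel service threads.
*/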
void nfs_rpc_dispatch_stop(void)
{
int ix;
for (ix = 0; ix < N_EVENT_CHAN; ++ix) {
svc_rqst_thrd_signal(rpc_evchan[ix].chan_id,
SVC_RQST_SIGNAL_SHUTDOWN);
}
}
/**
* @brief Rendezvous callout. This routine will be called by TI-RPC
* after newxprt has been accepted.
*
* Register newxprt on a TCP event channel. Balancing events/channels
* could become involved. To start with, just cycle through them as
* new connections are accepted.
*
* @param[in] xprt Transport
* @param[in] newxprt Newly created transport
* @param[in] flags Unused
* @param[in] u_data User data (unused)
*
* @return Always returns 0.
*/
static u_int nfs_rpc_recv_user_data(SVCXPRT *xprt, SVCXPRT *newxprt,
const u_int flags, void *u_data)
{
static uint32_t next_chan = TCP_EVCHAN_0;
static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
uint32_t tchan;
PTHREAD_MUTEX_lock(&mtx);
tchan = next_chan;
assert((next_chan >= TCP_EVCHAN_0) && (next_chan < N_EVENT_CHAN));
if (++next_chan >= N_EVENT_CHAN)
next_chan = TCP_EVCHAN_0;
/* setup private data (freed when xprt is destroyed) */
newxprt->xp_u1 =
alloc_gsh_xprt_private(newxprt, XPRT_PRIVATE_FLAG_NONE);
/* NB: xu->drc is allocated on first request--we need shared
* TCP DRC for v3, but per-connection for v4 */
PTHREAD_MUTEX_unlock(&mtx);
(void)svc_rqst_evchan_reg(rpc_evchan[tchan].chan_id, newxprt,
SVC_RQST_FLAG_NONE);
return 0;
}
/**
* @brief xprt destructor callout
*
* @param[in] xprt Transport to destroy
*/
static void nfs_rpc_free_user_data(SVCXPRT *xprt)
{
if (xprt->xp_u2) {
nfs_dupreq_put_drc(xprt, xprt->xp_u2, DRC_FLAG_RELEASE);
xprt->xp_u2 = NULL;
}
free_gsh_xprt_private(xprt);
}
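/**
* @brief Estimate the number of outstanding requests.
*
* To keep this cheap, the sum over all queue pairs is recomputed
* only on every 10th call; other calls return the cached value.
*
* @return The estimated count.
*/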
uint32_t nfs_rpc_outstanding_reqs_est(void)
{
static uint32_t ctr;
static uint32_t nreqs;
struct req_q_pair *qpair;
uint32_t treqs;
int ix;
if ((atomic_inc_uint32_t(&ctr) % 10) != 0)
return atomic_fetch_uint32_t(&nreqs);
treqs = 0;
for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
qpair = &(nfs_req_st.reqs.nfs_request_q.qset[ix]);
treqs += atomic_fetch_uint32_t(&qpair->producer.size);
treqs += atomic_fetch_uint32_t(&qpair->consumer.size);
}
atomic_store_uint32_t(&nreqs, treqs);
return treqs;
}
static inline bool stallq_should_unstall(SVCXPRT *xprt)
{
return ((xprt->xp_requests
< nfs_param.core_param.dispatch_max_reqs_xprt / 2)
|| (xprt->xp_flags & SVC_XPRT_FLAG_DESTROYED));
}
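/**
* @brief Stall queue service thread.
*
* Wakes once per second and re-arms events on any stalled xprt whose
* outstanding request count has dropped below half the per-xprt
* limit (or which has been destroyed); exits when the stall queue
* empties.
*/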
void thr_stallq(struct fridgethr_context *thr_ctx)
{
gsh_xprt_private_t *xu;
struct glist_head *l;
SVCXPRT *xprt;
while (1) {
thread_delay_ms(1000);
PTHREAD_MUTEX_lock(&nfs_req_st.stallq.mtx);
restart:
if (nfs_req_st.stallq.stalled == 0) {
nfs_req_st.stallq.active = false;
PTHREAD_MUTEX_unlock(&nfs_req_st.stallq.mtx);
break;
}
glist_for_each(l, &nfs_req_st.stallq.q) {
xu = glist_entry(l, gsh_xprt_private_t, stallq);
xprt = xu->xprt;
/* handle stalled xprts that idle out */
if (stallq_should_unstall(xprt)) {
/* lock ordering
* (cf. nfs_rpc_cond_stall_xprt) */
PTHREAD_MUTEX_unlock(&nfs_req_st.stallq.mtx);
/* !LOCKED */
LogDebug(COMPONENT_DISPATCH,
"unstalling stalled xprt %p", xprt);
PTHREAD_MUTEX_lock(&xprt->xp_lock);
PTHREAD_MUTEX_lock(&nfs_req_st.stallq.mtx);
/* check that we're still stalled */
if (xu->flags & XPRT_PRIVATE_FLAG_STALLED) {
glist_del(&xu->stallq);
--(nfs_req_st.stallq.stalled);
atomic_clear_uint16_t_bits(&xu->flags,
XPRT_PRIVATE_FLAG_STALLED);
(void)svc_rqst_rearm_events(
xprt, SVC_RQST_FLAG_NONE);
/* drop stallq ref */
gsh_xprt_unref(
xprt, XPRT_PRIVATE_FLAG_LOCKED,
__func__, __LINE__);
}
goto restart;
}
}
PTHREAD_MUTEX_unlock(&nfs_req_st.stallq.mtx);
}
LogDebug(COMPONENT_DISPATCH, "stallq idle, thread exit");
}
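/**
* @brief Stall an xprt that has exceeded its per-transport request
* quota, parking it on the stall queue until load drops.
*
* @return true if the xprt is (now) stalled, false otherwise.
*/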
static bool nfs_rpc_cond_stall_xprt(SVCXPRT *xprt)
{
gsh_xprt_private_t *xu;
bool activate = false;
uint32_t nreqs = xprt->xp_requests;
/* check per-xprt quota */
if (likely(nreqs < nfs_param.core_param.dispatch_max_reqs_xprt)) {
LogDebug(COMPONENT_DISPATCH,
"xprt %p xp_refs %" PRIu32 " has %" PRIu32
" reqs active (max %d)",
xprt,
xprt->xp_refs,
nreqs,
nfs_param.core_param.dispatch_max_reqs_xprt);
return false;
}
PTHREAD_MUTEX_lock(&xprt->xp_lock);
xu = (gsh_xprt_private_t *) xprt->xp_u1;
/* XXX can't happen */
if (unlikely(xu->flags & XPRT_PRIVATE_FLAG_STALLED)) {
PTHREAD_MUTEX_unlock(&xprt->xp_lock);
LogDebug(COMPONENT_DISPATCH, "xprt %p already stalled (oops)",
xprt);
return true;
}
LogDebug(COMPONENT_DISPATCH, "xprt %p has %u reqs, marking stalled",
xprt, nreqs);
/* ok, need to stall */
PTHREAD_MUTEX_lock(&nfs_req_st.stallq.mtx);
glist_add_tail(&nfs_req_st.stallq.q, &xu->stallq);
++(nfs_req_st.stallq.stalled);
atomic_set_uint16_t_bits(&xu->flags, XPRT_PRIVATE_FLAG_STALLED);
PTHREAD_MUTEX_unlock(&xprt->xp_lock);
/* if no thread is servicing the stallq, start one */
if (!nfs_req_st.stallq.active) {
nfs_req_st.stallq.active = true;
activate = true;
}
PTHREAD_MUTEX_unlock(&nfs_req_st.stallq.mtx);
if (activate) {
int rc = 0;
LogDebug(COMPONENT_DISPATCH, "starting stallq service thread");
rc = fridgethr_submit(req_fridge, thr_stallq,
NULL /* no arg */);
if (rc != 0)
LogCrit(COMPONENT_DISPATCH,
"Failed to start stallq: %d", rc);
}
/* stalled */
return true;
}
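/**
* @brief Initialize the shared request queues, wait queue, stall
* queue, and the decoder thread pool.
*/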
void nfs_rpc_queue_init(void)
{
struct fridgethr_params reqparams;
struct req_q_pair *qpair;
int rc = 0;
int ix;
memset(&reqparams, 0, sizeof(struct fridgethr_params));
/**
* @todo Add a configuration parameter to set a max.
*/
reqparams.thr_max = 0;
reqparams.thr_min = 1;
reqparams.thread_delay =
nfs_param.core_param.decoder_fridge_expiration_delay;
reqparams.deferment = fridgethr_defer_block;
reqparams.block_delay =
nfs_param.core_param.decoder_fridge_block_timeout;
/* decoder thread pool */
rc = fridgethr_init(&req_fridge, "decoder", &reqparams);
if (rc != 0)
LogFatal(COMPONENT_DISPATCH,
"Unable to initialize decoder thread pool: %d", rc);
/* queues */
pthread_spin_init(&nfs_req_st.reqs.sp, PTHREAD_PROCESS_PRIVATE);
nfs_req_st.reqs.size = 0;
for (ix = 0; ix < N_REQ_QUEUES; ++ix) {
qpair = &(nfs_req_st.reqs.nfs_request_q.qset[ix]);
qpair->s = req_q_s[ix];
nfs_rpc_q_init(&qpair->producer);
nfs_rpc_q_init(&qpair->consumer);
}
/* waitq */
glist_init(&nfs_req_st.reqs.wait_list);
nfs_req_st.reqs.waiters = 0;
/* stallq */
gsh_mutex_init(&nfs_req_st.stallq.mtx, NULL);
glist_init(&nfs_req_st.stallq.q);
nfs_req_st.stallq.active = false;
nfs_req_st.stallq.stalled = 0;
}
static uint32_t enqueued_reqs;
static uint32_t dequeued_reqs;
uint32_t get_enqueue_count(void)
{
return enqueued_reqs;
}
uint32_t get_dequeue_count(void)
{
return dequeued_reqs;
}
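/**
* @brief Enqueue a request on the producer queue matching its type
* and latency class, then wake one waiting worker if any.
*
* @param[in] reqdata Request to enqueue
*/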
void nfs_rpc_enqueue_req(request_data_t *reqdata)
{
struct req_q_set *nfs_request_q;
struct req_q_pair *qpair;
struct req_q *q;
#if defined(HAVE_BLKIN)
BLKIN_TIMESTAMP(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"enqueue-enter");
#endif
nfs_request_q = &nfs_req_st.reqs.nfs_request_q;
switch (reqdata->rtype) {
case NFS_REQUEST:
LogFullDebug(COMPONENT_DISPATCH,
"enter rq_xid=%u lookahead.flags=%u",
reqdata->r_u.req.svc.rq_xid,
reqdata->r_u.req.lookahead.flags);
if (reqdata->r_u.req.lookahead.flags & NFS_LOOKAHEAD_MOUNT) {
qpair = &(nfs_request_q->qset[REQ_Q_MOUNT]);
break;
}
if (NFS_LOOKAHEAD_HIGH_LATENCY(reqdata->r_u.req.lookahead))
qpair = &(nfs_request_q->qset[REQ_Q_HIGH_LATENCY]);
else
qpair = &(nfs_request_q->qset[REQ_Q_LOW_LATENCY]);
break;
case NFS_CALL:
qpair = &(nfs_request_q->qset[REQ_Q_CALL]);
break;
#ifdef _USE_9P
case _9P_REQUEST:
/* XXX identify high-latency requests and allocate
* to the high-latency queue, as above */
qpair = &(nfs_request_q->qset[REQ_Q_LOW_LATENCY]);
break;
#endif
default:
goto out;
}
/* this one is real, timestamp it
*/
now(&reqdata->time_queued);
/* always append to producer queue */
q = &qpair->producer;
pthread_spin_lock(&q->sp);
glist_add_tail(&q->q, &reqdata->req_q);
++(q->size);
pthread_spin_unlock(&q->sp);
(void) atomic_inc_uint32_t(&enqueued_reqs);
#if defined(HAVE_BLKIN)
/* log the queue depth */
BLKIN_KEYVAL_INTEGER(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"reqs-est",
nfs_rpc_outstanding_reqs_est()
);
BLKIN_TIMESTAMP(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"enqueue-exit");
#endif
LogDebug(COMPONENT_DISPATCH,
"enqueued req, q %p (%s %p:%p) size is %d (enq %u deq %u)",
q, qpair->s, &qpair->producer, &qpair->consumer, q->size,
enqueued_reqs, dequeued_reqs);
/* potentially wakeup some thread */
/* global waitq */
{
wait_q_entry_t *wqe;
/* SPIN LOCKED */
pthread_spin_lock(&nfs_req_st.reqs.sp);
if (nfs_req_st.reqs.waiters) {
wqe = glist_first_entry(&nfs_req_st.reqs.wait_list,
wait_q_entry_t, waitq);
LogFullDebug(COMPONENT_DISPATCH,
"nfs_req_st.reqs.waiters %u signal wqe %p (for q %p)",
nfs_req_st.reqs.waiters, wqe, q);
/* release 1 waiter */
glist_del(&wqe->waitq);
--(nfs_req_st.reqs.waiters);
--(wqe->waiters);
/* ! SPIN LOCKED */
pthread_spin_unlock(&nfs_req_st.reqs.sp);
PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
/* XXX reliable handoff */
wqe->flags |= Wqe_LFlag_SyncDone;
if (wqe->flags & Wqe_LFlag_WaitSync)
pthread_cond_signal(&wqe->lwe.cv);
PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
} else
/* ! SPIN LOCKED */
pthread_spin_unlock(&nfs_req_st.reqs.sp);
}
out:
return;
}
/* static inline */
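/**
* @brief Take one request from a queue pair.
*
* Consumes from the consumer queue if non-empty; otherwise splices
* the whole producer queue into the consumer queue (a cheap O(1)
* list splice under both spinlocks) and consumes from there.
*
* @return A request, or NULL if both queues are empty.
*/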
request_data_t *nfs_rpc_consume_req(struct req_q_pair *qpair)
{
request_data_t *reqdata = NULL;
pthread_spin_lock(&qpair->consumer.sp);
if (qpair->consumer.size > 0) {
reqdata =
glist_first_entry(&qpair->consumer.q, request_data_t,
req_q);
glist_del(&reqdata->req_q);
--(qpair->consumer.size);
pthread_spin_unlock(&qpair->consumer.sp);
goto out;
} else {
char *s = NULL;
uint32_t csize = ~0U;
uint32_t psize = ~0U;
pthread_spin_lock(&qpair->producer.sp);
if (isFullDebug(COMPONENT_DISPATCH)) {
s = (char *)qpair->s;
csize = qpair->consumer.size;
psize = qpair->producer.size;
}
if (qpair->producer.size > 0) {
/* splice */
glist_splice_tail(&qpair->consumer.q,
&qpair->producer.q);
qpair->consumer.size = qpair->producer.size;
qpair->producer.size = 0;
/* consumer.size > 0 */
pthread_spin_unlock(&qpair->producer.sp);
reqdata =
glist_first_entry(&qpair->consumer.q,
request_data_t, req_q);
glist_del(&reqdata->req_q);
--(qpair->consumer.size);
pthread_spin_unlock(&qpair->consumer.sp);
if (s)
LogFullDebug(COMPONENT_DISPATCH,
"try splice, qpair %s consumer qsize=%u producer qsize=%u",
s, csize, psize);
goto out;
}
pthread_spin_unlock(&qpair->producer.sp);
pthread_spin_unlock(&qpair->consumer.sp);
if (s)
LogFullDebug(COMPONENT_DISPATCH,
"try splice, qpair %s consumer qsize=%u producer qsize=%u",
s, csize, psize);
}
out:
return reqdata;
}
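/**
* @brief Dequeue the next request for a worker.
*
* Polls the four queue pairs round-robin, starting from a rotating
* slot so no queue is persistently favored. If all are empty, the
* worker parks on the global wait queue with a 5s timed wait and
* honors fridge shutdown requests.
*
* @param[in] worker The calling worker's data
*
* @return A request, or NULL if the thread should return.
*/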
request_data_t *nfs_rpc_dequeue_req(nfs_worker_data_t *worker)
{
request_data_t *reqdata = NULL;
struct req_q_set *nfs_request_q = &nfs_req_st.reqs.nfs_request_q;
struct req_q_pair *qpair;
uint32_t ix, slot;
struct timespec timeout;
/* XXX: the following stands in for a more robust/flexible
* weighting function */
/* slot in 0..3 */
retry_deq:
slot = (nfs_rpc_q_next_slot() % 4);
for (ix = 0; ix < 4; ++ix) {
switch (slot) {
case 0:
/* MOUNT */
qpair = &(nfs_request_q->qset[REQ_Q_MOUNT]);
break;
case 1:
/* NFS_CALL */
qpair = &(nfs_request_q->qset[REQ_Q_CALL]);
break;
case 2:
/* LL */
qpair = &(nfs_request_q->qset[REQ_Q_LOW_LATENCY]);
break;
case 3:
/* HL */
qpair = &(nfs_request_q->qset[REQ_Q_HIGH_LATENCY]);
break;
default:
/* not here */
abort();
break;
}
LogFullDebug(COMPONENT_DISPATCH,
"dequeue_req try qpair %s %p:%p", qpair->s,
&qpair->producer, &qpair->consumer);
/* anything? */
reqdata = nfs_rpc_consume_req(qpair);
if (reqdata) {
(void) atomic_inc_uint32_t(&dequeued_reqs);
break;
}
++slot;
slot = slot % 4;
} /* for */
/* wait */
if (!reqdata) {
struct fridgethr_context *ctx =
container_of(worker, struct fridgethr_context, wd);
wait_q_entry_t *wqe = &worker->wqe;
assert(wqe->waiters == 0); /* wqe is not on any wait queue */
PTHREAD_MUTEX_lock(&wqe->lwe.mtx);
wqe->flags = Wqe_LFlag_WaitSync;
wqe->waiters = 1;
/* XXX functionalize */
pthread_spin_lock(&nfs_req_st.reqs.sp);
glist_add_tail(&nfs_req_st.reqs.wait_list, &wqe->waitq);
++(nfs_req_st.reqs.waiters);
pthread_spin_unlock(&nfs_req_st.reqs.sp);
while (!(wqe->flags & Wqe_LFlag_SyncDone)) {
timeout.tv_sec = time(NULL) + 5;
timeout.tv_nsec = 0;
pthread_cond_timedwait(&wqe->lwe.cv, &wqe->lwe.mtx,
&timeout);
if (fridgethr_you_should_break(ctx)) {
/* We are returning;
* so take us out of the waitq */
pthread_spin_lock(&nfs_req_st.reqs.sp);
if (wqe->waitq.next != NULL
|| wqe->waitq.prev != NULL) {
/* Element is still in waitq,
* remove it */
glist_del(&wqe->waitq);
--(nfs_req_st.reqs.waiters);
--(wqe->waiters);
wqe->flags &=
~(Wqe_LFlag_WaitSync |
Wqe_LFlag_SyncDone);
}
pthread_spin_unlock(&nfs_req_st.reqs.sp);
PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
return NULL;
}
}
/* XXX wqe was removed from nfs_req_st.waitq
* (by signalling thread) */
wqe->flags &= ~(Wqe_LFlag_WaitSync | Wqe_LFlag_SyncDone);
PTHREAD_MUTEX_unlock(&wqe->lwe.mtx);
LogFullDebug(COMPONENT_DISPATCH, "wqe wakeup %p", wqe);
goto retry_deq;
} /* !reqdata */
#if defined(HAVE_BLKIN)
/* thread id */
BLKIN_KEYVAL_INTEGER(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"worker-id",
worker->worker_index
);
BLKIN_TIMESTAMP(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"dequeue-req");
#endif
return reqdata;
}
/**
* @brief Allocate a new request
*
* @param[in] xprt Transport to use
*
* @return New request data
*/
static inline request_data_t *alloc_nfs_request(SVCXPRT *xprt)
{
request_data_t *reqdata = pool_alloc(request_pool);
/* set the request as NFS already-read */
reqdata->rtype = NFS_REQUEST;
/* set up req */
reqdata->r_u.req.svc.rq_xprt = xprt;
reqdata->r_u.req.svc.rq_daddr_len = 0;
reqdata->r_u.req.svc.rq_raddr_len = 0;
return reqdata;
}
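/**
* @brief Release a request, disposing of its RPC message and auth
* state for NFS requests.
*
* @param[in] reqdata Request to free
*/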
static inline void free_nfs_request(request_data_t *reqdata)
{
switch (reqdata->rtype) {
case NFS_REQUEST:
/* dispose RPC header */
if (reqdata->r_u.req.svc.rq_msg)
(void)free_rpc_msg(reqdata->r_u.req.svc.rq_msg);
if (reqdata->r_u.req.svc.rq_auth)
SVCAUTH_RELEASE(reqdata->r_u.req.svc.rq_auth,
&(reqdata->r_u.req.svc));
break;
default:
break;
}
pool_free(request_pool, reqdata);
}
/**
* @brief Helper function to validate rpc calls.
*
* Validate that the RPC call targets a known program, a supported
* version, and an in-range procedure. Errors are replied at the svc
* level, so a false return means the error reply has already been
* sent and processing should stop.
*
* @param[in] req Request to validate
*
* @return True if the request is valid, false otherwise.
*/
static bool is_rpc_call_valid(struct svc_req *req)
{
/* This function is only ever called from one point, and the
read-lock is always held at that call. If this changes,
we'll have to pass in the value of rlocked. */
int lo_vers, hi_vers;
if (req->rq_prog == nfs_param.core_param.program[P_NFS]) {
if (req->rq_vers == NFS_V3) {
#ifdef _USE_NFS3
if ((nfs_param.core_param.
core_options & CORE_OPTION_NFSV3)
&& req->rq_proc <= NFSPROC3_COMMIT)
return true;
else
#endif /* _USE_NFS3 */
goto noproc_err;
} else if (req->rq_vers == NFS_V4) {
if ((nfs_param.core_param.
core_options & CORE_OPTION_NFSV4)
&& req->rq_proc <= NFSPROC4_COMPOUND)
return true;
else
goto noproc_err;
} else { /* version error, set the range and throw the error */
lo_vers = NFS_V4;
hi_vers = NFS_V3;
#ifdef _USE_NFS3
if ((nfs_param.core_param.
core_options & CORE_OPTION_NFSV3) != 0)
lo_vers = NFS_V3;
#endif /* _USE_NFS3 */
if ((nfs_param.core_param.
core_options & CORE_OPTION_NFSV4) != 0)
hi_vers = NFS_V4;
goto progvers_err;
}
} else if (req->rq_prog == nfs_param.core_param.program[P_NLM]
#ifdef _USE_NLM
&& ((nfs_param.core_param.core_options & CORE_OPTION_NFSV3)
!= 0)) {
if (req->rq_vers == NLM4_VERS) {
if (req->rq_proc <= NLMPROC4_FREE_ALL)
return true;
else
goto noproc_err;
} else {
lo_vers = NLM4_VERS;
hi_vers = NLM4_VERS;
goto progvers_err;
}
} else if (req->rq_prog == nfs_param.core_param.program[P_MNT]
#endif /* _USE_NLM */
&& ((nfs_param.core_param.core_options & CORE_OPTION_NFSV3)
!= 0)) {
/* Some clients may use the wrong mount version to umount, so
* always allow umount, otherwise only allow request if the
* appropriate mount version is enabled. Also need to allow
* dump and export, so just disallow mount if version not
* supported.
*/
if (req->rq_vers == MOUNT_V3) {
if (req->rq_proc <= MOUNTPROC3_EXPORT)
return true;
else
goto noproc_err;
} else if (req->rq_vers == MOUNT_V1) {
if (req->rq_proc <= MOUNTPROC2_EXPORT
&& req->rq_proc != MOUNTPROC2_MNT)
return true;
else
goto noproc_err;
} else {
lo_vers = MOUNT_V1;
hi_vers = MOUNT_V3;
goto progvers_err;
}
} else if (req->rq_prog
== nfs_param.core_param.program[P_RQUOTA]) {
if (req->rq_vers == RQUOTAVERS) {
if (req->rq_proc <= RQUOTAPROC_SETACTIVEQUOTA)
return true;
else
goto noproc_err;
} else if (req->rq_vers == EXT_RQUOTAVERS) {
if (req->rq_proc <= RQUOTAPROC_SETACTIVEQUOTA)
return true;
else
goto noproc_err;
} else {
lo_vers = RQUOTAVERS;
hi_vers = EXT_RQUOTAVERS;
goto progvers_err;
}
} else { /* No such program */
LogFullDebug(COMPONENT_DISPATCH,
"Invalid Program number #%d",
(int)req->rq_prog);
svcerr_noprog(req->rq_xprt, req);
return false;
}
progvers_err:
LogFullDebug(COMPONENT_DISPATCH,
"Invalid protocol Version #%d for program number #%d",
(int)req->rq_vers,
(int)req->rq_prog);
svcerr_progvers(req->rq_xprt, req, lo_vers, hi_vers);
return false;
noproc_err:
LogFullDebug(COMPONENT_DISPATCH,
"Invalid protocol program number #%d",
(int)req->rq_prog);
svcerr_noproc(req->rq_xprt, req);
return false;
} /* is_rpc_call_valid */
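/**
* @brief Receive and decode one request from an xprt.
*
* On success the request is executed inline (when a worker context
* is passed) or enqueued for a worker. On failure the request is
* freed; decode and auth errors are replied at the svc level.
*
* @return The xprt status after the receive/decode attempt.
*/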
enum xprt_stat thr_decode_rpc_request(void *context, SVCXPRT *xprt)
{
request_data_t *reqdata;
enum auth_stat why;
enum xprt_stat stat = XPRT_IDLE;
bool no_dispatch = false;
bool rlocked = false;
bool enqueued = false;
bool recv_status;
if (!xprt) {
LogCrit(COMPONENT_DISPATCH,
"missing xprt!");
return XPRT_DIED;
}
LogDebug(COMPONENT_DISPATCH,
"%p context %p",
xprt, context);
reqdata = alloc_nfs_request(xprt); /* ! NULL */
#if HAVE_BLKIN
blkin_init_new_trace(&reqdata->r_u.req.svc.bl_trace, "nfs-ganesha",
&xprt->blkin.endp);
#endif
/* pass private context to _recv */
reqdata->r_u.req.svc.rq_context = context;
DISP_RLOCK(xprt);
#if defined(HAVE_BLKIN)
BLKIN_TIMESTAMP(
&reqdata->r_u.req.svc.bl_trace, &xprt->blkin.endp, "pre-recv");
#endif
recv_status = SVC_RECV(xprt, &reqdata->r_u.req.svc);
#if defined(HAVE_BLKIN)
BLKIN_TIMESTAMP(
&reqdata->r_u.req.svc.bl_trace, &xprt->blkin.endp, "post-recv");
BLKIN_KEYVAL_INTEGER(
&reqdata->r_u.req.svc.bl_trace,
&reqdata->r_u.req.xprt->blkin.endp,
"rq-xid",
reqdata->r_u.req.svc.rq_xid);
#endif
LogFullDebug(COMPONENT_DISPATCH,
"SVC_RECV on socket %d returned %s, xid=%u", xprt->xp_fd,
(recv_status) ? "true" : "false",
(recv_status && reqdata->r_u.req.svc.rq_msg)
? reqdata->r_u.req.svc.rq_msg->rm_xid
: 0);
if (unlikely(!recv_status)) {
/* RPC over TCP specific: RPC/UDP xprts know only one state,
* XPRT_IDLE, because UDP is mostly a stateless protocol.
* With RPC/TCP, an xprt can be XPRT_DIED, especially when the
* client closes its end of the socket.
* We have to cope with this in the lines below. Finally,
* xdrrec uses XPRT_MOREREQS to indicate that additional
* records are ready to be consumed immediately. */
/* XXXX */
sockaddr_t addr;
char addrbuf[SOCK_NAME_MAX + 1];
if (isDebug(COMPONENT_DISPATCH)) {
if (copy_xprt_addr(&addr, xprt) == 1)
sprint_sockaddr(&addr, addrbuf,
sizeof(addrbuf));
else
sprintf(addrbuf, "<unresolved>");
}
stat = SVC_STAT(xprt);
DISP_RUNLOCK(xprt);
if (stat == XPRT_IDLE) {
/* typically, a new connection */
LogDebug(COMPONENT_DISPATCH,
"Client on socket=%d, addr=%s has status XPRT_IDLE",
xprt->xp_fd, addrbuf);
} else if (stat == XPRT_DIED) {
LogDebug(COMPONENT_DISPATCH,
"Client on socket=%d, addr=%s disappeared (XPRT_DIED)",
xprt->xp_fd, addrbuf);
} else if (stat == XPRT_MOREREQS) {
/* unexpected case */
LogDebug(COMPONENT_DISPATCH,
"Client on socket=%d, addr=%s has status XPRT_MOREREQS",
xprt->xp_fd, addrbuf);
} else {
LogDebug(COMPONENT_DISPATCH,
"Client on socket=%d, addr=%s has unknown status (%d)",
xprt->xp_fd, addrbuf, stat);
}
goto done;
}
/* XXX so long as nfs_rpc_get_funcdesc calls is_rpc_call_valid
* and fails if that call fails, there is no reason to call that
* function again, below */
if (!is_rpc_call_valid(&reqdata->r_u.req.svc))
goto finish;
reqdata->r_u.req.funcdesc = nfs_rpc_get_funcdesc(&reqdata->r_u.req);
LogFullDebug(COMPONENT_DISPATCH,
"About to authenticate Prog=%d, vers=%d, proc=%d xid=%u xprt=%p",
(int)reqdata->r_u.req.svc.rq_prog,
(int)reqdata->r_u.req.svc.rq_vers,
(int)reqdata->r_u.req.svc.rq_proc,
reqdata->r_u.req.svc.rq_xid,
xprt);
/* If authentication is AUTH_NONE or AUTH_UNIX, then the value of
* no_dispatch remains false and the request proceeds normally.
*
* If authentication is RPCSEC_GSS, no_dispatch may have value true,
* this means that gc->gc_proc != RPCSEC_GSS_DATA and that the message
* is in fact an internal negotiation message from RPCSEC_GSS using
* GSSAPI. It should not be processed by the worker and SVC_STAT
* should be returned to the dispatcher.
*/
why = svc_auth_authenticate(&reqdata->r_u.req.svc,
reqdata->r_u.req.svc.rq_msg,
&no_dispatch);
if (why != AUTH_OK) {
LogInfo(COMPONENT_DISPATCH,
"Could not authenticate request... rejecting with AUTH_STAT=%s",
auth_stat2str(why));
svcerr_auth(xprt, &reqdata->r_u.req.svc, why);
goto finish;
#ifdef _HAVE_GSSAPI
} else if (reqdata->r_u.req.svc.rq_verf.oa_flavor == RPCSEC_GSS) {
struct rpc_gss_cred *gc = (struct rpc_gss_cred *)
reqdata->r_u.req.svc.rq_clntcred;
LogFullDebug(COMPONENT_DISPATCH,
"RPCSEC_GSS no_dispatch=%d gc->gc_proc=(%u) %s",
no_dispatch, gc->gc_proc,
str_gc_proc(gc->gc_proc));
if (no_dispatch)
goto finish;
#endif
}
/*
* Extract RPC argument.
*/
memset(&reqdata->r_u.req.arg_nfs, 0, sizeof(nfs_arg_t));
LogFullDebug(COMPONENT_DISPATCH,
"Before SVC_GETARGS on socket %d, xprt=%p",
xprt->xp_fd, xprt);
if (!SVC_GETARGS(xprt, &reqdata->r_u.req.svc,
reqdata->r_u.req.funcdesc->xdr_decode_func,
&reqdata->r_u.req.arg_nfs,
&reqdata->r_u.req.lookahead)) {
LogInfo(COMPONENT_DISPATCH,
"SVC_GETARGS failed for Program %d, Version %d, Function %d xid=%u",
(int)reqdata->r_u.req.svc.rq_prog,
(int)reqdata->r_u.req.svc.rq_vers,
(int)reqdata->r_u.req.svc.rq_proc,
reqdata->r_u.req.svc.rq_xid);
svcerr_decode(xprt, &reqdata->r_u.req.svc);
goto finish;
}
if (context) {
/* already running worker thread, do not enqueue */
DISP_RUNLOCK(xprt);
nfs_rpc_execute(reqdata);
return XPRT_IDLE;
}
gsh_xprt_ref(xprt, XPRT_PRIVATE_FLAG_INCREQ, __func__, __LINE__);
/* XXX as above, the call has already passed is_rpc_call_valid,
* the former check here is removed. */
nfs_rpc_enqueue_req(reqdata);
enqueued = true;
finish:
stat = SVC_STAT(xprt);
DISP_RUNLOCK(xprt);
done:
/* if recv failed, request is not enqueued */
if (!enqueued)
free_nfs_request(reqdata);
return stat;
}
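/**
* @brief Decide whether to keep decoding on the same xprt: continue
* only while it reports XPRT_MOREREQS and remains under its
* per-xprt request quota.
*/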
static inline bool thr_continue_decoding(SVCXPRT *xprt, enum xprt_stat stat)
{
if (unlikely(xprt->xp_requests
> nfs_param.core_param.dispatch_max_reqs_xprt))
return false;
return (stat == XPRT_MOREREQS);
}
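/**
* @brief Decoder thread body: decode requests from one xprt until it
* has no more ready, then re-arm its events (or destroy the xprt if
* it died) and drop the decoding flag.
*/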
void thr_decode_rpc_requests(struct fridgethr_context *thr_ctx)
{
enum xprt_stat stat;
SVCXPRT *xprt = (SVCXPRT *) thr_ctx->arg;
LogFullDebug(COMPONENT_RPC, "enter xprt=%p", xprt);
do {
stat = thr_decode_rpc_request(NULL, xprt);
} while (thr_continue_decoding(xprt, stat));
LogDebug(COMPONENT_DISPATCH, "exiting, stat=%s", xprt_stat_s[stat]);
/* order MUST be SVC_DESTROY, gsh_xprt_unref
* (current refcnt balancing) */
if (stat != XPRT_DIED)
(void)svc_rqst_rearm_events(xprt, SVC_RQST_FLAG_NONE);
else
SVC_DESTROY(xprt);
/* update accounting, clear decoding flag */
gsh_xprt_unref(xprt, XPRT_PRIVATE_FLAG_DECODING, __func__, __LINE__);
}
static bool nfs_rpc_getreq_ng(SVCXPRT *xprt /*, int chan_id */)
{
/* Ok, in the new world, TI-RPC's job is merely to tell us there is
* activity on a specific xprt handle.
*
* Note that we have a builtin mechanism to bind, unbind, and (in
* response to connect events, through a new callout made from within
* the rendezvous in vc xprts) rebind/rebalance xprt handles to
* independent event channels, each with their own platform event
* demultiplexer. The current callout is one event (request, or, if
* applicable, new vc connect) on the active xprt handle xprt.
*
* We are a blocking call from the svc_run thread specific to our
* current event channel (whatever it is). Our goal is to hand off
* processing of xprt to a request dispatcher thread as quickly as
* possible, to minimize latency of all xprts on this channel.
*
* Next, the preferred dispatch thread should be, I speculate, one
* which has (most) recently handled a request for this xprt.
*/
/*
* UDP RPCs are quite simple: everything arrives on the same socket,
* so several SVCXPRTs can be defined, one per tbuf, to handle the
* traffic.
* TCP RPCs are more complex:
* - a unique SVCXPRT exists that handles the initial TCP rendezvous.
* It accepts the connection from the client but receives no
* messages on it. SVC_RECV on it creates a new SVCXPRT dedicated
* to the client, bound to the connected TCP socket.
*
* While receiving, we must know whether this is a UDP request, an
* initial TCP request, or a TCP socket from an already connected
* client.
*
* This is how to distinguish the cases:
* UDP connections are bound to socket NFS_UDPSocket
* TCP initial connections are bound to socket NFS_TCPSocket
* all other cases are requests from already connected TCP clients
*/
/* The following actions are now purely diagnostic, the only side effect
* is a message to the log. */
int code = 0;
int rpc_fd = xprt->xp_fd;
uint32_t nreqs;
LogFullDebug(COMPONENT_RPC, "enter xprt=%p", xprt);
if (udp_socket[P_NFS] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH, "A NFS UDP request fd %d",
rpc_fd);
else if (udp_socket[P_MNT] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH, "A MOUNT UDP request %d",
rpc_fd);
else if (udp_socket[P_NLM] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH, "A NLM UDP request %d",
rpc_fd);
else if (udp_socket[P_RQUOTA] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH, "A RQUOTA UDP request %d",
rpc_fd);
else if (tcp_socket[P_NFS] == rpc_fd) {
/* In this case, the SVC_RECV only produces a new connected
* socket (it does just a call to accept) */
LogFullDebug(COMPONENT_DISPATCH,
"An initial NFS TCP request from a new client %d",
rpc_fd);
} else if (tcp_socket[P_MNT] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH,
"An initial MOUNT TCP request from a new client %d",
rpc_fd);
else if (tcp_socket[P_NLM] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH,
"An initial NLM request from a new client %d",
rpc_fd);
else if (tcp_socket[P_RQUOTA] == rpc_fd)
LogFullDebug(COMPONENT_DISPATCH,
"An initial RQUOTA request from a new client %d",
rpc_fd);
else
LogFullDebug(COMPONENT_DISPATCH,
"An NFS TCP request from an already connected client %d",
rpc_fd);
/* XXX
* Decoder backpressure. There are multiple considerations here.
* One is to avoid decoding if doing so would cause the server to exceed
* global resource constraints. Another is to adjust flow parameters on
* underlying network resources, to avoid moving the problem back into
* the kernel. The latter requires continuous, but low overhead, flow
* measurement with hysteretic control. For now, just do global and
* per-xprt request quotas.
*/
/* check max outstanding quota */
nreqs = nfs_rpc_outstanding_reqs_est();
if (unlikely(nreqs > nfs_param.core_param.dispatch_max_reqs)) {
/* request queue is flow controlled */
LogDebug(COMPONENT_DISPATCH,
"global outstanding reqs quota exceeded (have %u, allowed %u)",
nreqs, nfs_param.core_param.dispatch_max_reqs);
thread_delay_ms(5); /* don't busy-wait */
(void)svc_rqst_rearm_events(xprt, SVC_RQST_FLAG_NONE);
SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
goto out;
}
LogFullDebug(COMPONENT_RPC, "before decoder guard %p", xprt);
/* clock duplicate, queued+stalled wakeups, queued wakeups */
if (!gsh_xprt_decoder_guard(xprt, XPRT_PRIVATE_FLAG_NONE)) {
LogFullDebug(COMPONENT_RPC, "already decoding %p", xprt);
thread_delay_ms(5);
(void)svc_rqst_rearm_events(xprt, SVC_RQST_FLAG_NONE);
SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
goto out;
}
LogFullDebug(COMPONENT_RPC, "before cond stall %p", xprt);
/* Check per-xprt max outstanding quota */
if (nfs_rpc_cond_stall_xprt(xprt)) {
/* Xprt stalled--bail. Stall queue owns xprt ref and state. */
LogDebug(COMPONENT_DISPATCH, "stalled, bail");
/* clear decoding flag */
gsh_xprt_clear_flag(xprt, XPRT_PRIVATE_FLAG_DECODING);
goto out;
}
LogFullDebug(COMPONENT_DISPATCH, "before fridgethr_get");
/* schedule a thread to decode */
code = fridgethr_submit(req_fridge, thr_decode_rpc_requests, xprt);
if (code == ETIMEDOUT) {
LogFullDebug(COMPONENT_RPC,
"Decode dispatch timed out, rearming. xprt=%p",
xprt);
(void)svc_rqst_rearm_events(xprt, SVC_RQST_FLAG_NONE);
gsh_xprt_unref(xprt, XPRT_PRIVATE_FLAG_DECODING, __func__,
__LINE__);
} else if (code != 0) {
LogMajor(COMPONENT_DISPATCH, "Unable to get decode thread: %d",
code);
}
LogFullDebug(COMPONENT_DISPATCH, "after fridgethr_get");
out:
return true;
}
/**
* @brief Thread used to service an (epoll, etc) event channel.
*
* @param[in] arg Pointer to the ID of the associated event channel
*
* @return Pointer to the result (but this function will mostly loop forever).
*
*/
static void *rpc_dispatcher_thread(void *arg)
{
int32_t chan_id = *((int32_t *) arg);
SetNameFunction("disp");
/* Calling dispatcher main loop */
LogInfo(COMPONENT_DISPATCH, "Entering nfs/rpc dispatcher");
LogDebug(COMPONENT_DISPATCH, "My pthread id is %p",
(caddr_t) pthread_self());
svc_rqst_thrd_run(chan_id, SVC_RQST_FLAG_NONE);
return NULL;
} /* rpc_dispatcher_thread */