blob: dd9edf094d25cf37ed7186f26ef2f74cb30accdc [file] [log] [blame]
/*
* Copyright (c) 2013 Linux Box Corporation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR `AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <sys/cdefs.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/param.h>
#include <sys/poll.h>
#include <sys/un.h>
#include <sys/time.h>
#include <sys/uio.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <signal.h>
#include <misc/timespec.h>
#include <rpc/types.h>
#include <misc/portable.h>
#include <rpc/rpc.h>
#include <rpc/svc.h>
#include <rpc/svc_auth.h>
#include <intrinsic.h>
#include "rpc_com.h"
#include "clnt_internal.h"
#include "svc_internal.h"
#include "svc_xprt.h"
#include "rpc_dplx_internal.h"
#include "rpc_ctx.h"
#include <rpc/svc_rqst.h>
#include <rpc/xdr_inrec.h>
#include <rpc/xdr_ioq.h>
#include <getpeereid.h>
#include <misc/opr.h>
#include "svc_ioq.h"
static inline void
cfconn_set_dead(SVCXPRT *xprt, struct x_vc_data *xd)
{
	/*
	 * Flag the stream as dead.  The status word is changed while the
	 * transport's xp_lock is held, serializing it with other holders
	 * of that lock.
	 */
	mutex_lock(&xprt->xp_lock);
	xd->sx.strm_stat = XPRT_DIED;
	mutex_unlock(&xprt->xp_lock);
}
/*
 * High bit of a record-marking fragment header: set on the final fragment
 * of a record, and also the upper bound on a single fragment's byte count.
 * The shift is done on an unsigned operand; (1 << 31) on a 32-bit signed
 * int is undefined behavior.
 */
#define LAST_FRAG ((u_int32_t)1 << 31)
/* iovec arrays of up to this many bytes are built with alloca() */
#define MAXALLOCA (256)
/*
 * Send every buffer queued on xioq->ioq_uv.uvqh to xprt->xp_fd with
 * writev(), using record marking.
 *
 * The buffers are gathered into an iovec array whose slot 0 is reserved
 * for a 4-byte fragment header.  A fragment spans at most __svc_maxiov
 * iovecs (header included) and fewer than LAST_FRAG data bytes; the final
 * fragment's header has LAST_FRAG set.
 *
 * After a fragment is fully written, wiov is moved onto the last iovec
 * just sent, and that (already consumed) slot is reused as the next
 * fragment's header.  Short writes advance wiov/iov_base past the bytes
 * already accepted and retry.
 *
 * A writev() interrupted before sending anything (EINTR) is retried.  Any
 * other error logs a warning, marks the connection dead via
 * cfconn_set_dead(), and abandons the rest of the record.
 */
static inline void
ioq_flushv(SVCXPRT *xprt, struct x_vc_data *xd, struct xdr_ioq *xioq)
{
	struct iovec *iov, *tiov, *wiov;
	struct poolq_entry *have;
	struct xdr_ioq_uv *data;
	ssize_t result;
	u_int32_t frag_header;
	u_int32_t fbytes = 0;	/* bytes left to write in this fragment */
	u_int32_t remaining = 0;	/* bytes left to write overall */
	u_int32_t vsize = (xioq->ioq_uv.uvqh.qcount + 1) * sizeof(struct iovec);
	int iw = 0;		/* iovecs in the current writev() */
	int ix = 1;		/* next free slot; 0 is the first header */

	if (unlikely(vsize > MAXALLOCA)) {
		iov = mem_alloc(vsize);
	} else {
		iov = alloca(vsize);
	}
	wiov = iov; /* position at initial fragment header */

	/* update the most recent data length, just in case */
	xdr_tail_update(xioq->xdrs);

	/* build list after initial fragment header (ix = 1 above) */
	TAILQ_FOREACH(have, &(xioq->ioq_uv.uvqh.qh), q) {
		data = IOQ_(have);
		tiov = iov + ix;
		tiov->iov_base = data->v.vio_head;
		tiov->iov_len = ioquv_length(data);
		remaining += tiov->iov_len;
		ix++;
	}

	while (remaining > 0) {
		if (iw == 0) {
			/* new fragment header, determine last iov */
			fbytes = 0;
			for (tiov = &wiov[++iw];
			     (tiov < &iov[ix]) && (iw < __svc_maxiov);
			     ++tiov, ++iw) {
				fbytes += tiov->iov_len;

				/* check for fragment value overflow */
				/* never happens, see ganesha FSAL_MAXIOSIZE */
				if (unlikely(fbytes >= LAST_FRAG)) {
					fbytes -= tiov->iov_len;
					break;
				}
			} /* for */

			/* fragment length doesn't include fragment header */
			if (&wiov[iw] < &iov[ix]) {
				frag_header = htonl((u_int32_t) (fbytes));
			} else {
				frag_header = htonl((u_int32_t) (fbytes | LAST_FRAG));
			}
			wiov->iov_base = &(frag_header);
			wiov->iov_len = sizeof(u_int32_t);

			/* writev return includes fragment header */
			remaining += sizeof(u_int32_t);
			fbytes += sizeof(u_int32_t);
		}

		/* blocking write */
		result = writev(xprt->xp_fd, wiov, iw);

		/* check for failure before touching the unsigned counters */
		if (unlikely(result < 0)) {
			if (errno == EINTR) {
				/* signal arrived before any data moved; retry */
				continue;
			}
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() writev failed (%d)\n",
				__func__, errno);
			cfconn_set_dead(xprt, xd);
			break;
		}

		remaining -= result;
		if (result == fbytes) {
			/* fragment done: reuse its last slot as next header */
			wiov += iw - 1;
			iw = 0;
			continue;
		}

		fbytes -= result;
		/* rare? writev underrun? (assume never overrun) */
		for (tiov = wiov; iw > 0; ++tiov, --iw) {
			if (tiov->iov_len > (size_t)result) {
				tiov->iov_len -= result;
				/* no arithmetic on void * in ISO C */
				tiov->iov_base = (char *)tiov->iov_base + result;
				wiov = tiov;
				break;
			} else {
				result -= tiov->iov_len;
			}
		} /* for */
	} /* while */

	if (unlikely(vsize > MAXALLOCA)) {
		mem_free(iov, vsize);
	}
}
/*
 * Work-pool handler that drains the transport's shared output queue.
 *
 * Each queued xdr_ioq is unlinked under shared.ioq.qmutex, which is more
 * fine-grained than xp_lock.  The I/O happens with that mutex dropped.  It
 * is flushed to the socket only while the work pool has a nonzero
 * thrd_max and the transport is not flagged SVC_XPRT_FLAG_DESTROYED.
 * Either way, its XDR stream is destroyed.
 *
 * When the queue is found empty, shared.active is cleared (still under
 * qmutex) and the reference taken by svc_ioq_append() is released.
 */
static void
svc_ioq_callback(struct work_pool_entry *wpe)
{
	struct x_vc_data *xd = (struct x_vc_data *)wpe;
	SVCXPRT *xprt = (SVCXPRT *)wpe->arg;

	for (;;) {
		struct poolq_entry *entry;
		struct xdr_ioq *out;

		mutex_lock(&xd->shared.ioq.qmutex);
		entry = TAILQ_FIRST(&xd->shared.ioq.qh);
		if (unlikely(entry == NULL))
			break;	/* leave the loop still holding qmutex */
		TAILQ_REMOVE(&xd->shared.ioq.qh, entry, q);
		xd->shared.ioq.qcount--;
		mutex_unlock(&xd->shared.ioq.qmutex);

		/* send outside the queue lock */
		out = _IOQ(entry);
		if (svc_work_pool.params.thrd_max
		    && !(xprt->xp_flags & SVC_XPRT_FLAG_DESTROYED))
			ioq_flushv(xprt, xd, out);
		XDR_DESTROY(out->xdrs);
	}

	/* queue drained: qmutex is still held from the last dequeue attempt */
	xd->shared.active = false;
	mutex_unlock(&xd->shared.ioq.qmutex);
	SVC_RELEASE(xprt, SVC_RELEASE_FLAG_NONE);
}
/*
 * Queue the encoded reply in xdrs for output on xprt.
 *
 * If the work pool has thrd_max == 0, or the transport is flagged
 * SVC_XPRT_FLAG_DESTROYED, the stream is destroyed without being queued.
 * Otherwise it is appended under shared.ioq.qmutex.  If no drain worker is
 * active, one is armed (svc_ioq_callback), a transport reference is taken
 * for it, and it is submitted to svc_work_pool.  If a worker is already
 * active, it will pick this entry up, so several outputs can be batched.
 */
void
svc_ioq_append(SVCXPRT *xprt, struct x_vc_data *xd, XDR *xdrs)
{
	int start_worker;

	if (unlikely(!svc_work_pool.params.thrd_max
		     || (xprt->xp_flags & SVC_XPRT_FLAG_DESTROYED))) {
		/* discard */
		XDR_DESTROY(xdrs);
		return;
	}

	mutex_lock(&xd->shared.ioq.qmutex);
	TAILQ_INSERT_TAIL(&xd->shared.ioq.qh, &(XIOQ(xdrs)->ioq_s), q);
	xd->shared.ioq.qcount++;
	start_worker = !xd->shared.active;
	if (start_worker) {
		xd->shared.active = true;
		xd->wpe.fun = svc_ioq_callback;
		xd->wpe.arg = xprt;
	}
	mutex_unlock(&xd->shared.ioq.qmutex);

	if (!start_worker)
		return;	/* the active worker will drain this entry */

	/* reference is dropped by svc_ioq_callback() once the queue empties */
	SVC_REF(xprt, SVC_REF_FLAG_NONE);
	work_pool_submit(&svc_work_pool, &xd->wpe);
}