/*
* Copyright (c) 2012 Linux Box Corporation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
* IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
* IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
* NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
* THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <config.h>
#include <sys/types.h>
#include <stdint.h>
#include <assert.h>
#if !defined(_WIN32)
#include <err.h>
#endif
#include <errno.h>
#include <rpc/types.h>
#include <reentrant.h>
#include <misc/portable.h>
#include <signal.h>
#include <rpc/xdr.h>
#include <rpc/rpc.h>
#include <rpc/svc.h>
#include "rpc_com.h"
#include <misc/rbtree_x.h>
#include "clnt_internal.h"
#include "rpc_dplx_internal.h"
#include "rpc_ctx.h"
#define tv_to_ms(tv) (1000 * ((tv)->tv_sec) + ((tv)->tv_usec / 1000))
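
/*
 * Allocate and register a duplex call context for an outgoing call on
 * clnt.  Assigns the next xid on the connection, converts the caller's
 * timeout to a timespec, and inserts the context into the per-connection
 * rbtree of outstanding calls (keyed by xid).  Returns NULL on
 * allocation or insert failure.
 */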
rpc_ctx_t *
alloc_rpc_call_ctx(CLIENT *clnt, rpcproc_t proc, xdrproc_t xdr_args,
		   void *args_ptr, xdrproc_t xdr_results,
		   void *results_ptr, struct timeval timeout)
{
	struct x_vc_data *xd = (struct x_vc_data *)clnt->cl_p1;
	struct rpc_dplx_rec *rec = xd->rec;
	rpc_ctx_t *ctx;

	ctx = mem_alloc(sizeof(rpc_ctx_t));
	if (!ctx)
		goto out;

	/* protects this */
	mutex_init(&ctx->we.mtx, NULL);
	cond_init(&ctx->we.cv, 0, NULL);

	/* rec->calls and rbtree protected by (adaptive) mtx */
	REC_LOCK(rec);

	/* XXX we hold the client-fd lock */
	ctx->xid = ++(xd->cx.calls.xid);

	/* some of this looks like overkill; it's here to support future,
	 * fully async calls */
	ctx->ctx_u.clnt.clnt = clnt;
	ctx->ctx_u.clnt.timeout.tv_sec = 0;
	ctx->ctx_u.clnt.timeout.tv_nsec = 0;
	timespec_addms(&ctx->ctx_u.clnt.timeout, tv_to_ms(&timeout));
	ctx->msg = alloc_rpc_msg();
	ctx->flags = 0;

	/* stash it */
	if (opr_rbtree_insert(&xd->cx.calls.t, &ctx->node_k)) {
		__warnx(TIRPC_DEBUG_FLAG_RPC_CTX,
			"%s: call ctx insert failed (xid %d client %p)",
			__func__, ctx->xid, clnt);
		REC_UNLOCK(rec);
		/* don't leak the call header */
		if (ctx->msg)
			free_rpc_msg(ctx->msg);
		mutex_destroy(&ctx->we.mtx);
		cond_destroy(&ctx->we.cv);
		mem_free(ctx, sizeof(rpc_ctx_t));
		ctx = NULL;
		goto out;
	}
	REC_UNLOCK(rec);

out:
	return (ctx);
}
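
/*
 * Re-key ctx with a fresh xid, e.g., before a retransmit.  The context
 * is removed from and re-inserted into the outstanding-calls rbtree
 * under the rec lock; callers assert RPC_CTX_FLAG_LOCKED.
 */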
void
rpc_ctx_next_xid(rpc_ctx_t *ctx, uint32_t flags)
{
	struct x_vc_data *xd = (struct x_vc_data *)ctx->ctx_u.clnt.clnt->cl_p1;
	struct rpc_dplx_rec *rec = xd->rec;

	assert(flags & RPC_CTX_FLAG_LOCKED);

	REC_LOCK(rec);
	opr_rbtree_remove(&xd->cx.calls.t, &ctx->node_k);
	ctx->xid = ++(xd->cx.calls.xid);
	if (opr_rbtree_insert(&xd->cx.calls.t, &ctx->node_k)) {
		REC_UNLOCK(rec);
		__warnx(TIRPC_DEBUG_FLAG_RPC_CTX,
			"%s: call ctx insert failed (xid %d client %p)",
			__func__, ctx->xid, ctx->ctx_u.clnt.clnt);
		return;
	}
	REC_UNLOCK(rec);
}
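
/*
 * Called from the receive path when a reply header has been decoded.
 * Looks up the waiting call context by xid; on a hit, swaps the reply
 * header into the context, wakes the waiter, and blocks until the
 * waiter acknowledges the transfer (RPC_CTX_FLAG_ACKSYNC).  Returns
 * true if a matching context was found, false otherwise.
 */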
bool
rpc_ctx_xfer_replymsg(struct x_vc_data *xd, struct rpc_msg *msg)
{
	rpc_ctx_t ctx_k, *ctx;
	struct opr_rbtree_node *nv;
	rpc_dplx_lock_t *lk = &xd->rec->recv.lock;

	ctx_k.xid = msg->rm_xid;
	REC_LOCK(xd->rec);
	nv = opr_rbtree_lookup(&xd->cx.calls.t, &ctx_k.node_k);
	if (nv) {
		ctx = opr_containerof(nv, rpc_ctx_t, node_k);
		opr_rbtree_remove(&xd->cx.calls.t, &ctx->node_k);
		free_rpc_msg(ctx->msg); /* free call header */
		ctx->msg = msg; /* and stash reply header */
		ctx->flags |= RPC_CTX_FLAG_SYNCDONE;
		REC_UNLOCK(xd->rec);
		cond_signal(&lk->we.cv); /* XXX we hold lk->we.mtx */

		/* now, we must ourselves wait for the other side to run */
		while (!(ctx->flags & RPC_CTX_FLAG_ACKSYNC))
			cond_wait(&lk->we.cv, &lk->we.mtx);

		/* ctx-specific signal--indicates we will make no further
		 * references to ctx whatsoever */
		mutex_lock(&ctx->we.mtx);
		ctx->flags &= ~RPC_CTX_FLAG_WAITSYNC;
		cond_signal(&ctx->we.cv);
		mutex_unlock(&ctx->we.mtx);
		return (true);
	}
	REC_UNLOCK(xd->rec);
	return (false);
}
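
/*
 * Wait (with timeout) on the recv channel for a matching reply.
 * Returns RPC_SUCCESS when a reply with our xid arrives, otherwise an
 * errno-style code from cond_timedwait (e.g., ETIMEDOUT).  On timeout
 * the call is dequeued, and re_status is set to RPC_TIMEDOUT if the
 * transport has been destroyed.  Caller holds lk->we.mtx.
 */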
int
rpc_ctx_wait_reply(rpc_ctx_t *ctx, uint32_t flags)
{
	struct x_vc_data *xd = (struct x_vc_data *)ctx->ctx_u.clnt.clnt->cl_p1;
	struct rpc_dplx_rec *rec = xd->rec;
	rpc_dplx_lock_t *lk = &rec->recv.lock;
	struct timespec ts;
	int code = 0;

	/* we hold recv channel lock */
	ctx->flags |= RPC_CTX_FLAG_WAITSYNC;
	while (!(ctx->flags & RPC_CTX_FLAG_SYNCDONE)) {
		(void)clock_gettime(CLOCK_REALTIME_FAST, &ts);
		timespecadd(&ts, &ctx->ctx_u.clnt.timeout);
		code = cond_timedwait(&lk->we.cv, &lk->we.mtx, &ts);

		/* if we timed out, check for xprt destroyed (no more
		 * receives) */
		if (code == ETIMEDOUT) {
			SVCXPRT *xprt = rec->hdl.xprt;
			uint32_t xp_flags;

			/* dequeue the call */
			REC_LOCK(rec);
			opr_rbtree_remove(&xd->cx.calls.t, &ctx->node_k);
			REC_UNLOCK(rec);

			mutex_lock(&xprt->xp_lock);
			xp_flags = xprt->xp_flags;
			mutex_unlock(&xprt->xp_lock);
			if (xp_flags & SVC_XPRT_FLAG_DESTROYED) {
				/* XXX should also set error.re_why, but the
				 * facility is not well developed. */
				ctx->error.re_status = RPC_TIMEDOUT;
			}
			ctx->flags &= ~RPC_CTX_FLAG_WAITSYNC;
			return (code);
		}
	}
	ctx->flags &= ~RPC_CTX_FLAG_SYNCDONE;

	/* switch on direction */
	switch (ctx->msg->rm_direction) {
	case REPLY:
		if (ctx->msg->rm_xid == ctx->xid)
			return (RPC_SUCCESS);
		break;
	case CALL:
		/* XXX cond transfer control to svc */
		break;
	default:
		break;
	}

	return (code);
}
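
/*
 * Acknowledge a reply transfer: sets RPC_CTX_FLAG_ACKSYNC and signals
 * the recv channel so rpc_ctx_xfer_replymsg can finish.  Caller holds
 * lk->we.mtx.
 */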
void
rpc_ctx_ack_xfer(rpc_ctx_t *ctx)
{
	struct x_vc_data *xd = (struct x_vc_data *)ctx->ctx_u.clnt.clnt->cl_p1;
	rpc_dplx_lock_t *lk = &xd->rec->recv.lock;

	ctx->flags |= RPC_CTX_FLAG_ACKSYNC;
	cond_signal(&lk->we.cv); /* XXX we hold lk->we.mtx */
}
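
/*
 * Tear down a call context: wait (bounded by the call timeout) for any
 * in-flight reply transfer to commit, remove the context from the
 * outstanding-calls rbtree, then release the message buffer and
 * synchronization objects.
 */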
void
free_rpc_call_ctx(rpc_ctx_t *ctx, uint32_t flags)
{
	struct x_vc_data *xd = (struct x_vc_data *)ctx->ctx_u.clnt.clnt->cl_p1;
	struct rpc_dplx_rec *rec = xd->rec;
	struct timespec ts;

	/* wait for commit of any xfer (ctx specific) */
	mutex_lock(&ctx->we.mtx);
	if (ctx->flags & RPC_CTX_FLAG_WAITSYNC) {
		/* WAITSYNC is already cleared if the call timed out, but it is
		 * incorrect to wait forever */
		(void)clock_gettime(CLOCK_REALTIME_FAST, &ts);
		timespecadd(&ts, &ctx->ctx_u.clnt.timeout);
		(void)cond_timedwait(&ctx->we.cv, &ctx->we.mtx, &ts);
	}
	REC_LOCK(rec);
	opr_rbtree_remove(&xd->cx.calls.t, &ctx->node_k);
	/* interlock */
	mutex_unlock(&ctx->we.mtx);
	REC_UNLOCK(rec);
	if (ctx->msg)
		free_rpc_msg(ctx->msg);
	mutex_destroy(&ctx->we.mtx);
	cond_destroy(&ctx->we.cv);
	mem_free(ctx, sizeof(rpc_ctx_t));
}