| /* |
| * Copyright (c) 2012-2014 CEA |
| * contributeur : Dominique Martinet <dominique.martinet@cea.fr> |
| * contributeur : William Allen Simpson <bill@cohortfs.com> |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * - Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * - Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * - Neither the name of Sun Microsystems, Inc. nor the names of its |
| * contributors may be used to endorse or promote products derived |
| * from this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| /** |
| * \file rpc_rdma.c |
| * \brief rdma helper |
| * |
| * This was (very) loosely based on the Mooshika library, which in turn |
| * was a mix of diod, rping (librdmacm/examples), and Linux kernel's |
| * net/9p/trans_rdma.c (dual BSD/GPL license). No vestiges remain. |
| */ |
| |
| #if HAVE_CONFIG_H |
| # include <config.h> |
| #endif |
| |
| #include <stdio.h> //printf |
| #include <limits.h> //INT_MAX |
| #include <sys/socket.h> //sockaddr |
| #include <sys/un.h> //sockaddr_un |
| #include <pthread.h> //pthread_* (think it's included by another one) |
| #include <semaphore.h> //sem_* (is it a good idea to mix sem and pthread_cond/mutex?) |
| #include <arpa/inet.h> //inet_ntop |
| #include <netinet/in.h> //sock_addr_in |
| #include <unistd.h> //fcntl |
| #include <fcntl.h> //fcntl |
| #include <sys/epoll.h> |
| |
| #define EPOLL_SIZE (10) |
| /*^ expected number of fd, must be > 0 */ |
| #define EPOLL_EVENTS (16) |
| /*^ maximum number of events per poll */ |
| #define EPOLL_WAIT_MS (1000) |
| /*^ ms check for rpc_rdma_state.run_count (was 100) */ |
| #define IBV_POLL_EVENTS (16) |
| /*^ maximum number of events per poll */ |
| #define NSEC_IN_SEC (1000*1000*1000) |
| |
| #include "misc/portable.h" |
| #include <rdma/rdma_cma.h> |
| #include <rpc/types.h> |
| #include <rpc/xdr.h> |
| #include <rpc/xdr_ioq.h> |
| #include <rpc/rpc.h> |
| |
| #include "misc/abstract_atomic.h" |
| #include "rpc_rdma.h" |
| |
| #ifdef HAVE_VALGRIND_MEMCHECK_H |
| # include <valgrind/memcheck.h> |
| # ifndef VALGRIND_MAKE_MEM_DEFINED |
| # warning "Valgrind support requested, but VALGRIND_MAKE_MEM_DEFINED not available" |
| # endif |
| #endif |
| #ifndef VALGRIND_MAKE_MEM_DEFINED |
| # define VALGRIND_MAKE_MEM_DEFINED(addr, len) |
| #endif |
| |
| /** defaults **/ |
| #define WORKER_STACK_SIZE (65535) /* was 2116488 */ |
| |
| struct connection_requests { |
| struct rdma_cm_id **id_queue; |
| sem_t q_sem; |
| sem_t u_sem; |
| uint32_t q_head; |
| uint32_t q_tail; |
| u_int q_size; |
| }; |
| |
/* Module-wide singleton state for the RDMA transport machinery. */
struct rpc_rdma_state {
	LIST_HEAD(pdh_s, rpc_rdma_pd) pdh;	/**< Protection Domain list */
	mutex_t lock;		/**< guards pdh and epoll/thread startup */

	struct connection_requests c_r; /* connect-request ring; never freed??? */

	pthread_t cm_thread;	/**< Thread id for connection manager */
	pthread_t cq_thread;	/**< Thread id for completion queue */
	pthread_t stats_thread;	/**< Thread id for stats unix-socket server */

	int cm_epollfd;		/**< epoll set polled by cm_thread */
	int cq_epollfd;		/**< epoll set polled by cq_thread */
	int stats_epollfd;	/**< epoll set polled by stats_thread */

	int32_t run_count;	/**< > 0 keeps the three thread loops running */
};
| |
| /* GLOBAL VARIABLES */ |
| |
| static struct rpc_rdma_state rpc_rdma_state; |
| |
/**
 * rpc_rdma_internals_init: zero the module singleton and set up its
 * synchronization primitives.
 *
 * Note: only c_r.q_sem is initialized here; c_r.u_sem is presumably
 * initialized later (e.g. when the listen backlog is known -- TODO
 * confirm), yet both are destroyed in rpc_rdma_internals_fini().
 */
void
rpc_rdma_internals_init(void)
{
	memset(&rpc_rdma_state, 0, sizeof(rpc_rdma_state));

	/* no connection requests queued yet */
	sem_init(&rpc_rdma_state.c_r.q_sem, 0, 0);

	mutex_init(&rpc_rdma_state.lock, NULL);
	LIST_INIT(&rpc_rdma_state.pdh);
}
| |
| static void |
| rpc_rdma_internals_join(void) |
| { |
| if (rpc_rdma_state.cm_thread) { |
| pthread_join(rpc_rdma_state.cm_thread, NULL); |
| rpc_rdma_state.cm_thread = 0; |
| } |
| if (rpc_rdma_state.cq_thread) { |
| pthread_join(rpc_rdma_state.cq_thread, NULL); |
| rpc_rdma_state.cq_thread = 0; |
| } |
| if (rpc_rdma_state.stats_thread) { |
| pthread_join(rpc_rdma_state.stats_thread, NULL); |
| rpc_rdma_state.stats_thread = 0; |
| } |
| } |
| |
/**
 * rpc_rdma_internals_fini: stop the helper threads and tear down the
 * module singleton's synchronization primitives.
 *
 * Setting run_count to 0 makes each thread's epoll loop exit within one
 * EPOLL_WAIT_MS poll interval; we then join them before destroying the
 * semaphores and lock they use.
 */
void
rpc_rdma_internals_fini(void)
{
	rpc_rdma_state.run_count = 0;
	rpc_rdma_internals_join();

	/* NOTE(review): u_sem is destroyed here but not initialized in
	 * rpc_rdma_internals_init() -- it is presumably sem_init()ed on the
	 * listen path; confirm the pairing. */
	sem_destroy(&rpc_rdma_state.c_r.q_sem);
	sem_destroy(&rpc_rdma_state.c_r.u_sem);

	mutex_destroy(&rpc_rdma_state.lock);
}
| |
| /* forward declarations */ |
| |
| static int rpc_rdma_bind_server(RDMAXPRT *xprt); |
| static struct xp_ops rpc_rdma_ops; |
| |
| /* UTILITY FUNCTIONS */ |
| |
| /** |
| * rpc_rdma_pd_by_verbs: register the protection domain for a given xprt |
| * |
| * Since the protection domains seem to be related to the interfaces, |
| * and those are hot-swappable and variable, need a dynamic list of them. |
| * (Cannot reliably count them during initialization.) |
| * |
| * LFU the list to keep access near O(1). |
| * |
| * @param[IN] xprt connection handle |
| * |
| * @return 0, ENOSPC, or EINVAL. |
| */ |
static int
rpc_rdma_pd_by_verbs(RDMAXPRT *xprt)
{
	struct rpc_rdma_pd *pd;
	struct rpc_rdma_pd *lf;
	int rc;

	if (!xprt->cm_id) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() transport missing cm_id",
			__func__);
		return EINVAL;
	}
	if (!xprt->cm_id->verbs) {
		/* tolerated: continue with a NULL device context */
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() cm_id missing verbs",
			__func__);
		/* return EINVAL; legal value for dispatcher??? */
	}

	/* fast path: cached pd still matches this device context */
	if (xprt->pd && xprt->pd->context == xprt->cm_id->verbs) {
		atomic_inc_uint32_t(&xprt->pd->pd_used);
		return (0);
	}

	mutex_lock(&rpc_rdma_state.lock);
	LIST_FOREACH(pd, &rpc_rdma_state.pdh, pdl) {
		if (pd->context == xprt->cm_id->verbs) {
			atomic_inc_uint32_t(&pd->pd_used);
			/* move-to-front when this entry is now more used
			 * than the head, keeping hot entries near O(1)
			 * ("LFU" per the header comment) */
			lf = LIST_FIRST(&rpc_rdma_state.pdh);
			if (pd->pd_used > lf->pd_used) {
				LIST_REMOVE(pd, pdl);
				LIST_INSERT_HEAD(&rpc_rdma_state.pdh, pd, pdl);
			}
			mutex_unlock(&rpc_rdma_state.lock);
			xprt->pd = pd;
			return (0);
		}
	}

	/* not found: create a fresh entry with one reference.
	 * NOTE(review): mem_zalloc's result is used unchecked -- presumably
	 * the project allocator aborts on OOM; confirm. */
	pd = mem_zalloc(sizeof(*pd));
	rc = 0;
	pd->context = xprt->cm_id->verbs;
	pd->pd_used = 1;

	LIST_INSERT_HEAD(&rpc_rdma_state.pdh, pd, pdl);
	mutex_unlock(&rpc_rdma_state.lock);
	xprt->pd = pd;
	return rc;
}
| |
| /** |
| * rpc_rdma_pd_get: register and setup the protection domain |
| * |
| * @param[IN] xprt connection handle |
| * |
| * @return 0, errno, ENOSPC, or EINVAL. |
| */ |
static int
rpc_rdma_pd_get(RDMAXPRT *xprt)
{
	/* find or create the list entry and take a reference */
	int rc = rpc_rdma_pd_by_verbs(xprt);

	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s:%u ERROR (return)",
			__func__, __LINE__);
		return rc;
	}
	/* lazily allocate the ibverbs protection domain on first use.
	 * NOTE(review): this check-then-alloc is not under
	 * rpc_rdma_state.lock; two xprts sharing a pd entry could race
	 * here -- confirm callers serialize this. */
	if (!xprt->pd->pd) {
		xprt->pd->pd = ibv_alloc_pd(xprt->cm_id->verbs);
		if (!xprt->pd->pd) {
			rc = errno;
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() %p[%u] ibv_alloc_pd failed: %s (%d)",
				__func__, xprt, xprt->state, strerror(rc), rc);
			return rc;
		}
	}
	return (0);
}
| |
| /** |
| * rpc_rdma_pd_put: de-register and teardown the protection domain |
| * |
| * @param[IN] xprt connection handle |
| */ |
static inline void
rpc_rdma_pd_put(RDMAXPRT *xprt)
{
	if (!xprt->pd) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() missing protection domain?",
			__func__);
		return;
	}

	/* last reference: unlink from the global list, then release the
	 * ibverbs objects and the entry itself */
	if (atomic_dec_uint32_t(&xprt->pd->pd_used) == 0) {
		mutex_lock(&rpc_rdma_state.lock);
		LIST_REMOVE(xprt->pd, pdl);
		mutex_unlock(&rpc_rdma_state.lock);

		if (xprt->pd->pd)
			ibv_dealloc_pd(xprt->pd->pd);
		xprt->pd->pd = NULL;

		if (xprt->pd->srq)
			ibv_destroy_srq(xprt->pd->srq);
		xprt->pd->srq = NULL;

		/* drain the shared receive queue buffer pool, if any */
		if (!TAILQ_EMPTY(&xprt->pd->srqh.qh)) {
			xdr_ioq_destroy_pool(&xprt->pd->srqh);
		}
		mem_free(xprt->pd, sizeof(*xprt->pd));
	}
	/* drop this xprt's cached pointer regardless of refcount */
	xprt->pd = NULL;
}
| |
#ifdef UNUSED
/**
 * rpc_rdma_print_devinfo: debug helper; prints the node GUID of the
 * device backing the transport's cm_id.
 *
 * @param[IN] xprt	connection handle; cm_id->verbs must be valid
 */
void
rpc_rdma_print_devinfo(RDMAXPRT *xprt)
{
	struct ibv_device_attr device_attr;
	uint64_t node_guid;
	int rc;

	/* previously unchecked: on failure device_attr is uninitialized */
	rc = ibv_query_device(xprt->cm_id->verbs, &device_attr);
	if (rc) {
		printf("ibv_query_device failed: %d\n", rc);
		return;
	}

	node_guid = ntohll(device_attr.node_guid);
	printf("guid: %04x:%04x:%04x:%04x\n",
		(unsigned) (node_guid >> 48) & 0xffff,
		(unsigned) (node_guid >> 32) & 0xffff,
		(unsigned) (node_guid >> 16) & 0xffff,
		(unsigned) (node_guid >> 0) & 0xffff);
}
#endif /* UNUSED */
| |
| /** |
| * rpc_rdma_thread_create: Simple wrapper around pthread_create |
| */ |
| static int |
| rpc_rdma_thread_create(pthread_t *thrid, size_t stacksize, |
| void *(*routine)(void *), void *arg) |
| { |
| |
| pthread_attr_t attr; |
| int rc; |
| |
| /* Init for thread parameter (mostly for scheduling) */ |
| rc = pthread_attr_init(&attr); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() can't init pthread's attributes: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() can't set pthread's scope: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() can't set pthread's join state: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = pthread_attr_setstacksize(&attr, stacksize); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() can't set pthread's stack size: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| return pthread_create(thrid, &attr, routine, arg); |
| } |
| |
| static int |
| rpc_rdma_thread_create_epoll(pthread_t *thrid, void *(*routine)(void *), |
| void *arg, int *epollfd) |
| { |
| int rc = 0; |
| |
| /* all calls set a thrid in rpc_rdma_state, but unlikely conflict */ |
| mutex_lock(&rpc_rdma_state.lock); |
| |
| if (*thrid == 0) do { |
| *epollfd = epoll_create(EPOLL_SIZE); |
| if (*epollfd == -1) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() epoll_create failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| break; |
| } |
| |
| rc = rpc_rdma_thread_create(thrid, WORKER_STACK_SIZE, |
| routine, arg); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() could not create thread: %s (%d)", |
| __func__, strerror(rc), rc); |
| *thrid = 0; |
| break; |
| } |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() thread %lx spawned for epoll %d", |
| __func__, |
| (unsigned long)*thrid, |
| *epollfd); |
| } while (0); |
| |
| mutex_unlock(&rpc_rdma_state.lock); |
| return rc; |
| } |
| |
/**
 * rpc_rdma_worker_callback: work-pool entry point for a work completion.
 *
 * Dispatches the callback context (cbc) that the cq event handler queued:
 * a non-zero ibv status takes the error ("negative") callback, a known
 * opcode takes the success ("positive") callback.  In every case the xprt
 * reference taken by the submitter (SVC_REF before work_pool_submit in
 * rpc_rdma_cq_event_handler) is released here.
 */
static void
rpc_rdma_worker_callback(struct work_pool_entry *wpe)
{
	struct rpc_rdma_cbc *cbc =
		opr_containerof(wpe, struct rpc_rdma_cbc, wpe);
	RDMAXPRT *xprt = (RDMAXPRT *)wpe->arg;

	/* failed completion: error path only, then drop the reference */
	if (cbc->status) {
		if (cbc->negative_cb) {
			__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
				"%s() %p[%u] cbc %p status: %d",
				__func__, xprt, xprt->state, cbc, cbc->status);
			cbc->negative_cb(cbc, xprt);
		}

		/* wpe->arg referenced before work_pool_submit() */
		SVC_RELEASE(&xprt->xprt, SVC_REF_FLAG_NONE);
		return;
	}

	switch (cbc->opcode) {
	case IBV_WC_SEND:
	case IBV_WC_RDMA_WRITE:
	case IBV_WC_RDMA_READ:
	case IBV_WC_RECV:
	case IBV_WC_RECV_RDMA_WITH_IMM:
		if (cbc->positive_cb) {
			__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
				"%s() %p[%u] cbc %p opcode: %d",
				__func__, xprt, xprt->state, cbc, cbc->opcode);
			cbc->positive_cb(cbc, xprt);
		}
		break;

	default:
		/* unexpected opcode: log and fall through to the release */
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() %p[%u] cbc %p opcode: %d unknown",
			__func__, xprt, xprt->state, cbc, cbc->opcode);
		break;
	}

	/* wpe->arg referenced before work_pool_submit() */
	SVC_RELEASE(&xprt->xprt, SVC_REF_FLAG_NONE);
}
| |
| /** |
| * rpc_rdma_fd_add: Adds fd to the epoll wait |
| * |
| * Returns 0 on success, errno value on error. |
| */ |
| static int |
| rpc_rdma_fd_add(RDMAXPRT *xprt, int fd, int epollfd) |
| { |
| struct epoll_event ev; |
| int flags = fcntl(fd, F_GETFL); |
| int rc = fcntl(fd, F_SETFL, flags | O_NONBLOCK); |
| |
| if (rc < 0) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p Failed to make the channel nonblock: %s (%d)", |
| __func__, xprt, strerror(rc), rc); |
| return rc; |
| } |
| |
| ev.events = EPOLLIN; |
| ev.data.ptr = xprt; |
| |
| rc = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev); |
| if (rc == -1) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p Failed to add fd to epoll: %s (%d)", |
| __func__, xprt, strerror(rc), rc); |
| return rc; |
| } |
| |
| return 0; |
| } |
| |
| static int |
| rpc_rdma_fd_del(int fd, int epollfd) |
| { |
| int rc = epoll_ctl(epollfd, EPOLL_CTL_DEL, fd, NULL); |
| |
| /* Let epoll deal with multiple deletes of the same fd */ |
| if (rc == -1 && errno != ENOENT) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Failed to del fd to epoll: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| return 0; |
| } |
| |
| static inline int |
| rpc_rdma_stats_add(RDMAXPRT *xprt) |
| { |
| struct sockaddr_un sockaddr; |
| int rc; |
| |
| /* no stats if no prefix */ |
| if (!xprt->xa->statistics_prefix) |
| return 0; |
| |
| /* setup xprt->stats_sock here */ |
| xprt->stats_sock = socket(AF_UNIX, SOCK_STREAM, 0); |
| if (xprt->stats_sock == -1) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() socket on stats socket failed, quitting thread: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| memset(&sockaddr, 0, sizeof(sockaddr)); |
| sockaddr.sun_family = AF_UNIX; |
| snprintf(sockaddr.sun_path, sizeof(sockaddr.sun_path)-1, "%s%p", |
| xprt->xa->statistics_prefix, xprt); |
| |
| unlink(sockaddr.sun_path); |
| |
| rc = bind(xprt->stats_sock, (struct sockaddr*)&sockaddr, |
| sizeof(sockaddr)); |
| if (rc == -1) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() bind on stats socket failed, quitting thread: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = listen(xprt->stats_sock, 5); |
| if (rc == -1) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() listen on stats socket failed, quitting thread: %s (%d)", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| |
| return rpc_rdma_fd_add(xprt, xprt->stats_sock, |
| rpc_rdma_state.stats_epollfd); |
| } |
| |
| static inline int |
| rpc_rdma_stats_del(RDMAXPRT *xprt) |
| { |
| /* rpc_rdma_fd_del is called in stats thread on a close event */ |
| char sun_path[108]; |
| |
| snprintf(sun_path, sizeof(sun_path)-1, "%s%p", |
| xprt->xa->statistics_prefix, xprt); |
| unlink(sun_path); |
| |
| return close(xprt->stats_sock); |
| } |
| |
| /** |
| * rpc_rdma_stats_thread: unix socket thread |
| * |
| * Well, a thread. arg = xprt |
| */ |
| static void * |
| rpc_rdma_stats_thread(void *arg) |
| { |
| RDMAXPRT *xprt; |
| struct epoll_event epoll_events[EPOLL_EVENTS]; |
| char stats_str[256]; |
| int childfd; |
| int i; |
| int n; |
| int rc; |
| |
| while (rpc_rdma_state.run_count > 0) { |
| n = epoll_wait(rpc_rdma_state.stats_epollfd, |
| epoll_events, EPOLL_EVENTS, EPOLL_WAIT_MS); |
| if (n == 0) |
| continue; |
| |
| if (n == -1) { |
| if (errno == EINTR) |
| continue; |
| |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() epoll_wait failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| break; |
| } |
| |
| for (i = 0; i < n; ++i) { |
| xprt = (RDMAXPRT*)epoll_events[i].data.ptr; |
| if (!xprt) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() no xprt: got an event on a fd that should have been removed!", |
| __func__); |
| continue; |
| } |
| |
| if (epoll_events[i].events == EPOLLERR |
| || epoll_events[i].events == EPOLLHUP) { |
| rpc_rdma_fd_del(xprt->stats_sock, |
| rpc_rdma_state.stats_epollfd); |
| continue; |
| } |
| |
| childfd = accept(xprt->stats_sock, NULL, NULL); |
| if (childfd == -1) { |
| if (errno == EINTR) { |
| continue; |
| } else { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() accept on stats socket failed: %s (%d)", |
| __func__, strerror(errno), errno); |
| continue; |
| } |
| } |
| |
| rc = snprintf(stats_str, sizeof(stats_str), "stats:\n" |
| " tx_bytes\ttx_pkt\ttx_err\n" |
| " %10"PRIu64"\t%"PRIu64"\t%"PRIu64"\n" |
| " rx_bytes\trx_pkt\trx_err\n" |
| " %10"PRIu64"\t%"PRIu64"\t%"PRIu64"\n" |
| " callback time: %lu.%09lu s\n" |
| " completion time: %lu.%09lu s\n", |
| xprt->stats.tx_bytes, xprt->stats.tx_pkt, |
| xprt->stats.tx_err, xprt->stats.rx_bytes, |
| xprt->stats.rx_pkt, xprt->stats.rx_err, |
| xprt->stats.nsec_callback / NSEC_IN_SEC, |
| xprt->stats.nsec_callback % NSEC_IN_SEC, |
| xprt->stats.nsec_compevent / NSEC_IN_SEC, |
| xprt->stats.nsec_compevent % NSEC_IN_SEC); |
| rc = write(childfd, stats_str, rc); |
| rc = close(childfd); |
| } |
| } |
| |
| pthread_exit(NULL); |
| } |
| |
| /** |
| * rpc_rdma_cq_event_handler: completion queue event handler. |
| * |
| * marks contexts back out of use, |
| * calls the appropriate callbacks for each kind of event. |
| * |
| * @return 0 on success, work completion status if not 0 |
| */ |
static int
rpc_rdma_cq_event_handler(RDMAXPRT *xprt)
{
	struct ibv_wc wc[IBV_POLL_EVENTS];
	struct ibv_cq *ev_cq;
	void *ev_ctx;
	struct rpc_rdma_cbc *cbc;
	struct xdr_ioq_uv *data;
	int i;
	int rc;
	int npoll = 0;
	uint32_t len;

	/* the comp channel fd is nonblocking (see rpc_rdma_fd_add), so
	 * EAGAIN just means "no event pending" and is not logged */
	rc = ibv_get_cq_event(xprt->comp_channel, &ev_cq, &ev_ctx);
	if (rc) {
		rc = errno;
		if (rc != EAGAIN) {
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() ibv_get_cq_event failed: %d.",
				__func__, rc);
		}
		return rc;
	}
	if (ev_cq != xprt->cq) {
		/* event for a cq we don't own: ack it and bail */
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() Unknown cq %p",
			__func__, ev_cq);
		ibv_ack_cq_events(ev_cq, 1);
		return EINVAL;
	}

	/* re-arm notification before draining, so no completion arriving
	 * after the drain is missed */
	rc = ibv_req_notify_cq(xprt->cq, 0);
	if (rc) {
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() ibv_req_notify_cq failed: %d.",
			__func__, rc);
	}

	/* drain the cq in batches of IBV_POLL_EVENTS until empty */
	while (rc == 0
	    && (npoll = ibv_poll_cq(xprt->cq, IBV_POLL_EVENTS, wc)) > 0) {
		for (i = 0; i < npoll; i++) {
			if (xprt->bad_recv_wr) {
				__warnx(TIRPC_DEBUG_FLAG_ERROR,
					"%s() Something was bad on that recv",
					__func__);
			}
			if (xprt->bad_send_wr) {
				__warnx(TIRPC_DEBUG_FLAG_ERROR,
					"%s() Something was bad on that send",
					__func__);
			}
			/* wr_id was set to the cbc when the wr was posted;
			 * stash the completion details for the worker */
			cbc = (struct rpc_rdma_cbc *)wc[i].wr_id;
			cbc->opcode = wc[i].opcode;
			cbc->status = wc[i].status;
			cbc->wpe.arg = xprt;

			if (wc[i].status) {
				/* failed completion: count the error, hand
				 * the cbc to the worker's error path */
				switch (wc[i].opcode) {
				case IBV_WC_SEND:
				case IBV_WC_RDMA_WRITE:
				case IBV_WC_RDMA_READ:
					xprt->stats.tx_err++;
					break;
				case IBV_WC_RECV:
				case IBV_WC_RECV_RDMA_WITH_IMM:
					xprt->stats.rx_err++;
					break;
				default:
					break;
				}

				/* reference released in worker callback */
				SVC_REF(&xprt->xprt, SVC_REF_FLAG_NONE);
				work_pool_submit(&svc_work_pool, &cbc->wpe);

				/* flush errors are expected while closing;
				 * only surface the status otherwise */
				if (xprt->state != RDMAXS_CLOSING
				 && xprt->state != RDMAXS_CLOSED
				 && xprt->state != RDMAXS_ERROR) {
					rc = wc[i].status;
					__warnx(TIRPC_DEBUG_FLAG_ERROR,
						"%s() cq completion failed status: %s (%d)",
						__func__,
						ibv_wc_status_str(rc), rc);

				}
				continue;
			}

			switch (wc[i].opcode) {
			case IBV_WC_SEND:
			case IBV_WC_RDMA_WRITE:
			case IBV_WC_RDMA_READ:
				len = wc[i].byte_len;
				xprt->stats.tx_bytes += len;
				xprt->stats.tx_pkt++;

				__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
					"%s() WC_SEND/RDMA_WRITE/RDMA_READ: %d len %u",
					__func__,
					wc[i].opcode,
					len);

				if (wc[i].wc_flags & IBV_WC_WITH_IMM) {
					//FIXME cbc->data->imm_data = ntohl(wc.imm_data);
					__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
						"%s() imm_data: %d",
						__func__,
						ntohl(wc[i].imm_data));
				}

				/* reference released in worker callback */
				SVC_REF(&xprt->xprt, SVC_REF_FLAG_NONE);
				work_pool_submit(&svc_work_pool, &cbc->wpe);
				break;

			case IBV_WC_RECV:
			case IBV_WC_RECV_RDMA_WITH_IMM:
				len = wc[i].byte_len;
				xprt->stats.rx_bytes += len;
				xprt->stats.rx_pkt++;

				__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
					"%s() WC_RECV: %d len %u",
					__func__,
					wc[i].opcode,
					len);

				if (wc[i].wc_flags & IBV_WC_WITH_IMM) {
					//FIXME cbc->data->imm_data = ntohl(wc.imm_data);
					__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
						"%s() imm_data: %d",
						__func__,
						ntohl(wc[i].imm_data));
				}

				/* fill all the sizes in case of multiple sge
				 * assumes _tail was set to _wrap before call
				 */
				data = IOQ_(TAILQ_FIRST(&cbc->workq.ioq_uv.uvqh.qh));
				while (data && ioquv_length(data) < len) {
					/* fully-consumed buffer: account and
					 * move to the next sge buffer */
					VALGRIND_MAKE_MEM_DEFINED(data->v.vio_head, ioquv_length(data));
					len -= ioquv_length(data);
					data = IOQ_(TAILQ_NEXT(&data->uvq, q));
				}
				if (data) {
					/* last buffer: trim tail to the bytes
					 * actually received */
					data->v.vio_tail = data->v.vio_head + len;
					VALGRIND_MAKE_MEM_DEFINED(data->v.vio_head, ioquv_length(data));
				} else if (len) {
					__warnx(TIRPC_DEBUG_FLAG_ERROR,
						"%s() ERROR %d leftover bytes?",
						__func__, len);
				}

				/* reference released in worker callback */
				SVC_REF(&xprt->xprt, SVC_REF_FLAG_NONE);
				work_pool_submit(&svc_work_pool, &cbc->wpe);
				break;

			default:
				__warnx(TIRPC_DEBUG_FLAG_ERROR,
					"%s() unknown opcode: %d",
					__func__, wc[i].opcode);
				rc = EINVAL;
			}
		}
	}

	if (npoll < 0) {
		/* ibv_poll_cq returns a negative errno-style value */
		__warnx(TIRPC_DEBUG_FLAG_ERROR,
			"%s() %p[%u] ibv_poll_cq failed: %s (%d)",
			__func__, xprt, xprt->state, strerror(-npoll), -npoll);
		rc = -npoll;
	}

	/* ack exactly the one event obtained above */
	ibv_ack_cq_events(xprt->cq, 1);

	return -rc;
}
| |
| /** |
| * rpc_rdma_cq_thread: thread function which waits for new completion events |
| * and gives them to handler (then ack the event) |
| * |
| */ |
/**
 * rpc_rdma_cq_thread: thread function which waits for new completion events
 * and gives them to handler (then ack the event)
 *
 * Polls cq_epollfd while run_count > 0.  Each ready fd's data.ptr is the
 * owning xprt; the event handler runs under that xprt's cm_lock so state
 * transitions are serialized with the cm thread.
 *
 * @param[IN] arg	unused (xprt arrives via epoll data.ptr)
 */
static void *
rpc_rdma_cq_thread(void *arg)
{
	RDMAXPRT *xprt;
	struct epoll_event epoll_events[EPOLL_EVENTS];
	int i;
	int n;
	int rc;

	while (rpc_rdma_state.run_count > 0) {
		n = epoll_wait(rpc_rdma_state.cq_epollfd,
				epoll_events, EPOLL_EVENTS, EPOLL_WAIT_MS);
		if (n == 0)
			continue;

		if (n == -1) {
			if (errno == EINTR)
				continue;

			rc = errno;
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() epoll_wait failed: %s (%d)",
				__func__, strerror(rc), rc);
			break;
		}

		for (i = 0; i < n; ++i) {
			xprt = (RDMAXPRT*)epoll_events[i].data.ptr;
			if (!xprt) {
				__warnx(TIRPC_DEBUG_FLAG_ERROR,
					"%s() got an event on a fd that should have been removed! (no xprt)",
					__func__);
				continue;
			}

			if (epoll_events[i].events == EPOLLERR
			 || epoll_events[i].events == EPOLLHUP) {
				__warnx(TIRPC_DEBUG_FLAG_ERROR,
					"%s() epoll error or hup (%d)",
					__func__, epoll_events[i].events);
				continue;
			}

			mutex_lock(&xprt->cm_lock);
			if (xprt->state >= RDMAXS_CLOSING) {
				/* CLOSING, CLOSED, ERROR */
				// closing xprt, skip this, will be done on flush
				rpc_rdma_fd_del(xprt->comp_channel->fd,
						rpc_rdma_state.cq_epollfd);
				mutex_unlock(&xprt->cm_lock);
				continue;
			}

			rc = rpc_rdma_cq_event_handler(xprt);
			if (rc) {
				/* handler errors during teardown are
				 * expected; otherwise mark the xprt bad and
				 * wake anyone waiting on its state */
				if (xprt->state != RDMAXS_CLOSING
				 && xprt->state != RDMAXS_CLOSED
				 && xprt->state != RDMAXS_ERROR) {
					__warnx(TIRPC_DEBUG_FLAG_ERROR,
						"%s() something went wrong with our cq_event_handler",
						__func__);
					xprt->state = RDMAXS_ERROR;
					cond_broadcast(&xprt->cm_cond);
				}
			}
			mutex_unlock(&xprt->cm_lock);
		}
	}

	pthread_exit(NULL);
}
| |
| /** |
| * rpc_rdma_cm_event_handler: handles addr/route resolved events (client side) |
| * and disconnect (everyone) |
| * |
| */ |
| static int |
| rpc_rdma_cm_event_handler(RDMAXPRT *ep_xprt, struct rdma_cm_event *event) |
| { |
| struct rdma_cm_id *cm_id = event->id; |
| RDMAXPRT *xprt = cm_id->context; |
| uint32_t u; |
| int rc = 0; |
| |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p cma_event type %s", |
| __func__, ep_xprt, rdma_event_str(event->event)); |
| |
| if (xprt->bad_recv_wr) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Something was bad on that recv", |
| __func__); |
| } |
| if (xprt->bad_send_wr) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Something was bad on that send", |
| __func__); |
| } |
| |
| switch (event->event) { |
| case RDMA_CM_EVENT_ADDR_RESOLVED: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p ADDR_RESOLVED", |
| __func__, xprt); |
| mutex_lock(&xprt->cm_lock); |
| xprt->state = RDMAXS_ADDR_RESOLVED; |
| cond_broadcast(&xprt->cm_cond); |
| mutex_unlock(&xprt->cm_lock); |
| break; |
| |
| case RDMA_CM_EVENT_ROUTE_RESOLVED: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p ROUTE_RESOLVED", |
| __func__, xprt); |
| mutex_lock(&xprt->cm_lock); |
| xprt->state = RDMAXS_ROUTE_RESOLVED; |
| cond_broadcast(&xprt->cm_cond); |
| mutex_unlock(&xprt->cm_lock); |
| break; |
| |
| case RDMA_CM_EVENT_ESTABLISHED: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p ESTABLISHED", |
| __func__, xprt); |
| |
| xprt->state = RDMAXS_CONNECTED; |
| |
| rc = rpc_rdma_fd_add(xprt, xprt->comp_channel->fd, |
| rpc_rdma_state.cq_epollfd); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| } |
| rpc_rdma_stats_add(xprt); |
| break; |
| |
| case RDMA_CM_EVENT_CONNECT_REQUEST: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p CONNECT_REQUEST", |
| __func__, xprt); |
| |
| rc = sem_trywait(&rpc_rdma_state.c_r.u_sem); |
| if (rc) { |
| rc = errno; |
| if (EAGAIN != rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] sem_trywait failed: %s (%d)", |
| __func__, xprt, xprt->state, |
| strerror(rc), rc); |
| return rc; |
| } |
| |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p WARNING too many connection requests! " |
| "Need to increase backlog parameter.\n", |
| __func__, xprt); |
| |
| /* After advisory message, wait for available slot. |
| */ |
| rc = sem_wait(&rpc_rdma_state.c_r.u_sem); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] sem_wait failed: %s (%d)", |
| __func__, xprt, xprt->state, |
| strerror(rc), rc); |
| return rc; |
| } |
| } |
| |
| u = atomic_postinc_uint32_t(&rpc_rdma_state.c_r.q_tail); |
| if (u >= rpc_rdma_state.c_r.q_size) { |
| u_int q_mask = rpc_rdma_state.c_r.q_size - 1; |
| |
| /* masking allows update by lock-free tasks, |
| * as long as overrun never 2 * q_size |
| */ |
| u &= q_mask; |
| atomic_clear_uint32_t_bits(&rpc_rdma_state.c_r.q_tail, |
| ~q_mask); |
| } |
| rpc_rdma_state.c_r.id_queue[u] = cm_id; |
| |
| /* signal accept handler */ |
| rc = sem_post(&rpc_rdma_state.c_r.q_sem); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() sem_post failed", |
| __func__, strerror(rc), rc); |
| return rc; |
| } |
| break; |
| |
| case RDMA_CM_EVENT_ADDR_ERROR: |
| case RDMA_CM_EVENT_ROUTE_ERROR: |
| case RDMA_CM_EVENT_CONNECT_ERROR: |
| case RDMA_CM_EVENT_UNREACHABLE: |
| case RDMA_CM_EVENT_REJECTED: |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] cma event %s, error %d", |
| __func__, xprt, xprt->state, |
| rdma_event_str(event->event), event->status); |
| mutex_lock(&xprt->cm_lock); |
| xprt->state = RDMAXS_ERROR; |
| cond_broadcast(&xprt->cm_cond); |
| mutex_unlock(&xprt->cm_lock); |
| break; |
| |
| case RDMA_CM_EVENT_DISCONNECTED: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u] DISCONNECT EVENT...", |
| __func__, xprt, xprt->state); |
| |
| // don't call completion again |
| if (xprt->comp_channel) |
| rpc_rdma_fd_del(xprt->comp_channel->fd, |
| rpc_rdma_state.cq_epollfd); |
| |
| mutex_lock(&xprt->cm_lock); |
| xprt->state = RDMAXS_CLOSED; |
| cond_broadcast(&xprt->cm_cond); |
| mutex_unlock(&xprt->cm_lock); |
| |
| if (xprt->xa->disconnect_cb) |
| xprt->xa->disconnect_cb(&xprt->xprt); |
| break; |
| |
| case RDMA_CM_EVENT_DEVICE_REMOVAL: |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u] cma detected device removal!!!!", |
| __func__, xprt, xprt->state); |
| rc = ENODEV; |
| break; |
| |
| default: |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] unhandled event: %s, ignoring %d\n", |
| __func__, xprt, xprt->state, |
| rdma_event_str(event->event), event->event); |
| break; |
| } |
| |
| return rc; |
| } |
| |
| /** |
| * rpc_rdma_cm_thread: thread function which waits for new connection events |
| * and gives them to handler (then ack the event) |
| * |
| */ |
| static void * |
| rpc_rdma_cm_thread(void *nullarg) |
| { |
| RDMAXPRT *xprt; |
| RDMAXPRT *cm_xprt; |
| struct rdma_cm_event *event; |
| struct epoll_event epoll_events[EPOLL_EVENTS]; |
| int i; |
| int n; |
| int rc; |
| |
| while (rpc_rdma_state.run_count > 0) { |
| n = epoll_wait(rpc_rdma_state.cm_epollfd, |
| epoll_events, EPOLL_EVENTS, EPOLL_WAIT_MS); |
| if (n == 0) |
| continue; |
| |
| if (n == -1) { |
| if (errno == EINTR) |
| continue; |
| |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] epoll_wait failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| break; |
| } |
| |
| for (i = 0; i < n; ++i) { |
| xprt = (RDMAXPRT*)epoll_events[i].data.ptr; |
| if (!xprt) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() got an event on a fd that should have been removed! (no xprt)", |
| __func__); |
| continue; |
| } |
| |
| if (epoll_events[i].events == EPOLLERR |
| || epoll_events[i].events == EPOLLHUP) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() epoll error or hup (%d)", |
| __func__, epoll_events[i].events); |
| continue; |
| } |
| |
| if (xprt->state == RDMAXS_CLOSED) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() got a cm event on a closed xprt?", |
| __func__); |
| continue; |
| } |
| |
| if (!xprt->event_channel) { |
| if (xprt->state != RDMAXS_CLOSED) |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() no event channel? :D", |
| __func__); |
| continue; |
| } |
| |
| rc = rdma_get_cm_event(xprt->event_channel, &event); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() rdma_get_cm_event failed: %d.", |
| __func__, rc); |
| continue; |
| } |
| |
| cm_xprt = event->id->context; |
| rc = rpc_rdma_cm_event_handler(xprt, event); |
| rdma_ack_cm_event(event); |
| |
| if (rc |
| && (cm_xprt->state != RDMAXS_LISTENING |
| || cm_xprt == xprt)) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() rpc_rdma_cm_event_handler: %d.", |
| __func__, rc); |
| } |
| |
| if (cm_xprt->state == RDMAXS_CLOSED |
| && cm_xprt->destroy_on_disconnect) |
| SVC_DESTROY(&cm_xprt->xprt); |
| } |
| } |
| |
| pthread_exit(NULL); |
| } |
| |
| /** |
| * rpc_rdma_flush_buffers: Flush all pending recv/send |
| * |
| * @param[IN] xprt |
| * |
| * @return void |
| */ |
/**
 * rpc_rdma_flush_buffers: Flush all pending recv/send
 *
 * Drains the completion queue under cm_lock until the handler reports
 * EAGAIN (channel empty).  Skipped entirely in the ERROR state.
 *
 * @param[IN] xprt
 *
 * @return void
 */
static void
rpc_rdma_flush_buffers(RDMAXPRT *xprt)
{
	int rc;

	__warnx(TIRPC_DEBUG_FLAG_RPC_RDMA,
		"%s() %p[%u]",
		__func__, xprt, xprt->state);

	mutex_lock(&xprt->cm_lock);

	if (xprt->state != RDMAXS_ERROR) {
		/* 0 means "handled an event, maybe more pending";
		 * EAGAIN from ibv_get_cq_event means the channel is dry */
		do {
			rc = rpc_rdma_cq_event_handler(xprt);
		} while (rc == 0);

		if (rc != EAGAIN)
			__warnx(TIRPC_DEBUG_FLAG_ERROR,
				"%s() couldn't flush pending data in cq: %d",
				__func__, rc);
	}
#ifdef FIXME
	/* only flush rx if client or accepting server */
	if (xprt->server >= 0)
		for (i = 0, ctx = xprt->rcb;
		     i < xprt->xa->rq_depth;
		     i++,
		     ctx = (struct rpc_rdma_cbc*)((uint8_t*)ctx + sizeof(struct rpc_rdma_cbc) + xprt->xa->max_recv_sge*sizeof(struct ibv_sge)))
			if (ctx->used == MSK_CTX_PENDING)
				rpc_rdma_worker_signal(xprt, ctx, IBV_WC_FATAL_ERR, IBV_WC_RECV);

	for (i = 0, ctx = (struct rpc_rdma_cbc *)xprt->wcb;
	     i < xprt->xa->sq_depth;
	     i++, ctx = (struct rpc_rdma_cbc*)((uint8_t*)ctx + sizeof(struct rpc_rdma_cbc) + xprt->xa->max_send_sge*sizeof(struct ibv_sge)))
		if (ctx->used == MSK_CTX_PENDING)
			rpc_rdma_worker_signal(xprt, ctx, IBV_WC_FATAL_ERR, IBV_WC_SEND);

	/* only flush rx if client or accepting server */
	if (xprt->server >= 0) do {
		wait = 0;
		for (i = 0, ctx = xprt->rcb;
		     i < xprt->xa->rq_depth;
		     i++, ctx = rpc_rdma_next_ctx(ctx, xprt->xa->max_recv_sge))
			if (ctx->used != MSK_CTX_FREE)
				wait++;

	} while (wait && usleep(100000));
	do {
		wait = 0;
		for (i = 0, ctx = (struct rpc_rdma_cbc *)xprt->wcb;
		     i < xprt->xa->sq_depth;
		     i++, ctx = rpc_rdma_next_ctx(ctx, xprt->xa->max_recv_sge))
			if (ctx->used != MSK_CTX_FREE)
				wait++;

	} while (wait && usleep(100000));
#endif
	mutex_unlock(&xprt->cm_lock);
}
| |
| /** |
| * rpc_rdma_destroy_stuff: destroys all qp-related stuff for us |
| * |
| * @param[INOUT] xprt |
| * |
| * @return void, even if the functions _can_ fail we choose to ignore it. //FIXME? |
| */ |
| static void |
| rpc_rdma_destroy_stuff(RDMAXPRT *xprt) |
| { |
| if (xprt->qp) { |
| // flush all pending receive/send buffers to error callback |
| rpc_rdma_flush_buffers(xprt); |
| |
| ibv_destroy_qp(xprt->qp); |
| xprt->qp = NULL; |
| } |
| |
| if (xprt->cq) { |
| ibv_destroy_cq(xprt->cq); |
| xprt->cq = NULL; |
| } |
| |
| if (xprt->comp_channel) { |
| ibv_destroy_comp_channel(xprt->comp_channel); |
| xprt->comp_channel = NULL; |
| } |
| |
| if (!TAILQ_EMPTY(&xprt->cbqh.qh)) { |
| xdr_ioq_destroy_pool(&xprt->cbqh); |
| } |
| } |
| |
| /** |
| * rpc_rdma_destroy: disconnects and free transport data |
| * |
| * @param[IN] xprt pointer to the service transport to destroy |
| */ |
| void |
| rpc_rdma_destroy(SVCXPRT *s_xprt) |
| { |
| RDMAXPRT *xprt = (RDMAXPRT *)s_xprt; |
| |
| /* inhibit repeated destroy */ |
| xprt->destroy_on_disconnect = false; |
| |
| if (xprt->state == RDMAXS_CONNECTED |
| || xprt->state == RDMAXS_CLOSED) { |
| mutex_lock(&xprt->cm_lock); |
| if (xprt->state != RDMAXS_CLOSED |
| && xprt->state != RDMAXS_LISTENING |
| && xprt->state != RDMAXS_ERROR) |
| xprt->state = RDMAXS_CLOSING; |
| |
| if (xprt->cm_id && xprt->cm_id->verbs) |
| rdma_disconnect(xprt->cm_id); |
| |
| while (xprt->state != RDMAXS_CLOSED |
| && xprt->state != RDMAXS_LISTENING |
| && xprt->state != RDMAXS_ERROR) { |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() we're not closed yet, " |
| "waiting for disconnect_event", |
| __func__); |
| cond_wait(&xprt->cm_cond, &xprt->cm_lock); |
| } |
| xprt->state = RDMAXS_CLOSED; |
| mutex_unlock(&xprt->cm_lock); |
| } |
| |
| if (xprt->cm_id) { |
| rdma_destroy_id(xprt->cm_id); |
| xprt->cm_id = NULL; |
| } |
| |
| if (xprt->stats_sock) |
| rpc_rdma_stats_del(xprt); |
| |
| /* event channel is shared between all children, |
| * so don't close it unless it's its own. |
| */ |
| if ((xprt->server != RDMAX_SERVER_CHILD) |
| && xprt->event_channel) { |
| rpc_rdma_fd_del(xprt->event_channel->fd, |
| rpc_rdma_state.cm_epollfd); |
| rdma_destroy_event_channel(xprt->event_channel); |
| xprt->event_channel = NULL; |
| } |
| |
| rpc_rdma_destroy_stuff(xprt); |
| rpc_rdma_pd_put(xprt); |
| |
| if (atomic_dec_int32_t(&rpc_rdma_state.run_count) <= 0) { |
| mutex_lock(&rpc_rdma_state.lock); |
| rpc_rdma_internals_join(); |
| mutex_unlock(&rpc_rdma_state.lock); |
| } |
| |
| /* destroy locking last, was initialized first (below). |
| */ |
| cond_destroy(&xprt->cm_cond); |
| mutex_destroy(&xprt->cm_lock); |
| mutex_destroy(&xprt->xprt.xp_lock); |
| mutex_destroy(&xprt->xprt.xp_auth_lock); |
| |
| mem_free(xprt, sizeof(*xprt)); |
| } |
| |
| /** |
| * rpc_rdma_allocate: allocate rdma transport structures |
| * |
| * @param[IN] xa parameters |
| * |
| * @return xprt on success, NULL on failure |
| */ |
| static RDMAXPRT * |
| rpc_rdma_allocate(struct rpc_rdma_attr *xa) |
| { |
| RDMAXPRT *xprt; |
| int rc; |
| |
| if (!xa) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Invalid argument", |
| __func__); |
| return NULL; |
| } |
| |
| xprt = mem_zalloc(sizeof(RDMAXPRT)); |
| |
| xprt->xprt.xp_type = XPRT_RDMA; |
| xprt->xprt.xp_refs = 1; |
| xprt->xprt.xp_ops = &rpc_rdma_ops; |
| |
| xprt->xa = xa; |
| xprt->conn_type = RDMA_PS_TCP; |
| xprt->destroy_on_disconnect = xa->destroy_on_disconnect; |
| |
| /* initialize locking first, will be destroyed last (above). |
| */ |
| rc = mutex_init(&xprt->xprt.xp_auth_lock, NULL); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() mutex_init xp_auth_lock failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto xp_auth_lock; |
| } |
| |
| rc = mutex_init(&xprt->xprt.xp_lock, NULL); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() mutex_init xp_lock failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto xp_lock; |
| } |
| |
| rc = mutex_init(&xprt->cm_lock, NULL); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() mutex_init failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto cm_lock; |
| } |
| |
| rc = cond_init(&xprt->cm_cond, NULL, NULL); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() cond_init failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto cm_cond; |
| } |
| |
| return (xprt); |
| |
| cm_cond: |
| mutex_destroy(&xprt->cm_lock); |
| cm_lock: |
| mutex_destroy(&xprt->xprt.xp_lock); |
| xp_lock: |
| mutex_destroy(&xprt->xprt.xp_auth_lock); |
| xp_auth_lock: |
| mem_free(xprt, sizeof(*xprt)); |
| return NULL; |
| } |
| |
| /** |
| * rpc_rdma_create: initialize rdma transport structures |
| * |
| * @param[IN] xa parameters |
| * |
| * @return xprt on success, NULL on failure |
| */ |
| SVCXPRT * |
| rpc_rdma_create(struct rpc_rdma_attr *xa) |
| { |
| RDMAXPRT *xprt; |
| int rc; |
| |
| if (xa->backlog > 4096) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() backlog (%u) much too large", |
| __func__, xa->backlog); |
| return NULL; |
| } |
| |
| if (xa->worker_count > 256) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() worker_count (%u) much too large", |
| __func__, xa->worker_count); |
| return NULL; |
| } |
| |
| xprt = rpc_rdma_allocate(xa); |
| if (!xprt) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| return NULL; |
| } |
| xprt->server = xa->backlog; /* convenient number > 0 */ |
| |
| /* presence of event_channel confirms RDMA, |
| * otherwise, cleanup and quit RDMA dispatcher. |
| */ |
| xprt->event_channel = rdma_create_event_channel(); |
| if (!xprt->event_channel) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() create_event_channel failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto failure; |
| } |
| |
| rc = rdma_create_id(xprt->event_channel, &xprt->cm_id, xprt, |
| xprt->conn_type); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() create_id failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto failure; |
| } |
| |
| pthread_mutex_lock(&svc_work_pool.pqh.qmutex); |
| if (!svc_work_pool.params.thrd_max) { |
| pthread_mutex_unlock(&svc_work_pool.pqh.qmutex); |
| |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() svc_work_pool already shutdown", |
| __func__); |
| goto failure; |
| } |
| svc_work_pool.params.thrd_max += xa->worker_count; |
| pthread_mutex_unlock(&svc_work_pool.pqh.qmutex); |
| |
| /* round up to the next power of two */ |
| rpc_rdma_state.c_r.q_size = 2; |
| while (rpc_rdma_state.c_r.q_size < xa->backlog) { |
| rpc_rdma_state.c_r.q_size <<= 1; |
| } |
| rpc_rdma_state.c_r.id_queue = mem_alloc(rpc_rdma_state.c_r.q_size |
| * sizeof(struct rdma_cm_id *)); |
| sem_init(&rpc_rdma_state.c_r.u_sem, 0, rpc_rdma_state.c_r.q_size); |
| |
| rc = rpc_rdma_bind_server(xprt); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() NFS/RDMA dispatcher could not bind engine", |
| __func__); |
| goto failure; |
| } |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() NFS/RDMA engine bound", |
| __func__); |
| |
| return (&xprt->xprt); |
| |
| failure: |
| rpc_rdma_destroy(&xprt->xprt); |
| return NULL; |
| } |
| |
| /** |
| * rpc_rdma_create_qp: create a qp associated with a xprt |
| * |
| * @param[INOUT] xprt |
| * @param[IN] cm_id |
| * |
| * @return 0 on success, errno value on error |
| */ |
| static int |
| rpc_rdma_create_qp(RDMAXPRT *xprt, struct rdma_cm_id *cm_id) |
| { |
| int rc; |
| struct ibv_qp_init_attr qp_attr = { |
| .cap.max_send_wr = xprt->xa->sq_depth, |
| .cap.max_send_sge = xprt->xa->max_send_sge, |
| .cap.max_recv_wr = xprt->xa->rq_depth, |
| .cap.max_recv_sge = xprt->xa->max_recv_sge, |
| .cap.max_inline_data = 0, // change if IMM |
| .qp_type = (xprt->conn_type == RDMA_PS_UDP |
| ? IBV_QPT_UD : IBV_QPT_RC), |
| .sq_sig_all = 1, |
| .send_cq = xprt->cq, |
| .recv_cq = xprt->cq, |
| .srq = xprt->srq, |
| }; |
| |
| rc = rdma_create_qp(cm_id, xprt->pd->pd, &qp_attr); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_create_qp failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| xprt->qp = cm_id->qp; |
| return 0; |
| } |
| |
| /** |
| * rpc_rdma_setup_stuff: setup pd, qp an' stuff |
| * |
| * @param[INOUT] xprt |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| static int |
| rpc_rdma_setup_stuff(RDMAXPRT *xprt) |
| { |
| int rc; |
| |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u]", |
| __func__, xprt, xprt->state); |
| |
| /* Located in this function for convenience, called by both |
| * client and server. Each is only done once for all connections. |
| */ |
| rc = rpc_rdma_thread_create_epoll(&rpc_rdma_state.cq_thread, |
| rpc_rdma_cq_thread, xprt, &rpc_rdma_state.cq_epollfd); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| return rc; |
| } |
| |
| if (xprt->xa->statistics_prefix != NULL |
| && (rc = rpc_rdma_thread_create_epoll(&rpc_rdma_state.stats_thread, |
| rpc_rdma_stats_thread, xprt, &rpc_rdma_state.stats_epollfd))) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| return rc; |
| } |
| |
| xprt->comp_channel = ibv_create_comp_channel(xprt->cm_id->verbs); |
| if (!xprt->comp_channel) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] ibv_create_comp_channel failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| xprt->cq = ibv_create_cq(xprt->cm_id->verbs, |
| xprt->xa->sq_depth + xprt->xa->rq_depth, |
| xprt, xprt->comp_channel, 0); |
| if (!xprt->cq) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] ibv_create_cq failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = ibv_req_notify_cq(xprt->cq, 0); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] ibv_req_notify_cq failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = rpc_rdma_create_qp(xprt, xprt->cm_id); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| return rc; |
| } |
| |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u] created qp %p", |
| __func__, xprt, xprt->state, xprt->qp); |
| return 0; |
| } |
| |
| /** |
| * rpc_rdma_setup_cbq |
| */ |
| static int |
| rpc_rdma_setup_cbq(struct poolq_head *ioqh, u_int depth, u_int sge) |
| { |
| struct rpc_rdma_cbc *cbc; |
| |
| if (ioqh->qsize) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() contexts already allocated", |
| __func__); |
| return EINVAL; |
| } |
| ioqh->qsize = sizeof(struct rpc_rdma_cbc) |
| + sizeof(struct ibv_sge) * sge; |
| TAILQ_INIT(&ioqh->qh); |
| |
| /* individual entries is less efficient than big array -- but uses |
| * "standard" IOQ operations, xdr_ioq_destroy_pool(), and |
| * debugging memory bounds checking of trailing ibv_sge array. |
| */ |
| while (depth--) { |
| cbc = mem_zalloc(ioqh->qsize); |
| |
| xdr_ioq_setup(&cbc->workq); |
| xdr_ioq_setup(&cbc->holdq); |
| |
| cbc->workq.ioq_uv.uvq_fetch = |
| cbc->holdq.ioq_uv.uvq_fetch = xdr_ioq_uv_fetch_nothing; |
| cbc->workq.xdrs[0].x_ops = |
| cbc->holdq.xdrs[0].x_ops = &xdr_ioq_ops; |
| cbc->workq.xdrs[0].x_op = |
| cbc->holdq.xdrs[0].x_op = XDR_FREE; /* catch setup errors */ |
| |
| cbc->workq.ioq_pool = ioqh; |
| cbc->wpe.fun = rpc_rdma_worker_callback; |
| |
| (ioqh->qcount)++; |
| TAILQ_INSERT_TAIL(&ioqh->qh, &cbc->workq.ioq_s, q); |
| } |
| return 0; |
| } |
| |
| /** |
| * rpc_rdma_bind_server |
| * |
| * @param[INOUT] xprt |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| static int |
| rpc_rdma_bind_server(RDMAXPRT *xprt) |
| { |
| struct rdma_addrinfo *res; |
| struct rdma_addrinfo hints; |
| int rc; |
| |
| if (!xprt || xprt->state != RDMAXS_INITIAL) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] must be initialized first!", |
| __func__, xprt, xprt->state); |
| return EINVAL; |
| } |
| |
| if (xprt->server <= 0) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Must be on server side to call this function", |
| __func__); |
| return EINVAL; |
| } |
| |
| memset(&hints, 0, sizeof(hints)); |
| hints.ai_flags = RAI_PASSIVE; |
| hints.ai_port_space = xprt->conn_type; |
| |
| rc = rdma_getaddrinfo(xprt->xa->node, xprt->xa->port, &hints, &res); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_getaddrinfo: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = rdma_bind_addr(xprt->cm_id, res->ai_src_addr); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_bind_addr: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| rdma_freeaddrinfo(res); |
| |
| /* at this point, the cm_id->verbs aren't filled */ |
| rc = rpc_rdma_pd_by_verbs(xprt); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] register pd failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = rdma_listen(xprt->cm_id, xprt->xa->backlog); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_listen failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| xprt->state = RDMAXS_LISTENING; |
| atomic_inc_int32_t(&rpc_rdma_state.run_count); |
| |
| rc = rpc_rdma_thread_create_epoll(&rpc_rdma_state.cm_thread, |
| rpc_rdma_cm_thread, xprt, &rpc_rdma_state.cm_epollfd); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rpc_rdma_thread_create_epoll failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| (void) atomic_dec_int32_t(&rpc_rdma_state.run_count); |
| return rc; |
| } |
| |
| return rpc_rdma_fd_add(xprt, xprt->event_channel->fd, |
| rpc_rdma_state.cm_epollfd); |
| } |
| |
| /** |
| * rpc_rdma_clone: clone child from listener parent |
| * |
| * @param[IN] l_xprt listening (parent) transport |
| * @param[IN] cm_id new rdma connection manager identifier |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| static RDMAXPRT * |
| rpc_rdma_clone(RDMAXPRT *l_xprt, struct rdma_cm_id *cm_id) |
| { |
| RDMAXPRT *xprt = rpc_rdma_allocate(l_xprt->xa); |
| int rc; |
| |
| if (!xprt) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| return NULL; |
| } |
| |
| xprt->cm_id = cm_id; |
| xprt->cm_id->context = xprt; |
| xprt->state = RDMAXS_CONNECT_REQUEST; |
| xprt->server = RDMAX_SERVER_CHILD; |
| |
| xprt->event_channel = l_xprt->event_channel; |
| |
| rc = rpc_rdma_pd_get(xprt); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| goto failure; |
| } |
| |
| if (l_xprt->xa->use_srq) { |
| if (!xprt->pd->srq) { |
| struct ibv_srq_init_attr srq_attr = { |
| .attr.max_wr = xprt->xa->rq_depth, |
| .attr.max_sge = xprt->xa->max_recv_sge, |
| }; |
| xprt->pd->srq = |
| ibv_create_srq(xprt->pd->pd, &srq_attr); |
| if (!xprt->pd->srq) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() ibv_create_srq failed: %s (%d)", |
| __func__, strerror(rc), rc); |
| goto failure; |
| } |
| } |
| |
| if (!xprt->pd->srqh.qcount) { |
| rc = rpc_rdma_setup_cbq(&xprt->pd->srqh, |
| xprt->xa->rq_depth, |
| xprt->xa->max_recv_sge); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| goto failure; |
| } |
| } |
| |
| /* only send contexts */ |
| rc = rpc_rdma_setup_cbq(&xprt->cbqh, |
| xprt->xa->sq_depth, |
| xprt->xa->credits); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| goto failure; |
| } |
| } else { |
| rc = rpc_rdma_setup_cbq(&xprt->cbqh, |
| xprt->xa->rq_depth + |
| xprt->xa->sq_depth, |
| xprt->xa->credits); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| goto failure; |
| } |
| } |
| |
| /* srq only used as a boolean here */ |
| xprt->srq = xprt->pd->srq; |
| |
| rc = rpc_rdma_setup_stuff(xprt); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s:%u ERROR (return)", |
| __func__, __LINE__); |
| goto failure; |
| } |
| |
| atomic_inc_int32_t(&rpc_rdma_state.run_count); |
| return xprt; |
| |
| failure: |
| rpc_rdma_destroy(&xprt->xprt); |
| return (NULL); |
| } |
| |
| /** |
| * rpc_rdma_accept_finalize: does the real connection acceptance |
| * N.B. no wait for CM result. CM event thread will handle setup/teardown. |
| * |
| * @param[IN] xprt |
| * |
| * @return 0 on success, the value of errno on error |
| */ |
| int |
| rpc_rdma_accept_finalize(RDMAXPRT *xprt) |
| { |
| struct rdma_conn_param conn_param; |
| int rc; |
| |
| if (!xprt || xprt->state != RDMAXS_CONNECT_REQUEST) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] isn't from a connection request?", |
| __func__, xprt, xprt->state); |
| return EINVAL; |
| } |
| |
| memset(&conn_param, 0, sizeof(conn_param)); |
| conn_param.responder_resources = 1; |
| conn_param.initiator_depth = 1; |
| conn_param.private_data = NULL; |
| conn_param.private_data_len = 0; |
| conn_param.rnr_retry_count = 10; |
| |
| rc = rdma_accept(xprt->cm_id, &conn_param); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_accept failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| } |
| |
| return rc; |
| } |
| |
| /** |
| * rpc_rdma_accept_timedwait: given a listening xprt, |
| * waits till any connection is requested and accepts one, |
| * then clones the listener. |
| * |
| * @param[IN] l_xprt listening (parent) transport |
| * @param[IN] abstime time to wait |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| static RDMAXPRT * |
| rpc_rdma_accept_timedwait(RDMAXPRT *l_xprt, struct timespec *abstime) |
| { |
| struct rdma_cm_id *cm_id; |
| uint32_t u; |
| int rc; |
| |
| if (!l_xprt || l_xprt->state != RDMAXS_LISTENING) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] isn't listening (after bind_server)?", |
| __func__, l_xprt, l_xprt->state); |
| return (NULL); |
| } |
| |
| /* Drain connection_requests */ |
| if (abstime) { |
| rc = sem_timedwait(&rpc_rdma_state.c_r.q_sem, abstime); |
| if (rc) { |
| rc = errno; |
| if (ETIMEDOUT == rc) { |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() ETIMEDOUT", |
| __func__); |
| } else { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() sem_timedwait failed", |
| __func__, strerror(rc), rc); |
| } |
| return (NULL); |
| } |
| } else { |
| rc = sem_wait(&rpc_rdma_state.c_r.q_sem); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() sem_wait failed", |
| __func__, strerror(rc), rc); |
| return (NULL); |
| } |
| } |
| |
| u = atomic_postinc_uint32_t(&rpc_rdma_state.c_r.q_head); |
| if (u >= rpc_rdma_state.c_r.q_size) { |
| u_int q_mask = rpc_rdma_state.c_r.q_size - 1; |
| |
| /* masking allows update by lock-free tasks, |
| * as long as overrun never 2 * q_size |
| */ |
| u &= q_mask; |
| atomic_clear_uint32_t_bits(&rpc_rdma_state.c_r.q_head, ~q_mask); |
| } |
| cm_id = rpc_rdma_state.c_r.id_queue[u]; |
| |
| /* Increase available count */ |
| rc = sem_post(&rpc_rdma_state.c_r.u_sem); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() sem_post failed", |
| __func__, strerror(rc), rc); |
| return (NULL); |
| } |
| |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() thread %lx, q %u, cm_id %p", |
| __func__, |
| pthread_self(), |
| u, |
| cm_id); |
| |
| if (!cm_id) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() missing cm_id", |
| __func__); |
| return (NULL); |
| } |
| |
| return rpc_rdma_clone(l_xprt, cm_id); |
| } |
| |
| RDMAXPRT * |
| rpc_rdma_accept_wait(RDMAXPRT *l_xprt,int msleep) |
| { |
| struct timespec ts; |
| |
| if (msleep == 0) |
| return rpc_rdma_accept_timedwait(l_xprt, NULL); |
| |
| clock_gettime(CLOCK_REALTIME, &ts); |
| ts.tv_sec += msleep / 1000; |
| ts.tv_nsec += (msleep % 1000) * NSEC_IN_SEC; |
| if (ts.tv_nsec >= NSEC_IN_SEC) { |
| ts.tv_nsec -= NSEC_IN_SEC; |
| ts.tv_sec++; |
| } |
| |
| return rpc_rdma_accept_timedwait(l_xprt, &ts); |
| } |
| |
| /** |
| * rpc_rdma_bind_client: resolve addr and route for the client and waits till it's done |
| * (the route and cond_signal is done in the cm thread) |
| * |
| */ |
| static int |
| rpc_rdma_bind_client(RDMAXPRT *xprt) |
| { |
| struct rdma_addrinfo hints; |
| struct rdma_addrinfo *res; |
| int rc; |
| |
| mutex_lock(&xprt->cm_lock); |
| |
| do { |
| memset(&hints, 0, sizeof(hints)); |
| hints.ai_port_space = xprt->conn_type; |
| |
| rc = rdma_getaddrinfo(xprt->xa->node, xprt->xa->port, |
| &hints, &res); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_getaddrinfo: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| break; |
| } |
| |
| rc = rdma_resolve_addr(xprt->cm_id, res->ai_src_addr, |
| res->ai_dst_addr, xprt->xa->timeout); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_resolve_addr failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| break; |
| } |
| rdma_freeaddrinfo(res); |
| |
| while (xprt->state == RDMAXS_INITIAL) { |
| cond_wait(&xprt->cm_cond, &xprt->cm_lock); |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u] after cond_wait", |
| __func__, xprt, xprt->state); |
| } |
| |
| if (xprt->state != RDMAXS_ADDR_RESOLVED) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Could not resolve addr", |
| __func__); |
| rc = EINVAL; |
| break; |
| } |
| |
| rc = rdma_resolve_route(xprt->cm_id, xprt->xa->timeout); |
| if (rc) { |
| xprt->state = RDMAXS_ERROR; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_resolve_route failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| break; |
| } |
| |
| while (xprt->state == RDMAXS_ADDR_RESOLVED) { |
| cond_wait(&xprt->cm_cond, &xprt->cm_lock); |
| __warnx(TIRPC_DEBUG_FLAG_RPC_RDMA, |
| "%s() %p[%u] after cond_wait", |
| __func__, xprt, xprt->state); |
| } |
| |
| if (xprt->state != RDMAXS_ROUTE_RESOLVED) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() Could not resolve route", |
| __func__); |
| rc = EINVAL; |
| break; |
| } |
| } while (0); |
| |
| mutex_unlock(&xprt->cm_lock); |
| |
| return rc; |
| } |
| |
| /** |
| * rpc_rdma_connect_finalize: tells the other side we're ready to receive stuff |
| * (does the actual rdma_connect) |
| * N.B. no wait for CM result. CM event thread will handle setup/teardown. |
| * |
| * @param[IN] xprt |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| int |
| rpc_rdma_connect_finalize(RDMAXPRT *xprt) |
| { |
| struct rdma_conn_param conn_param; |
| int rc; |
| |
| if (!xprt || xprt->state != RDMAXS_ROUTE_RESOLVED) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] isn't half-connected?", |
| __func__, xprt, xprt->state); |
| return EINVAL; |
| } |
| |
| memset(&conn_param, 0, sizeof(conn_param)); |
| conn_param.responder_resources = 1; |
| conn_param.initiator_depth = 1; |
| conn_param.rnr_retry_count = 10; |
| conn_param.retry_count = 10; |
| |
| rc = rdma_connect(xprt->cm_id, &conn_param); |
| if (rc) { |
| rc = errno; |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rdma_connect failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| } |
| |
| return rc; |
| } |
| |
| /** |
| * rpc_rdma_connect: connects a client to a server |
| * |
| * @param[INOUT] xprt must be init first |
| * |
| * @return 0 on success, errno value on failure |
| */ |
| int |
| rpc_rdma_connect(RDMAXPRT *xprt) |
| { |
| int rc; |
| |
| if (!xprt || xprt->state != RDMAXS_INITIAL) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] must be initialized first!", |
| __func__, xprt, xprt->state); |
| return EINVAL; |
| } |
| |
| if (xprt->server) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() only called from client side!", |
| __func__); |
| return EINVAL; |
| } |
| |
| rc = rpc_rdma_thread_create_epoll(&rpc_rdma_state.cm_thread, |
| rpc_rdma_cm_thread, xprt, &rpc_rdma_state.cm_epollfd); |
| if (rc) { |
| __warnx(TIRPC_DEBUG_FLAG_ERROR, |
| "%s() %p[%u] rpc_rdma_thread_create_epoll failed: %s (%d)", |
| __func__, xprt, xprt->state, strerror(rc), rc); |
| return rc; |
| } |
| |
| rc = rpc_rdma_bind_client(xprt); |
| if (rc) |
| return rc; |
| rc = rpc_rdma_pd_get(xprt); |
| if (rc) |
| return rc; |
| rc = rpc_rdma_setup_stuff(xprt); |
| if (rc) { |
| rpc_rdma_destroy_stuff(xprt); |
| return rc; |
| } |
| rc = rpc_rdma_setup_cbq(&xprt->cbqh, |
| xprt->xa->rq_depth + xprt->xa->sq_depth, |
| xprt->xa->credits); |
| if (rc) |
| return rc; |
| |
| return rpc_rdma_fd_add(xprt, xprt->event_channel->fd, |
| rpc_rdma_state.cm_epollfd); |
| } |
| |
/* Ops vector installed by rpc_rdma_allocate().
 * NOTE(review): the three handlers are abort() cast to the expected
 * signatures -- calling any of them terminates the process.
 * Presumably real handlers are installed elsewhere before dispatch;
 * confirm against the code that sets up a connected transport.
 */
static struct xp_ops rpc_rdma_ops = {
	/* XXX wow */
	.xp_getargs = (bool(*)(SVCXPRT *, struct svc_req *, xdrproc_t,
		void *, void *))abort,
	.xp_reply = (bool(*)(SVCXPRT *, struct svc_req *,
		struct rpc_msg *))abort,
	.xp_freeargs = (bool(*)(SVCXPRT *, struct svc_req *, xdrproc_t,
		void *))abort,
};