From: Linus Torvalds Date: Tue, 10 Nov 2015 02:11:22 +0000 (-0800) Subject: Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs X-Git-Tag: firefly_0821_release~176^2~745 X-Git-Url: http://plrg.eecs.uci.edu/git/?a=commitdiff_plain;h=e6604ecb70d4b1dbc0372c6518b51c25c4b135a1;hp=-c;p=firefly-linux-kernel-4.4.55.git Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs Pull NFS client updates from Trond Myklebust: "Highlights include: New features: - RDMA client backchannel from Chuck - Support for NFSv4.2 file CLONE using the btrfs ioctl Bugfixes + cleanups: - Move socket data receive out of the bottom halves and into a workqueue - Refactor NFSv4 error handling so synchronous and asynchronous RPC handles errors identically. - Fix a panic when blocks or object layouts reads return a bad data length - Fix nfsroot so it can handle a 1024 byte long path. - Fix bad usage of page offset in bl_read_pagelist - Various NFSv4 callback cleanups+fixes - Fix GETATTR bitmap verification - Support hexadecimal number for sunrpc debug sysctl files" * tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (53 commits) Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug nfs: Fix GETATTR bitmap verification nfs: Remove unused xdr page offsets in getacl/setacl arguments fs/nfs: remove unnecessary new_valid_dev check SUNRPC: fix variable type NFS: Enable client side NFSv4.1 backchannel to use other transports pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors SUNRPC: Remove the TCP-only restriction in bc_svc_process() svcrdma: Add backward direction service for RPC/RDMA transport xprtrdma: Handle incoming backward direction RPC calls xprtrdma: Add support for sending backward direction RPC replies xprtrdma: Pre-allocate Work Requests for backchannel xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers SUNRPC: Abstract backchannel operations xprtrdma: Saving IRQs no longer needed for rb_lock xprtrdma: Remove reply tasklet xprtrdma: Use workqueue to process RPC/RDMA replies xprtrdma: Replace send and receive arrays xprtrdma: Refactor reply handler error handling ... --- e6604ecb70d4b1dbc0372c6518b51c25c4b135a1 diff --combined fs/nfs/nfs4proc.c index 0e5ff69455c7,7ed8f2cd97f8..ff5bddc49a2a --- a/fs/nfs/nfs4proc.c +++ b/fs/nfs/nfs4proc.c @@@ -78,7 -78,6 +78,6 @@@ struct nfs4_opendata static int _nfs4_proc_open(struct nfs4_opendata *data); static int _nfs4_recover_proc_open(struct nfs4_opendata *data); static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *); - static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *, long *); static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr); static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label); static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label); @@@ -239,6 -238,7 +238,7 @@@ const u32 nfs4_fsinfo_bitmap[3] = { FAT FATTR4_WORD1_TIME_DELTA | FATTR4_WORD1_FS_LAYOUT_TYPES, FATTR4_WORD2_LAYOUT_BLKSIZE + | FATTR4_WORD2_CLONE_BLKSIZE }; const u32 nfs4_fs_locations_bitmap[3] = { @@@ -344,13 -344,16 +344,16 @@@ static int nfs4_delay(struct rpc_clnt * /* This is the error handling routine for processes that are allowed * to sleep. */ - int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception) + static int nfs4_do_handle_exception(struct nfs_server *server, + int errorcode, struct nfs4_exception *exception) { struct nfs_client *clp = server->nfs_client; struct nfs4_state *state = exception->state; struct inode *inode = exception->inode; int ret = errorcode; + exception->delay = 0; + exception->recovering = 0; exception->retry = 0; switch(errorcode) { case 0: @@@ -359,11 -362,9 +362,9 @@@ case -NFS4ERR_DELEG_REVOKED: case -NFS4ERR_ADMIN_REVOKED: case -NFS4ERR_BAD_STATEID: - if (inode && nfs4_have_delegation(inode, FMODE_READ)) { - nfs4_inode_return_delegation(inode); - exception->retry = 1; - return 0; - } + if (inode && nfs_async_inode_return_delegation(inode, + NULL) == 0) + goto wait_on_recovery; if (state == NULL) break; ret = nfs4_schedule_stateid_recovery(server, state); @@@ -409,11 -410,12 +410,12 @@@ ret = -EBUSY; break; } - case -NFS4ERR_GRACE: case -NFS4ERR_DELAY: - ret = nfs4_delay(server->client, &exception->timeout); - if (ret != 0) - break; + nfs_inc_server_stats(server, NFSIOS_DELAY); + case -NFS4ERR_GRACE: + exception->delay = 1; + return 0; + case -NFS4ERR_RETRY_UNCACHED_REP: case -NFS4ERR_OLD_STATEID: exception->retry = 1; @@@ -434,14 -436,85 +436,85 @@@ /* We failed to handle the error */ return nfs4_map_errors(ret); wait_on_recovery: - ret = nfs4_wait_clnt_recover(clp); + exception->recovering = 1; + return 0; + } + + /* This is the error handling routine for processes that are allowed + * to sleep. + */ + int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception) + { + struct nfs_client *clp = server->nfs_client; + int ret; + + ret = nfs4_do_handle_exception(server, errorcode, exception); + if (exception->delay) { + ret = nfs4_delay(server->client, &exception->timeout); + goto out_retry; + } + if (exception->recovering) { + ret = nfs4_wait_clnt_recover(clp); + if (test_bit(NFS_MIG_FAILED, &server->mig_status)) + return -EIO; + goto out_retry; + } + return ret; + out_retry: + if (ret == 0) + exception->retry = 1; + return ret; + } + + static int + nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server, + int errorcode, struct nfs4_exception *exception) + { + struct nfs_client *clp = server->nfs_client; + int ret; + + ret = nfs4_do_handle_exception(server, errorcode, exception); + if (exception->delay) { + rpc_delay(task, nfs4_update_delay(&exception->timeout)); + goto out_retry; + } + if (exception->recovering) { + rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL); + if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0) + rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task); + goto out_retry; + } if (test_bit(NFS_MIG_FAILED, &server->mig_status)) - return -EIO; + ret = -EIO; + return ret; + out_retry: if (ret == 0) exception->retry = 1; return ret; } + static int + nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server, + struct nfs4_state *state, long *timeout) + { + struct nfs4_exception exception = { + .state = state, + }; + + if (task->tk_status >= 0) + return 0; + if (timeout) + exception.timeout = *timeout; + task->tk_status = nfs4_async_handle_exception(task, server, + task->tk_status, + &exception); + if (exception.delay && timeout) + *timeout = exception.timeout; + if (exception.retry) + return -EAGAIN; + return 0; + } + /* * Return 'true' if 'clp' is using an rpc_client that is integrity protected * or 'false' otherwise. @@@ -4530,7 -4603,7 +4603,7 @@@ static inline int nfs4_server_supports_ #define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE) static int buf_to_pages_noslab(const void *buf, size_t buflen, - struct page **pages, unsigned int *pgbase) + struct page **pages) { struct page *newpage, **spages; int rc = 0; @@@ -4674,7 -4747,6 +4747,6 @@@ static ssize_t __nfs4_get_acl_uncached( goto out_free; args.acl_len = npages * PAGE_SIZE; - args.acl_pgbase = 0; dprintk("%s buf %p buflen %zu npages %d args.acl_len %zu\n", __func__, buf, buflen, npages, args.acl_len); @@@ -4766,7 -4838,7 +4838,7 @@@ static int __nfs4_proc_set_acl(struct i return -EOPNOTSUPP; if (npages > ARRAY_SIZE(pages)) return -ERANGE; - i = buf_to_pages_noslab(buf, buflen, arg.acl_pages, &arg.acl_pgbase); + i = buf_to_pages_noslab(buf, buflen, arg.acl_pages); if (i < 0) return i; nfs4_inode_return_delegation(inode); @@@ -4955,79 -5027,6 +5027,6 @@@ out #endif /* CONFIG_NFS_V4_SECURITY_LABEL */ - static int - nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server, - struct nfs4_state *state, long *timeout) - { - struct nfs_client *clp = server->nfs_client; - - if (task->tk_status >= 0) - return 0; - switch(task->tk_status) { - case -NFS4ERR_DELEG_REVOKED: - case -NFS4ERR_ADMIN_REVOKED: - case -NFS4ERR_BAD_STATEID: - case -NFS4ERR_OPENMODE: - if (state == NULL) - break; - if (nfs4_schedule_stateid_recovery(server, state) < 0) - goto recovery_failed; - goto wait_on_recovery; - case -NFS4ERR_EXPIRED: - if (state != NULL) { - if (nfs4_schedule_stateid_recovery(server, state) < 0) - goto recovery_failed; - } - case -NFS4ERR_STALE_STATEID: - case -NFS4ERR_STALE_CLIENTID: - nfs4_schedule_lease_recovery(clp); - goto wait_on_recovery; - case -NFS4ERR_MOVED: - if (nfs4_schedule_migration_recovery(server) < 0) - goto recovery_failed; - goto wait_on_recovery; - case -NFS4ERR_LEASE_MOVED: - nfs4_schedule_lease_moved_recovery(clp); - goto wait_on_recovery; - #if defined(CONFIG_NFS_V4_1) - case -NFS4ERR_BADSESSION: - case -NFS4ERR_BADSLOT: - case -NFS4ERR_BAD_HIGH_SLOT: - case -NFS4ERR_DEADSESSION: - case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION: - case -NFS4ERR_SEQ_FALSE_RETRY: - case -NFS4ERR_SEQ_MISORDERED: - dprintk("%s ERROR %d, Reset session\n", __func__, - task->tk_status); - nfs4_schedule_session_recovery(clp->cl_session, task->tk_status); - goto wait_on_recovery; - #endif /* CONFIG_NFS_V4_1 */ - case -NFS4ERR_DELAY: - nfs_inc_server_stats(server, NFSIOS_DELAY); - rpc_delay(task, nfs4_update_delay(timeout)); - goto restart_call; - case -NFS4ERR_GRACE: - rpc_delay(task, NFS4_POLL_RETRY_MAX); - case -NFS4ERR_RETRY_UNCACHED_REP: - case -NFS4ERR_OLD_STATEID: - goto restart_call; - } - task->tk_status = nfs4_map_errors(task->tk_status); - return 0; - recovery_failed: - task->tk_status = -EIO; - return 0; - wait_on_recovery: - rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL); - if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0) - rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task); - if (test_bit(NFS_MIG_FAILED, &server->mig_status)) - goto recovery_failed; - restart_call: - task->tk_status = 0; - return -EAGAIN; - } - static void nfs4_init_boot_verifier(const struct nfs_client *clp, nfs4_verifier *bootverf) { @@@ -5513,7 -5512,18 +5512,7 @@@ static int nfs4_proc_getlk(struct nfs4_ static int do_vfs_lock(struct inode *inode, struct file_lock *fl) { - int res = 0; - switch (fl->fl_flags & (FL_POSIX|FL_FLOCK)) { - case FL_POSIX: - res = posix_lock_inode_wait(inode, fl); - break; - case FL_FLOCK: - res = flock_lock_inode_wait(inode, fl); - break; - default: - BUG(); - } - return res; + return locks_lock_inode_wait(inode, fl); } struct nfs4_unlockdata { @@@ -5522,7 -5532,7 +5521,7 @@@ struct nfs4_lock_state *lsp; struct nfs_open_context *ctx; struct file_lock fl; - const struct nfs_server *server; + struct nfs_server *server; unsigned long timestamp; }; @@@ -8718,7 -8728,8 +8717,8 @@@ static const struct nfs4_minor_version_ | NFS_CAP_ALLOCATE | NFS_CAP_DEALLOCATE | NFS_CAP_SEEK - | NFS_CAP_LAYOUTSTATS, + | NFS_CAP_LAYOUTSTATS + | NFS_CAP_CLONE, .init_client = nfs41_init_client, .shutdown_client = nfs41_shutdown_client, .match_stateid = nfs41_match_stateid, diff --combined include/linux/sunrpc/svc_rdma.h index 1e4438ea2380,fb4013edcf57..f869807a0d0e --- a/include/linux/sunrpc/svc_rdma.h +++ b/include/linux/sunrpc/svc_rdma.h @@@ -105,9 -105,11 +105,9 @@@ struct svc_rdma_chunk_sge }; struct svc_rdma_fastreg_mr { struct ib_mr *mr; - void *kva; - struct ib_fast_reg_page_list *page_list; - int page_list_len; + struct scatterlist *sg; + int sg_nents; unsigned long access_flags; - unsigned long map_len; enum dma_data_direction direction; struct list_head frmr_list; }; @@@ -226,9 -228,13 +226,13 @@@ extern void svc_rdma_put_frmr(struct sv struct svc_rdma_fastreg_mr *); extern void svc_sq_reap(struct svcxprt_rdma *); extern void svc_rq_reap(struct svcxprt_rdma *); - extern struct svc_xprt_class svc_rdma_class; extern void svc_rdma_prep_reply_hdr(struct svc_rqst *); + extern struct svc_xprt_class svc_rdma_class; + #ifdef CONFIG_SUNRPC_BACKCHANNEL + extern struct svc_xprt_class svc_rdma_bc_class; + #endif + /* svc_rdma.c */ extern int svc_rdma_init(void); extern void svc_rdma_cleanup(void); diff --combined net/sunrpc/xprtrdma/frwr_ops.c index a1434447b0d6,0a362397e434..88cf9e7269c2 --- a/net/sunrpc/xprtrdma/frwr_ops.c +++ b/net/sunrpc/xprtrdma/frwr_ops.c @@@ -151,13 -151,9 +151,13 @@@ __frwr_init(struct rpcrdma_mw *r, struc f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth); if (IS_ERR(f->fr_mr)) goto out_mr_err; - f->fr_pgl = ib_alloc_fast_reg_page_list(device, depth); - if (IS_ERR(f->fr_pgl)) + + f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL); + if (!f->sg) goto out_list_err; + + sg_init_table(f->sg, depth); + return 0; out_mr_err: @@@ -167,9 -163,9 +167,9 @@@ return rc; out_list_err: - rc = PTR_ERR(f->fr_pgl); - dprintk("RPC: %s: ib_alloc_fast_reg_page_list status %i\n", - __func__, rc); + rc = -ENOMEM; + dprintk("RPC: %s: sg allocation failure\n", + __func__); ib_dereg_mr(f->fr_mr); return rc; } @@@ -183,7 -179,7 +183,7 @@@ __frwr_release(struct rpcrdma_mw *r if (rc) dprintk("RPC: %s: ib_dereg_mr status %i\n", __func__, rc); - ib_free_fast_reg_page_list(r->r.frmr.fr_pgl); + kfree(r->r.frmr.sg); } static int @@@ -256,8 -252,11 +256,11 @@@ frwr_sendcompletion(struct ib_wc *wc /* WARNING: Only wr_id and status are reliable at this point */ r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id; - pr_warn("RPC: %s: frmr %p flushed, status %s (%d)\n", - __func__, r, ib_wc_status_msg(wc->status), wc->status); + if (wc->status == IB_WC_WR_FLUSH_ERR) + dprintk("RPC: %s: frmr %p flushed\n", __func__, r); + else + pr_warn("RPC: %s: frmr %p error, status %s (%d)\n", + __func__, r, ib_wc_status_msg(wc->status), wc->status); r->r.frmr.fr_state = FRMR_IS_STALE; } @@@ -316,10 -315,13 +319,10 @@@ frwr_op_map(struct rpcrdma_xprt *r_xprt struct rpcrdma_mw *mw; struct rpcrdma_frmr *frmr; struct ib_mr *mr; - struct ib_send_wr fastreg_wr, *bad_wr; + struct ib_reg_wr reg_wr; + struct ib_send_wr *bad_wr; + int rc, i, n, dma_nents; u8 key; - int len, pageoff; - int i, rc; - int seg_len; - u64 pa; - int page_no; mw = seg1->rl_mw; seg1->rl_mw = NULL; @@@ -332,80 -334,64 +335,80 @@@ } while (mw->r.frmr.fr_state != FRMR_IS_INVALID); frmr = &mw->r.frmr; frmr->fr_state = FRMR_IS_VALID; + mr = frmr->fr_mr; - pageoff = offset_in_page(seg1->mr_offset); - seg1->mr_offset -= pageoff; /* start of page */ - seg1->mr_len += pageoff; - len = -pageoff; if (nsegs > ia->ri_max_frmr_depth) nsegs = ia->ri_max_frmr_depth; - for (page_no = i = 0; i < nsegs;) { - rpcrdma_map_one(device, seg, direction); - pa = seg->mr_dma; - for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) { - frmr->fr_pgl->page_list[page_no++] = pa; - pa += PAGE_SIZE; - } - len += seg->mr_len; + for (i = 0; i < nsegs;) { + if (seg->mr_page) + sg_set_page(&frmr->sg[i], + seg->mr_page, + seg->mr_len, + offset_in_page(seg->mr_offset)); + else + sg_set_buf(&frmr->sg[i], seg->mr_offset, + seg->mr_len); + ++seg; ++i; + /* Check for holes */ if ((i < nsegs && offset_in_page(seg->mr_offset)) || offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len)) break; } - dprintk("RPC: %s: Using frmr %p to map %d segments (%d bytes)\n", - __func__, mw, i, len); - - memset(&fastreg_wr, 0, sizeof(fastreg_wr)); - fastreg_wr.wr_id = (unsigned long)(void *)mw; - fastreg_wr.opcode = IB_WR_FAST_REG_MR; - fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma + pageoff; - fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl; - fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT; - fastreg_wr.wr.fast_reg.page_list_len = page_no; - fastreg_wr.wr.fast_reg.length = len; - fastreg_wr.wr.fast_reg.access_flags = writing ? - IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : - IB_ACCESS_REMOTE_READ; - mr = frmr->fr_mr; + frmr->sg_nents = i; + + dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction); + if (!dma_nents) { + pr_err("RPC: %s: failed to dma map sg %p sg_nents %u\n", + __func__, frmr->sg, frmr->sg_nents); + return -ENOMEM; + } + + n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE); + if (unlikely(n != frmr->sg_nents)) { + pr_err("RPC: %s: failed to map mr %p (%u/%u)\n", + __func__, frmr->fr_mr, n, frmr->sg_nents); + rc = n < 0 ? n : -EINVAL; + goto out_senderr; + } + + dprintk("RPC: %s: Using frmr %p to map %u segments (%u bytes)\n", + __func__, mw, frmr->sg_nents, mr->length); + key = (u8)(mr->rkey & 0x000000FF); ib_update_fast_reg_key(mr, ++key); - fastreg_wr.wr.fast_reg.rkey = mr->rkey; + + reg_wr.wr.next = NULL; + reg_wr.wr.opcode = IB_WR_REG_MR; + reg_wr.wr.wr_id = (uintptr_t)mw; + reg_wr.wr.num_sge = 0; + reg_wr.wr.send_flags = 0; + reg_wr.mr = mr; + reg_wr.key = mr->rkey; + reg_wr.access = writing ? + IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE : + IB_ACCESS_REMOTE_READ; DECR_CQCOUNT(&r_xprt->rx_ep); - rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr); + rc = ib_post_send(ia->ri_id->qp, ®_wr.wr, &bad_wr); if (rc) goto out_senderr; + seg1->mr_dir = direction; seg1->rl_mw = mw; seg1->mr_rkey = mr->rkey; - seg1->mr_base = seg1->mr_dma + pageoff; - seg1->mr_nsegs = i; - seg1->mr_len = len; - return i; + seg1->mr_base = mr->iova; + seg1->mr_nsegs = frmr->sg_nents; + seg1->mr_len = mr->length; + + return frmr->sg_nents; out_senderr: dprintk("RPC: %s: ib_post_send status %i\n", __func__, rc); - while (i--) - rpcrdma_unmap_one(device, --seg); + ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction); __frwr_queue_recovery(mw); return rc; } @@@ -419,22 -405,22 +422,22 @@@ frwr_op_unmap(struct rpcrdma_xprt *r_xp struct rpcrdma_mr_seg *seg1 = seg; struct rpcrdma_ia *ia = &r_xprt->rx_ia; struct rpcrdma_mw *mw = seg1->rl_mw; + struct rpcrdma_frmr *frmr = &mw->r.frmr; struct ib_send_wr invalidate_wr, *bad_wr; int rc, nsegs = seg->mr_nsegs; dprintk("RPC: %s: FRMR %p\n", __func__, mw); seg1->rl_mw = NULL; - mw->r.frmr.fr_state = FRMR_IS_INVALID; + frmr->fr_state = FRMR_IS_INVALID; memset(&invalidate_wr, 0, sizeof(invalidate_wr)); invalidate_wr.wr_id = (unsigned long)(void *)mw; invalidate_wr.opcode = IB_WR_LOCAL_INV; - invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey; + invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey; DECR_CQCOUNT(&r_xprt->rx_ep); - while (seg1->mr_nsegs--) - rpcrdma_unmap_one(ia->ri_device, seg++); + ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir); read_lock(&ia->ri_qplock); rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr); read_unlock(&ia->ri_qplock); diff --combined net/sunrpc/xprtrdma/svc_rdma_transport.c index a266e870d870,a133b1e5b5f6..b348b4adef29 --- a/net/sunrpc/xprtrdma/svc_rdma_transport.c +++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c @@@ -56,6 -56,7 +56,7 @@@ #define RPCDBG_FACILITY RPCDBG_SVCXPRT + static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int); static struct svc_xprt *svc_rdma_create(struct svc_serv *serv, struct net *net, struct sockaddr *sa, int salen, @@@ -95,6 -96,63 +96,63 @@@ struct svc_xprt_class svc_rdma_class = .xcl_ident = XPRT_TRANSPORT_RDMA, }; + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *, + struct sockaddr *, int, int); + static void svc_rdma_bc_detach(struct svc_xprt *); + static void svc_rdma_bc_free(struct svc_xprt *); + + static struct svc_xprt_ops svc_rdma_bc_ops = { + .xpo_create = svc_rdma_bc_create, + .xpo_detach = svc_rdma_bc_detach, + .xpo_free = svc_rdma_bc_free, + .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr, + .xpo_secure_port = svc_rdma_secure_port, + }; + + struct svc_xprt_class svc_rdma_bc_class = { + .xcl_name = "rdma-bc", + .xcl_owner = THIS_MODULE, + .xcl_ops = &svc_rdma_bc_ops, + .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN) + }; + + static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv, + struct net *net, + struct sockaddr *sa, int salen, + int flags) + { + struct svcxprt_rdma *cma_xprt; + struct svc_xprt *xprt; + + cma_xprt = rdma_create_xprt(serv, 0); + if (!cma_xprt) + return ERR_PTR(-ENOMEM); + xprt = &cma_xprt->sc_xprt; + + svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv); + serv->sv_bc_xprt = xprt; + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + return xprt; + } + + static void svc_rdma_bc_detach(struct svc_xprt *xprt) + { + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + } + + static void svc_rdma_bc_free(struct svc_xprt *xprt) + { + struct svcxprt_rdma *rdma = + container_of(xprt, struct svcxprt_rdma, sc_xprt); + + dprintk("svcrdma: %s(%p)\n", __func__, xprt); + if (xprt) + kfree(rdma); + } + #endif /* CONFIG_SUNRPC_BACKCHANNEL */ + struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt) { struct svc_rdma_op_ctxt *ctxt; @@@ -692,8 -750,8 +750,8 @@@ static struct svc_xprt *svc_rdma_create if (!cma_xprt) return ERR_PTR(-ENOMEM); - listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP, - IB_QPT_RC); + listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt, + RDMA_PS_TCP, IB_QPT_RC); if (IS_ERR(listen_id)) { ret = PTR_ERR(listen_id); dprintk("svcrdma: rdma_create_id failed = %d\n", ret); @@@ -732,7 -790,7 +790,7 @@@ static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt) { struct ib_mr *mr; - struct ib_fast_reg_page_list *pl; + struct scatterlist *sg; struct svc_rdma_fastreg_mr *frmr; u32 num_sg; @@@ -745,14 -803,13 +803,14 @@@ if (IS_ERR(mr)) goto err_free_frmr; - pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device, - num_sg); - if (IS_ERR(pl)) + sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL); + if (!sg) goto err_free_mr; + sg_init_table(sg, RPCSVC_MAXPAGES); + frmr->mr = mr; - frmr->page_list = pl; + frmr->sg = sg; INIT_LIST_HEAD(&frmr->frmr_list); return frmr; @@@ -772,8 -829,8 +830,8 @@@ static void rdma_dealloc_frmr_q(struct frmr = list_entry(xprt->sc_frmr_q.next, struct svc_rdma_fastreg_mr, frmr_list); list_del_init(&frmr->frmr_list); + kfree(frmr->sg); ib_dereg_mr(frmr->mr); - ib_free_fast_reg_page_list(frmr->page_list); kfree(frmr); } } @@@ -787,7 -844,8 +845,7 @@@ struct svc_rdma_fastreg_mr *svc_rdma_ge frmr = list_entry(rdma->sc_frmr_q.next, struct svc_rdma_fastreg_mr, frmr_list); list_del_init(&frmr->frmr_list); - frmr->map_len = 0; - frmr->page_list_len = 0; + frmr->sg_nents = 0; } spin_unlock_bh(&rdma->sc_frmr_q_lock); if (frmr) @@@ -796,13 -854,25 +854,13 @@@ return rdma_alloc_frmr(rdma); } -static void frmr_unmap_dma(struct svcxprt_rdma *xprt, - struct svc_rdma_fastreg_mr *frmr) -{ - int page_no; - for (page_no = 0; page_no < frmr->page_list_len; page_no++) { - dma_addr_t addr = frmr->page_list->page_list[page_no]; - if (ib_dma_mapping_error(frmr->mr->device, addr)) - continue; - atomic_dec(&xprt->sc_dma_used); - ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE, - frmr->direction); - } -} - void svc_rdma_put_frmr(struct svcxprt_rdma *rdma, struct svc_rdma_fastreg_mr *frmr) { if (frmr) { - frmr_unmap_dma(rdma, frmr); + ib_dma_unmap_sg(rdma->sc_cm_id->device, + frmr->sg, frmr->sg_nents, frmr->direction); + atomic_dec(&rdma->sc_dma_used); spin_lock_bh(&rdma->sc_frmr_q_lock); WARN_ON_ONCE(!list_empty(&frmr->frmr_list)); list_add(&frmr->frmr_list, &rdma->sc_frmr_q); diff --combined net/sunrpc/xprtrdma/verbs.c index f63369bd01c5,93883ffb86e0..eadd1655145a --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@@ -68,47 -68,33 +68,33 @@@ * internal functions */ - /* - * handle replies in tasklet context, using a single, global list - * rdma tasklet function -- just turn around and call the func - * for all replies on the list - */ - - static DEFINE_SPINLOCK(rpcrdma_tk_lock_g); - static LIST_HEAD(rpcrdma_tasklets_g); + static struct workqueue_struct *rpcrdma_receive_wq; - static void - rpcrdma_run_tasklet(unsigned long data) + int + rpcrdma_alloc_wq(void) { - struct rpcrdma_rep *rep; - unsigned long flags; - - data = data; - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - while (!list_empty(&rpcrdma_tasklets_g)) { - rep = list_entry(rpcrdma_tasklets_g.next, - struct rpcrdma_rep, rr_list); - list_del(&rep->rr_list); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); + struct workqueue_struct *recv_wq; - rpcrdma_reply_handler(rep); + recv_wq = alloc_workqueue("xprtrdma_receive", + WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, + 0); + if (!recv_wq) + return -ENOMEM; - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - } - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); + rpcrdma_receive_wq = recv_wq; + return 0; } - static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL); - - static void - rpcrdma_schedule_tasklet(struct list_head *sched_list) + void + rpcrdma_destroy_wq(void) { - unsigned long flags; + struct workqueue_struct *wq; - spin_lock_irqsave(&rpcrdma_tk_lock_g, flags); - list_splice_tail(sched_list, &rpcrdma_tasklets_g); - spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags); - tasklet_schedule(&rpcrdma_tasklet_g); + if (rpcrdma_receive_wq) { + wq = rpcrdma_receive_wq; + rpcrdma_receive_wq = NULL; + destroy_workqueue(wq); + } } static void @@@ -158,63 -144,54 +144,54 @@@ rpcrdma_sendcq_process_wc(struct ib_wc } } - static int - rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) + /* The common case is a single send completion is waiting. By + * passing two WC entries to ib_poll_cq, a return code of 1 + * means there is exactly one WC waiting and no more. We don't + * have to invoke ib_poll_cq again to know that the CQ has been + * properly drained. + */ + static void + rpcrdma_sendcq_poll(struct ib_cq *cq) { - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[2]; + int count, rc; - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_send_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - return rc; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_sendcq_process_wc(wcs++); - } while (rc == RPCRDMA_POLLSIZE && --budget); - return 0; + rpcrdma_sendcq_process_wc(pos++); + } while (rc == ARRAY_SIZE(wcs)); + return; } - /* - * Handle send, fast_reg_mr, and local_inv completions. - * - * Send events are typically suppressed and thus do not result - * in an upcall. Occasionally one is signaled, however. This - * prevents the provider's completion queue from wrapping and - * losing a completion. + /* Handle provider send completion upcalls. */ static void rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_sendcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } + do { + rpcrdma_sendcq_poll(cq); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); + } - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } + static void + rpcrdma_receive_worker(struct work_struct *work) + { + struct rpcrdma_rep *rep = + container_of(work, struct rpcrdma_rep, rr_work); - rpcrdma_sendcq_poll(cq, ep); + rpcrdma_reply_handler(rep); } static void - rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list) + rpcrdma_recvcq_process_wc(struct ib_wc *wc) { struct rpcrdma_rep *rep = (struct rpcrdma_rep *)(unsigned long)wc->wr_id; @@@ -237,91 -214,60 +214,60 @@@ prefetch(rdmab_to_msg(rep->rr_rdmabuf)); out_schedule: - list_add_tail(&rep->rr_list, sched_list); + queue_work(rpcrdma_receive_wq, &rep->rr_work); return; + out_fail: if (wc->status != IB_WC_WR_FLUSH_ERR) pr_err("RPC: %s: rep %p: %s\n", __func__, rep, ib_wc_status_msg(wc->status)); - rep->rr_len = ~0U; + rep->rr_len = RPCRDMA_BAD_LEN; goto out_schedule; } - static int - rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep) + /* The wc array is on stack: automatic memory is always CPU-local. + * + * struct ib_wc is 64 bytes, making the poll array potentially + * large. But this is at the bottom of the call chain. Further + * substantial work is done in another thread. + */ + static void + rpcrdma_recvcq_poll(struct ib_cq *cq) { - struct list_head sched_list; - struct ib_wc *wcs; - int budget, count, rc; + struct ib_wc *pos, wcs[4]; + int count, rc; - INIT_LIST_HEAD(&sched_list); - budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE; do { - wcs = ep->rep_recv_wcs; + pos = wcs; - rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs); - if (rc <= 0) - goto out_schedule; + rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos); + if (rc < 0) + break; count = rc; while (count-- > 0) - rpcrdma_recvcq_process_wc(wcs++, &sched_list); - } while (rc == RPCRDMA_POLLSIZE && --budget); - rc = 0; - - out_schedule: - rpcrdma_schedule_tasklet(&sched_list); - return rc; + rpcrdma_recvcq_process_wc(pos++); + } while (rc == ARRAY_SIZE(wcs)); } - /* - * Handle receive completions. - * - * It is reentrant but processes single events in order to maintain - * ordering of receives to keep server credits. - * - * It is the responsibility of the scheduled tasklet to return - * recv buffers to the pool. NOTE: this affects synchronization of - * connection shutdown. That is, the structures required for - * the completion of the reply handler must remain intact until - * all memory has been reclaimed. + /* Handle provider receive completion upcalls. */ static void rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context) { - struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context; - int rc; - - rc = rpcrdma_recvcq_poll(cq, ep); - if (rc) { - dprintk("RPC: %s: ib_poll_cq failed: %i\n", - __func__, rc); - return; - } - - rc = ib_req_notify_cq(cq, - IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS); - if (rc == 0) - return; - if (rc < 0) { - dprintk("RPC: %s: ib_req_notify_cq failed: %i\n", - __func__, rc); - return; - } - - rpcrdma_recvcq_poll(cq, ep); + do { + rpcrdma_recvcq_poll(cq); + } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP | + IB_CQ_REPORT_MISSED_EVENTS) > 0); } static void rpcrdma_flush_cqs(struct rpcrdma_ep *ep) { struct ib_wc wc; - LIST_HEAD(sched_list); while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0) - rpcrdma_recvcq_process_wc(&wc, &sched_list); - if (!list_empty(&sched_list)) - rpcrdma_schedule_tasklet(&sched_list); + rpcrdma_recvcq_process_wc(&wc); while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0) rpcrdma_sendcq_process_wc(&wc); } @@@ -432,8 -378,7 +378,8 @@@ rpcrdma_create_id(struct rpcrdma_xprt * init_completion(&ia->ri_done); - id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC); + id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, + IB_QPT_RC); if (IS_ERR(id)) { rc = PTR_ERR(id); dprintk("RPC: %s: rdma_create_id() failed %i\n", @@@ -623,6 -568,7 +569,7 @@@ rpcrdma_ep_create(struct rpcrdma_ep *ep struct ib_device_attr *devattr = &ia->ri_devattr; struct ib_cq *sendcq, *recvcq; struct ib_cq_init_attr cq_attr = {}; + unsigned int max_qp_wr; int rc, err; if (devattr->max_sge < RPCRDMA_MAX_IOVS) { @@@ -631,18 -577,27 +578,27 @@@ return -ENOMEM; } + if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) { + dprintk("RPC: %s: insufficient wqe's available\n", + __func__); + return -ENOMEM; + } + max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS; + /* check provider's send/recv wr limits */ - if (cdata->max_requests > devattr->max_qp_wr) - cdata->max_requests = devattr->max_qp_wr; + if (cdata->max_requests > max_qp_wr) + cdata->max_requests = max_qp_wr; ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall; ep->rep_attr.qp_context = ep; ep->rep_attr.srq = NULL; ep->rep_attr.cap.max_send_wr = cdata->max_requests; + ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS; rc = ia->ri_ops->ro_open(ia, ep, cdata); if (rc) return rc; ep->rep_attr.cap.max_recv_wr = cdata->max_requests; + ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS; ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS; ep->rep_attr.cap.max_recv_sge = 1; ep->rep_attr.cap.max_inline_data = 0; @@@ -670,7 -625,7 +626,7 @@@ cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1; sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(sendcq)) { rc = PTR_ERR(sendcq); dprintk("RPC: %s: failed to create send CQ: %i\n", @@@ -687,7 -642,7 +643,7 @@@ cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1; recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall, - rpcrdma_cq_async_error_upcall, ep, &cq_attr); + rpcrdma_cq_async_error_upcall, NULL, &cq_attr); if (IS_ERR(recvcq)) { rc = PTR_ERR(recvcq); dprintk("RPC: %s: failed to create recv CQ: %i\n", @@@ -886,7 -841,21 +842,21 @@@ retry } rc = ep->rep_connected; } else { + struct rpcrdma_xprt *r_xprt; + unsigned int extras; + dprintk("RPC: %s: connected\n", __func__); + + r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia); + extras = r_xprt->rx_buf.rb_bc_srv_max_requests; + + if (extras) { + rc = rpcrdma_ep_post_extra_recv(r_xprt, extras); + if (rc) + pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n", + __func__, rc); + rc = 0; + } } out: @@@ -923,20 -892,25 +893,25 @@@ rpcrdma_ep_disconnect(struct rpcrdma_e } } - static struct rpcrdma_req * + struct rpcrdma_req * rpcrdma_create_req(struct rpcrdma_xprt *r_xprt) { + struct rpcrdma_buffer *buffer = &r_xprt->rx_buf; struct rpcrdma_req *req; req = kzalloc(sizeof(*req), GFP_KERNEL); if (req == NULL) return ERR_PTR(-ENOMEM); + INIT_LIST_HEAD(&req->rl_free); + spin_lock(&buffer->rb_reqslock); + list_add(&req->rl_all, &buffer->rb_allreqs); + spin_unlock(&buffer->rb_reqslock); req->rl_buffer = &r_xprt->rx_buf; return req; } - static struct rpcrdma_rep * + struct rpcrdma_rep * rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; @@@ -958,6 -932,7 +933,7 @@@ rep->rr_device = ia->ri_device; rep->rr_rxprt = r_xprt; + INIT_WORK(&rep->rr_work, rpcrdma_receive_worker); return rep; out_free: @@@ -971,44 -946,21 +947,21 @@@ rpcrdma_buffer_create(struct rpcrdma_xp { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - char *p; - size_t len; int i, rc; - buf->rb_max_requests = cdata->max_requests; + buf->rb_max_requests = r_xprt->rx_data.max_requests; + buf->rb_bc_srv_max_requests = 0; spin_lock_init(&buf->rb_lock); - /* Need to allocate: - * 1. arrays for send and recv pointers - * 2. arrays of struct rpcrdma_req to fill in pointers - * 3. array of struct rpcrdma_rep for replies - * Send/recv buffers in req/rep need to be registered - */ - len = buf->rb_max_requests * - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - - p = kzalloc(len, GFP_KERNEL); - if (p == NULL) { - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n", - __func__, len); - rc = -ENOMEM; - goto out; - } - buf->rb_pool = p; /* for freeing it later */ - - buf->rb_send_bufs = (struct rpcrdma_req **) p; - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests]; - buf->rb_recv_bufs = (struct rpcrdma_rep **) p; - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - rc = ia->ri_ops->ro_init(r_xprt); if (rc) goto out; + INIT_LIST_HEAD(&buf->rb_send_bufs); + INIT_LIST_HEAD(&buf->rb_allreqs); + spin_lock_init(&buf->rb_reqslock); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; - struct rpcrdma_rep *rep; req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) { @@@ -1017,7 -969,13 +970,13 @@@ rc = PTR_ERR(req); goto out; } - buf->rb_send_bufs[i] = req; + req->rl_backchannel = false; + list_add(&req->rl_free, &buf->rb_send_bufs); + } + + INIT_LIST_HEAD(&buf->rb_recv_bufs); + for (i = 0; i < buf->rb_max_requests + 2; i++) { + struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); if (IS_ERR(rep)) { @@@ -1026,7 -984,7 +985,7 @@@ rc = PTR_ERR(rep); goto out; } - buf->rb_recv_bufs[i] = rep; + list_add(&rep->rr_list, &buf->rb_recv_bufs); } return 0; @@@ -1035,22 -993,38 +994,38 @@@ out return rc; } + static struct rpcrdma_req * + rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf) + { + struct rpcrdma_req *req; + + req = list_first_entry(&buf->rb_send_bufs, + struct rpcrdma_req, rl_free); + list_del(&req->rl_free); + return req; + } + + static struct rpcrdma_rep * + rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf) + { + struct rpcrdma_rep *rep; + + rep = list_first_entry(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + list_del(&rep->rr_list); + return rep; + } + static void rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep) { - if (!rep) - return; - rpcrdma_free_regbuf(ia, rep->rr_rdmabuf); kfree(rep); } - static void + void rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req) { - if (!req) - return; - rpcrdma_free_regbuf(ia, req->rl_sendbuf); rpcrdma_free_regbuf(ia, req->rl_rdmabuf); kfree(req); @@@ -1060,25 -1034,29 +1035,29 @@@ voi rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); - int i; - /* clean up in reverse order from create - * 1. recv mr memory (mr free, then kfree) - * 2. send mr memory (mr free, then kfree) - * 3. MWs - */ - dprintk("RPC: %s: entering\n", __func__); + while (!list_empty(&buf->rb_recv_bufs)) { + struct rpcrdma_rep *rep; - for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs) - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); - if (buf->rb_send_bufs) - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); + rep = rpcrdma_buffer_get_rep_locked(buf); + rpcrdma_destroy_rep(ia, rep); } - ia->ri_ops->ro_destroy(buf); + spin_lock(&buf->rb_reqslock); + while (!list_empty(&buf->rb_allreqs)) { + struct rpcrdma_req *req; + + req = list_first_entry(&buf->rb_allreqs, + struct rpcrdma_req, rl_all); + list_del(&req->rl_all); + + spin_unlock(&buf->rb_reqslock); + rpcrdma_destroy_req(ia, req); + spin_lock(&buf->rb_reqslock); + } + spin_unlock(&buf->rb_reqslock); - kfree(buf->rb_pool); + ia->ri_ops->ro_destroy(buf); } struct rpcrdma_mw * @@@ -1110,53 -1088,34 +1089,34 @@@ rpcrdma_put_mw(struct rpcrdma_xprt *r_x spin_unlock(&buf->rb_mwlock); } - static void - rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) - { - buf->rb_send_bufs[--buf->rb_send_index] = req; - req->rl_niovs = 0; - if (req->rl_reply) { - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply; - req->rl_reply = NULL; - } - } - /* * Get a set of request/reply buffers. * - * Reply buffer (if needed) is attached to send buffer upon return. - * Rule: - * rb_send_index and rb_recv_index MUST always be pointing to the - * *next* available buffer (non-NULL). They are incremented after - * removing buffers, and decremented *before* returning them. + * Reply buffer (if available) is attached to send buffer upon return. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) { struct rpcrdma_req *req; - unsigned long flags; - - spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_send_index == buffers->rb_max_requests) { - spin_unlock_irqrestore(&buffers->rb_lock, flags); - dprintk("RPC: %s: out of request buffers\n", __func__); - return ((struct rpcrdma_req *)NULL); - } - - req = buffers->rb_send_bufs[buffers->rb_send_index]; - if (buffers->rb_send_index < buffers->rb_recv_index) { - dprintk("RPC: %s: %d extra receives outstanding (ok)\n", - __func__, - buffers->rb_recv_index - buffers->rb_send_index); - req->rl_reply = NULL; - } else { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL; + spin_lock(&buffers->rb_lock); + if (list_empty(&buffers->rb_send_bufs)) + goto out_reqbuf; + req = rpcrdma_buffer_get_req_locked(buffers); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_repbuf; + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock(&buffers->rb_lock); + return req; - spin_unlock_irqrestore(&buffers->rb_lock, flags); + out_reqbuf: + spin_unlock(&buffers->rb_lock); + pr_warn("RPC: %s: out of request buffers\n", __func__); + return NULL; + out_repbuf: + spin_unlock(&buffers->rb_lock); + pr_warn("RPC: %s: out of reply buffers\n", __func__); + req->rl_reply = NULL; return req; } @@@ -1168,30 -1127,31 +1128,31 @@@ voi rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; - unsigned long flags; + struct rpcrdma_rep *rep = req->rl_reply; - spin_lock_irqsave(&buffers->rb_lock, flags); - rpcrdma_buffer_put_sendbuf(req, buffers); - spin_unlock_irqrestore(&buffers->rb_lock, flags); + req->rl_niovs = 0; + req->rl_reply = NULL; + + spin_lock(&buffers->rb_lock); + list_add_tail(&req->rl_free, &buffers->rb_send_bufs); + if (rep) + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock(&buffers->rb_lock); } /* * Recover reply buffers from pool. - * This happens when recovering from error conditions. - * Post-increment counter/array index. + * This happens when recovering from disconnect. */ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_recv_index < buffers->rb_max_requests) { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); + if (!list_empty(&buffers->rb_recv_bufs)) + req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock(&buffers->rb_lock); } /* @@@ -1202,11 -1162,10 +1163,10 @@@ voi rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) { struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf; - unsigned long flags; - spin_lock_irqsave(&buffers->rb_lock, flags); - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; - spin_unlock_irqrestore(&buffers->rb_lock, flags); + spin_lock(&buffers->rb_lock); + list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs); + spin_unlock(&buffers->rb_lock); } /* @@@ -1363,6 -1322,47 +1323,47 @@@ rpcrdma_ep_post_recv(struct rpcrdma_ia return rc; } + /** + * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests + * @r_xprt: transport associated with these backchannel resources + * @min_reqs: minimum number of incoming requests expected + * + * Returns zero if all requested buffers were posted, or a negative errno. + */ + int + rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count) + { + struct rpcrdma_buffer *buffers = &r_xprt->rx_buf; + struct rpcrdma_ia *ia = &r_xprt->rx_ia; + struct rpcrdma_ep *ep = &r_xprt->rx_ep; + struct rpcrdma_rep *rep; + unsigned long flags; + int rc; + + while (count--) { + spin_lock_irqsave(&buffers->rb_lock, flags); + if (list_empty(&buffers->rb_recv_bufs)) + goto out_reqbuf; + rep = rpcrdma_buffer_get_rep_locked(buffers); + spin_unlock_irqrestore(&buffers->rb_lock, flags); + + rc = rpcrdma_ep_post_recv(ia, ep, rep); + if (rc) + goto out_rc; + } + + return 0; + + out_reqbuf: + spin_unlock_irqrestore(&buffers->rb_lock, flags); + pr_warn("%s: no extra receive buffers\n", __func__); + return -ENOMEM; + + out_rc: + rpcrdma_recv_buffer_put(rep); + return rc; + } + /* How many chunk list items fit within our inline buffers? */ unsigned int diff --combined net/sunrpc/xprtrdma/xprt_rdma.h index c82abf44e39d,f8dd17be9f43..ac7f8d4f632a --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@@ -77,9 -77,6 +77,6 @@@ struct rpcrdma_ia * RDMA Endpoint -- one per transport instance */ - #define RPCRDMA_WC_BUDGET (128) - #define RPCRDMA_POLLSIZE (16) - struct rpcrdma_ep { atomic_t rep_cqcount; int rep_cqinit; @@@ -89,8 -86,6 +86,6 @@@ struct rdma_conn_param rep_remote_cma; struct sockaddr_storage rep_remote_addr; struct delayed_work rep_connect_worker; - struct ib_wc rep_send_wcs[RPCRDMA_POLLSIZE]; - struct ib_wc rep_recv_wcs[RPCRDMA_POLLSIZE]; }; /* @@@ -106,6 -101,16 +101,16 @@@ */ #define RPCRDMA_IGNORE_COMPLETION (0ULL) + /* Pre-allocate extra Work Requests for handling backward receives + * and sends. This is a fixed value because the Work Queues are + * allocated when the forward channel is set up. + */ + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + #define RPCRDMA_BACKWARD_WRS (8) + #else + #define RPCRDMA_BACKWARD_WRS (0) + #endif + /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV * * The below structure appears at the front of a large region of kmalloc'd @@@ -169,10 -174,13 +174,13 @@@ struct rpcrdma_rep unsigned int rr_len; struct ib_device *rr_device; struct rpcrdma_xprt *rr_rxprt; + struct work_struct rr_work; struct list_head rr_list; struct rpcrdma_regbuf *rr_rdmabuf; }; + #define RPCRDMA_BAD_LEN (~0U) + /* * struct rpcrdma_mw - external memory region metadata * @@@ -193,8 -201,7 +201,8 @@@ enum rpcrdma_frmr_state }; struct rpcrdma_frmr { - struct ib_fast_reg_page_list *fr_pgl; + struct scatterlist *sg; + int sg_nents; struct ib_mr *fr_mr; enum rpcrdma_frmr_state fr_state; struct work_struct fr_work; @@@ -256,6 -263,7 +264,7 @@@ struct rpcrdma_mr_seg { /* chunk descr #define RPCRDMA_MAX_IOVS (2) struct rpcrdma_req { + struct list_head rl_free; unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; @@@ -265,6 -273,9 +274,9 @@@ struct rpcrdma_regbuf *rl_rdmabuf; struct rpcrdma_regbuf *rl_sendbuf; struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS]; + + struct list_head rl_all; + bool rl_backchannel; }; static inline struct rpcrdma_req * @@@ -289,12 -300,14 +301,14 @@@ struct rpcrdma_buffer struct list_head rb_all; char *rb_pool; - spinlock_t rb_lock; /* protect buf arrays */ + spinlock_t rb_lock; /* protect buf lists */ + struct list_head rb_send_bufs; + struct list_head rb_recv_bufs; u32 rb_max_requests; - int rb_send_index; - int rb_recv_index; - struct rpcrdma_req **rb_send_bufs; - struct rpcrdma_rep **rb_recv_bufs; + + u32 rb_bc_srv_max_requests; + spinlock_t rb_reqslock; /* protect rb_allreqs */ + struct list_head rb_allreqs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia) @@@ -340,6 -353,7 +354,7 @@@ struct rpcrdma_stats unsigned long failed_marshal_count; unsigned long bad_reply_count; unsigned long nomsg_call_count; + unsigned long bcall_count; }; /* @@@ -415,6 -429,9 +430,9 @@@ int rpcrdma_ep_post_recv(struct rpcrdma /* * Buffer calls - xprtrdma/verbs.c */ + struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *); + struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *); + void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *); int rpcrdma_buffer_create(struct rpcrdma_xprt *); void rpcrdma_buffer_destroy(struct rpcrdma_buffer *); @@@ -431,10 -448,14 +449,14 @@@ void rpcrdma_free_regbuf(struct rpcrdma struct rpcrdma_regbuf *); unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *); + int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int); int frwr_alloc_recovery_wq(void); void frwr_destroy_recovery_wq(void); + int rpcrdma_alloc_wq(void); + void rpcrdma_destroy_wq(void); + /* * Wrappers for chunk registration, shared by read/write chunk code. */ @@@ -495,6 -516,18 +517,18 @@@ int rpcrdma_marshal_req(struct rpc_rqs int xprt_rdma_init(void); void xprt_rdma_cleanup(void); + /* Backchannel calls - xprtrdma/backchannel.c + */ + #if defined(CONFIG_SUNRPC_BACKCHANNEL) + int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); + int xprt_rdma_bc_up(struct svc_serv *, struct net *); + int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); + void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); + int rpcrdma_bc_marshal_reply(struct rpc_rqst *); + void xprt_rdma_bc_free_rqst(struct rpc_rqst *); + void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int); + #endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* Temporary NFS request map cache. Created in svc_rdma.c */ extern struct kmem_cache *svc_rdma_map_cachep; /* WR context cache. Created in svc_rdma.c */