Merge tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs
author    Linus Torvalds <torvalds@linux-foundation.org>   Tue, 10 Nov 2015 02:11:22 +0000 (18:11 -0800)
committer Linus Torvalds <torvalds@linux-foundation.org>   Tue, 10 Nov 2015 02:11:22 +0000 (18:11 -0800)
Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  New features:
   - RDMA client backchannel from Chuck
   - Support for NFSv4.2 file CLONE using the btrfs ioctl (usage sketch below)

  Bugfixes + cleanups:
   - Move socket data receive out of the bottom halves and into a
     workqueue
   - Refactor NFSv4 error handling so that synchronous and asynchronous
     RPC calls handle errors identically
   - Fix a panic when block or object layout reads return a bad data
     length
   - Fix nfsroot so it can handle a 1024-byte path
   - Fix bad usage of page offset in bl_read_pagelist
   - Various NFSv4 callback cleanups+fixes
   - Fix GETATTR bitmap verification
   - Support hexadecimal numbers in the sunrpc debug sysctl files"
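
  As a quick illustration of the CLONE feature above, the snippet below is a
  hedged userspace sketch (not part of this merge): it clones one file into
  another on an NFSv4.2 mount using the btrfs clone ioctl number that the 4.4
  client now recognizes. BTRFS_IOC_CLONE is defined locally here in case the
  build host's <linux/btrfs.h> does not expose it; on later kernels the same
  number is also available as FICLONE from <linux/fs.h>.

	/* clone.c -- whole-file clone over NFSv4.2 via the btrfs clone ioctl */
	#include <fcntl.h>
	#include <linux/ioctl.h>
	#include <stdio.h>
	#include <sys/ioctl.h>
	#include <unistd.h>

	#ifndef BTRFS_IOC_CLONE
	#define BTRFS_IOC_CLONE _IOW(0x94, 9, int)	/* same value FICLONE later reuses */
	#endif

	int main(int argc, char **argv)
	{
		int src, dst, rc;

		if (argc != 3) {
			fprintf(stderr, "usage: %s <src> <dst>\n", argv[0]);
			return 1;
		}
		src = open(argv[1], O_RDONLY);
		dst = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
		if (src < 0 || dst < 0) {
			perror("open");
			return 1;
		}
		/* The NFS client translates this into an NFSv4.2 CLONE operation. */
		rc = ioctl(dst, BTRFS_IOC_CLONE, src);
		if (rc < 0)
			perror("ioctl(BTRFS_IOC_CLONE)");
		close(src);
		close(dst);
		return rc < 0;
	}

  Likewise, the sunrpc sysctl change above means a debug mask such as 0x7fff
  should now be accepted as-is by files like /proc/sys/sunrpc/rpc_debug,
  rather than having to be converted to decimal first.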

* tag 'nfs-for-4.4-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (53 commits)
  Sunrpc: Supports hexadecimal number for sysctl files of sunrpc debug
  nfs: Fix GETATTR bitmap verification
  nfs: Remove unused xdr page offsets in getacl/setacl arguments
  fs/nfs: remove unnecessary new_valid_dev check
  SUNRPC: fix variable type
  NFS: Enable client side NFSv4.1 backchannel to use other transports
  pNFS/flexfiles: Add support for FF_FLAGS_NO_IO_THRU_MDS
  pNFS/flexfiles: When mirrored, retry failed reads by switching mirrors
  SUNRPC: Remove the TCP-only restriction in bc_svc_process()
  svcrdma: Add backward direction service for RPC/RDMA transport
  xprtrdma: Handle incoming backward direction RPC calls
  xprtrdma: Add support for sending backward direction RPC replies
  xprtrdma: Pre-allocate Work Requests for backchannel
  xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers
  SUNRPC: Abstract backchannel operations
  xprtrdma: Saving IRQs no longer needed for rb_lock
  xprtrdma: Remove reply tasklet
  xprtrdma: Use workqueue to process RPC/RDMA replies
  xprtrdma: Replace send and receive arrays
  xprtrdma: Refactor reply handler error handling
  ...

fs/nfs/nfs4proc.c
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/frwr_ops.c
net/sunrpc/xprtrdma/svc_rdma_transport.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

diff --combined fs/nfs/nfs4proc.c
index 0e5ff69455c7beba96df05104594bcb3dee336a7,7ed8f2cd97f8711ce86817f7449c97acb7ff6a27..ff5bddc49a2a30449a63a6c2a32a2aadc6db84b5
@@@ -78,7 -78,6 +78,6 @@@ struct nfs4_opendata
  static int _nfs4_proc_open(struct nfs4_opendata *data);
  static int _nfs4_recover_proc_open(struct nfs4_opendata *data);
  static int nfs4_do_fsinfo(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *);
- static int nfs4_async_handle_error(struct rpc_task *, const struct nfs_server *, struct nfs4_state *, long *);
  static void nfs_fixup_referral_attributes(struct nfs_fattr *fattr);
  static int nfs4_proc_getattr(struct nfs_server *, struct nfs_fh *, struct nfs_fattr *, struct nfs4_label *label);
  static int _nfs4_proc_getattr(struct nfs_server *server, struct nfs_fh *fhandle, struct nfs_fattr *fattr, struct nfs4_label *label);
@@@ -239,6 -238,7 +238,7 @@@ const u32 nfs4_fsinfo_bitmap[3] = { FAT
                        FATTR4_WORD1_TIME_DELTA
                        | FATTR4_WORD1_FS_LAYOUT_TYPES,
                        FATTR4_WORD2_LAYOUT_BLKSIZE
+                       | FATTR4_WORD2_CLONE_BLKSIZE
  };
  
  const u32 nfs4_fs_locations_bitmap[3] = {
@@@ -344,13 -344,16 +344,16 @@@ static int nfs4_delay(struct rpc_clnt *
  /* This is the error handling routine for processes that are allowed
   * to sleep.
   */
- int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception)
+ static int nfs4_do_handle_exception(struct nfs_server *server,
+               int errorcode, struct nfs4_exception *exception)
  {
        struct nfs_client *clp = server->nfs_client;
        struct nfs4_state *state = exception->state;
        struct inode *inode = exception->inode;
        int ret = errorcode;
  
+       exception->delay = 0;
+       exception->recovering = 0;
        exception->retry = 0;
        switch(errorcode) {
                case 0:
                case -NFS4ERR_DELEG_REVOKED:
                case -NFS4ERR_ADMIN_REVOKED:
                case -NFS4ERR_BAD_STATEID:
-                       if (inode && nfs4_have_delegation(inode, FMODE_READ)) {
-                               nfs4_inode_return_delegation(inode);
-                               exception->retry = 1;
-                               return 0;
-                       }
+                       if (inode && nfs_async_inode_return_delegation(inode,
+                                               NULL) == 0)
+                               goto wait_on_recovery;
                        if (state == NULL)
                                break;
                        ret = nfs4_schedule_stateid_recovery(server, state);
                                ret = -EBUSY;
                                break;
                        }
-               case -NFS4ERR_GRACE:
                case -NFS4ERR_DELAY:
-                       ret = nfs4_delay(server->client, &exception->timeout);
-                       if (ret != 0)
-                               break;
+                       nfs_inc_server_stats(server, NFSIOS_DELAY);
+               case -NFS4ERR_GRACE:
+                       exception->delay = 1;
+                       return 0;
                case -NFS4ERR_RETRY_UNCACHED_REP:
                case -NFS4ERR_OLD_STATEID:
                        exception->retry = 1;
        /* We failed to handle the error */
        return nfs4_map_errors(ret);
  wait_on_recovery:
-       ret = nfs4_wait_clnt_recover(clp);
+       exception->recovering = 1;
+       return 0;
+ }
+ /* This is the error handling routine for processes that are allowed
+  * to sleep.
+  */
+ int nfs4_handle_exception(struct nfs_server *server, int errorcode, struct nfs4_exception *exception)
+ {
+       struct nfs_client *clp = server->nfs_client;
+       int ret;
+       ret = nfs4_do_handle_exception(server, errorcode, exception);
+       if (exception->delay) {
+               ret = nfs4_delay(server->client, &exception->timeout);
+               goto out_retry;
+       }
+       if (exception->recovering) {
+               ret = nfs4_wait_clnt_recover(clp);
+               if (test_bit(NFS_MIG_FAILED, &server->mig_status))
+                       return -EIO;
+               goto out_retry;
+       }
+       return ret;
+ out_retry:
+       if (ret == 0)
+               exception->retry = 1;
+       return ret;
+ }
+ static int
+ nfs4_async_handle_exception(struct rpc_task *task, struct nfs_server *server,
+               int errorcode, struct nfs4_exception *exception)
+ {
+       struct nfs_client *clp = server->nfs_client;
+       int ret;
+       ret = nfs4_do_handle_exception(server, errorcode, exception);
+       if (exception->delay) {
+               rpc_delay(task, nfs4_update_delay(&exception->timeout));
+               goto out_retry;
+       }
+       if (exception->recovering) {
+               rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
+               if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
+                       rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
+               goto out_retry;
+       }
        if (test_bit(NFS_MIG_FAILED, &server->mig_status))
-               return -EIO;
+               ret = -EIO;
+       return ret;
+ out_retry:
        if (ret == 0)
                exception->retry = 1;
        return ret;
  }
  
+ static int
+ nfs4_async_handle_error(struct rpc_task *task, struct nfs_server *server,
+                       struct nfs4_state *state, long *timeout)
+ {
+       struct nfs4_exception exception = {
+               .state = state,
+       };
+       if (task->tk_status >= 0)
+               return 0;
+       if (timeout)
+               exception.timeout = *timeout;
+       task->tk_status = nfs4_async_handle_exception(task, server,
+                       task->tk_status,
+                       &exception);
+       if (exception.delay && timeout)
+               *timeout = exception.timeout;
+       if (exception.retry)
+               return -EAGAIN;
+       return 0;
+ }
  /*
   * Return 'true' if 'clp' is using an rpc_client that is integrity protected
   * or 'false' otherwise.
@@@ -4530,7 -4603,7 +4603,7 @@@ static inline int nfs4_server_supports_
  #define NFS4ACL_MAXPAGES DIV_ROUND_UP(XATTR_SIZE_MAX, PAGE_SIZE)
  
  static int buf_to_pages_noslab(const void *buf, size_t buflen,
-               struct page **pages, unsigned int *pgbase)
+               struct page **pages)
  {
        struct page *newpage, **spages;
        int rc = 0;
@@@ -4674,7 -4747,6 +4747,6 @@@ static ssize_t __nfs4_get_acl_uncached(
                goto out_free;
  
        args.acl_len = npages * PAGE_SIZE;
-       args.acl_pgbase = 0;
  
        dprintk("%s  buf %p buflen %zu npages %d args.acl_len %zu\n",
                __func__, buf, buflen, npages, args.acl_len);
@@@ -4766,7 -4838,7 +4838,7 @@@ static int __nfs4_proc_set_acl(struct i
                return -EOPNOTSUPP;
        if (npages > ARRAY_SIZE(pages))
                return -ERANGE;
-       i = buf_to_pages_noslab(buf, buflen, arg.acl_pages, &arg.acl_pgbase);
+       i = buf_to_pages_noslab(buf, buflen, arg.acl_pages);
        if (i < 0)
                return i;
        nfs4_inode_return_delegation(inode);
@@@ -4955,79 -5027,6 +5027,6 @@@ out
  #endif        /* CONFIG_NFS_V4_SECURITY_LABEL */
  
  
- static int
- nfs4_async_handle_error(struct rpc_task *task, const struct nfs_server *server,
-                       struct nfs4_state *state, long *timeout)
- {
-       struct nfs_client *clp = server->nfs_client;
-       if (task->tk_status >= 0)
-               return 0;
-       switch(task->tk_status) {
-               case -NFS4ERR_DELEG_REVOKED:
-               case -NFS4ERR_ADMIN_REVOKED:
-               case -NFS4ERR_BAD_STATEID:
-               case -NFS4ERR_OPENMODE:
-                       if (state == NULL)
-                               break;
-                       if (nfs4_schedule_stateid_recovery(server, state) < 0)
-                               goto recovery_failed;
-                       goto wait_on_recovery;
-               case -NFS4ERR_EXPIRED:
-                       if (state != NULL) {
-                               if (nfs4_schedule_stateid_recovery(server, state) < 0)
-                                       goto recovery_failed;
-                       }
-               case -NFS4ERR_STALE_STATEID:
-               case -NFS4ERR_STALE_CLIENTID:
-                       nfs4_schedule_lease_recovery(clp);
-                       goto wait_on_recovery;
-               case -NFS4ERR_MOVED:
-                       if (nfs4_schedule_migration_recovery(server) < 0)
-                               goto recovery_failed;
-                       goto wait_on_recovery;
-               case -NFS4ERR_LEASE_MOVED:
-                       nfs4_schedule_lease_moved_recovery(clp);
-                       goto wait_on_recovery;
- #if defined(CONFIG_NFS_V4_1)
-               case -NFS4ERR_BADSESSION:
-               case -NFS4ERR_BADSLOT:
-               case -NFS4ERR_BAD_HIGH_SLOT:
-               case -NFS4ERR_DEADSESSION:
-               case -NFS4ERR_CONN_NOT_BOUND_TO_SESSION:
-               case -NFS4ERR_SEQ_FALSE_RETRY:
-               case -NFS4ERR_SEQ_MISORDERED:
-                       dprintk("%s ERROR %d, Reset session\n", __func__,
-                               task->tk_status);
-                       nfs4_schedule_session_recovery(clp->cl_session, task->tk_status);
-                       goto wait_on_recovery;
- #endif /* CONFIG_NFS_V4_1 */
-               case -NFS4ERR_DELAY:
-                       nfs_inc_server_stats(server, NFSIOS_DELAY);
-                       rpc_delay(task, nfs4_update_delay(timeout));
-                       goto restart_call;
-               case -NFS4ERR_GRACE:
-                       rpc_delay(task, NFS4_POLL_RETRY_MAX);
-               case -NFS4ERR_RETRY_UNCACHED_REP:
-               case -NFS4ERR_OLD_STATEID:
-                       goto restart_call;
-       }
-       task->tk_status = nfs4_map_errors(task->tk_status);
-       return 0;
- recovery_failed:
-       task->tk_status = -EIO;
-       return 0;
- wait_on_recovery:
-       rpc_sleep_on(&clp->cl_rpcwaitq, task, NULL);
-       if (test_bit(NFS4CLNT_MANAGER_RUNNING, &clp->cl_state) == 0)
-               rpc_wake_up_queued_task(&clp->cl_rpcwaitq, task);
-       if (test_bit(NFS_MIG_FAILED, &server->mig_status))
-               goto recovery_failed;
- restart_call:
-       task->tk_status = 0;
-       return -EAGAIN;
- }
  static void nfs4_init_boot_verifier(const struct nfs_client *clp,
                                    nfs4_verifier *bootverf)
  {
@@@ -5513,7 -5512,18 +5512,7 @@@ static int nfs4_proc_getlk(struct nfs4_
  
  static int do_vfs_lock(struct inode *inode, struct file_lock *fl)
  {
 -      int res = 0;
 -      switch (fl->fl_flags & (FL_POSIX|FL_FLOCK)) {
 -              case FL_POSIX:
 -                      res = posix_lock_inode_wait(inode, fl);
 -                      break;
 -              case FL_FLOCK:
 -                      res = flock_lock_inode_wait(inode, fl);
 -                      break;
 -              default:
 -                      BUG();
 -      }
 -      return res;
 +      return locks_lock_inode_wait(inode, fl);
  }
  
  struct nfs4_unlockdata {
        struct nfs4_lock_state *lsp;
        struct nfs_open_context *ctx;
        struct file_lock fl;
-       const struct nfs_server *server;
+       struct nfs_server *server;
        unsigned long timestamp;
  };
  
@@@ -8718,7 -8728,8 +8717,8 @@@ static const struct nfs4_minor_version_
                | NFS_CAP_ALLOCATE
                | NFS_CAP_DEALLOCATE
                | NFS_CAP_SEEK
-               | NFS_CAP_LAYOUTSTATS,
+               | NFS_CAP_LAYOUTSTATS
+               | NFS_CAP_CLONE,
        .init_client = nfs41_init_client,
        .shutdown_client = nfs41_shutdown_client,
        .match_stateid = nfs41_match_stateid,
index 1e4438ea2380dd07e79f3d8011445900edea7437,fb4013edcf5732cf3b777b5bc2ac29b0ac8d1bba..f869807a0d0e2ca93629a7d25092f268dbc8f520
@@@ -105,9 -105,11 +105,9 @@@ struct svc_rdma_chunk_sge 
  };
  struct svc_rdma_fastreg_mr {
        struct ib_mr *mr;
 -      void *kva;
 -      struct ib_fast_reg_page_list *page_list;
 -      int page_list_len;
 +      struct scatterlist *sg;
 +      int sg_nents;
        unsigned long access_flags;
 -      unsigned long map_len;
        enum dma_data_direction direction;
        struct list_head frmr_list;
  };
@@@ -226,9 -228,13 +226,13 @@@ extern void svc_rdma_put_frmr(struct sv
                              struct svc_rdma_fastreg_mr *);
  extern void svc_sq_reap(struct svcxprt_rdma *);
  extern void svc_rq_reap(struct svcxprt_rdma *);
- extern struct svc_xprt_class svc_rdma_class;
  extern void svc_rdma_prep_reply_hdr(struct svc_rqst *);
  
+ extern struct svc_xprt_class svc_rdma_class;
+ #ifdef CONFIG_SUNRPC_BACKCHANNEL
+ extern struct svc_xprt_class svc_rdma_bc_class;
+ #endif
  /* svc_rdma.c */
  extern int svc_rdma_init(void);
  extern void svc_rdma_cleanup(void);
index a1434447b0d6ae9a937a7d9d90c51660de8717b6,0a362397e434fe9a126bcd94a3f2bba99b89ea2f..88cf9e7269c2bd0d626bc64448254be73f602940
@@@ -151,13 -151,9 +151,13 @@@ __frwr_init(struct rpcrdma_mw *r, struc
        f->fr_mr = ib_alloc_mr(pd, IB_MR_TYPE_MEM_REG, depth);
        if (IS_ERR(f->fr_mr))
                goto out_mr_err;
 -      f->fr_pgl = ib_alloc_fast_reg_page_list(device, depth);
 -      if (IS_ERR(f->fr_pgl))
 +
 +      f->sg = kcalloc(depth, sizeof(*f->sg), GFP_KERNEL);
 +      if (!f->sg)
                goto out_list_err;
 +
 +      sg_init_table(f->sg, depth);
 +
        return 0;
  
  out_mr_err:
        return rc;
  
  out_list_err:
 -      rc = PTR_ERR(f->fr_pgl);
 -      dprintk("RPC:       %s: ib_alloc_fast_reg_page_list status %i\n",
 -              __func__, rc);
 +      rc = -ENOMEM;
 +      dprintk("RPC:       %s: sg allocation failure\n",
 +              __func__);
        ib_dereg_mr(f->fr_mr);
        return rc;
  }
@@@ -183,7 -179,7 +183,7 @@@ __frwr_release(struct rpcrdma_mw *r
        if (rc)
                dprintk("RPC:       %s: ib_dereg_mr status %i\n",
                        __func__, rc);
 -      ib_free_fast_reg_page_list(r->r.frmr.fr_pgl);
 +      kfree(r->r.frmr.sg);
  }
  
  static int
@@@ -256,8 -252,11 +256,11 @@@ frwr_sendcompletion(struct ib_wc *wc
  
        /* WARNING: Only wr_id and status are reliable at this point */
        r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-       pr_warn("RPC:       %s: frmr %p flushed, status %s (%d)\n",
-               __func__, r, ib_wc_status_msg(wc->status), wc->status);
+       if (wc->status == IB_WC_WR_FLUSH_ERR)
+               dprintk("RPC:       %s: frmr %p flushed\n", __func__, r);
+       else
+               pr_warn("RPC:       %s: frmr %p error, status %s (%d)\n",
+                       __func__, r, ib_wc_status_msg(wc->status), wc->status);
        r->r.frmr.fr_state = FRMR_IS_STALE;
  }
  
@@@ -316,10 -315,13 +319,10 @@@ frwr_op_map(struct rpcrdma_xprt *r_xprt
        struct rpcrdma_mw *mw;
        struct rpcrdma_frmr *frmr;
        struct ib_mr *mr;
 -      struct ib_send_wr fastreg_wr, *bad_wr;
 +      struct ib_reg_wr reg_wr;
 +      struct ib_send_wr *bad_wr;
 +      int rc, i, n, dma_nents;
        u8 key;
 -      int len, pageoff;
 -      int i, rc;
 -      int seg_len;
 -      u64 pa;
 -      int page_no;
  
        mw = seg1->rl_mw;
        seg1->rl_mw = NULL;
        } while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
        frmr = &mw->r.frmr;
        frmr->fr_state = FRMR_IS_VALID;
 +      mr = frmr->fr_mr;
  
 -      pageoff = offset_in_page(seg1->mr_offset);
 -      seg1->mr_offset -= pageoff;     /* start of page */
 -      seg1->mr_len += pageoff;
 -      len = -pageoff;
        if (nsegs > ia->ri_max_frmr_depth)
                nsegs = ia->ri_max_frmr_depth;
  
 -      for (page_no = i = 0; i < nsegs;) {
 -              rpcrdma_map_one(device, seg, direction);
 -              pa = seg->mr_dma;
 -              for (seg_len = seg->mr_len; seg_len > 0; seg_len -= PAGE_SIZE) {
 -                      frmr->fr_pgl->page_list[page_no++] = pa;
 -                      pa += PAGE_SIZE;
 -              }
 -              len += seg->mr_len;
 +      for (i = 0; i < nsegs;) {
 +              if (seg->mr_page)
 +                      sg_set_page(&frmr->sg[i],
 +                                  seg->mr_page,
 +                                  seg->mr_len,
 +                                  offset_in_page(seg->mr_offset));
 +              else
 +                      sg_set_buf(&frmr->sg[i], seg->mr_offset,
 +                                 seg->mr_len);
 +
                ++seg;
                ++i;
 +
                /* Check for holes */
                if ((i < nsegs && offset_in_page(seg->mr_offset)) ||
                    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
                        break;
        }
 -      dprintk("RPC:       %s: Using frmr %p to map %d segments (%d bytes)\n",
 -              __func__, mw, i, len);
 -
 -      memset(&fastreg_wr, 0, sizeof(fastreg_wr));
 -      fastreg_wr.wr_id = (unsigned long)(void *)mw;
 -      fastreg_wr.opcode = IB_WR_FAST_REG_MR;
 -      fastreg_wr.wr.fast_reg.iova_start = seg1->mr_dma + pageoff;
 -      fastreg_wr.wr.fast_reg.page_list = frmr->fr_pgl;
 -      fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
 -      fastreg_wr.wr.fast_reg.page_list_len = page_no;
 -      fastreg_wr.wr.fast_reg.length = len;
 -      fastreg_wr.wr.fast_reg.access_flags = writing ?
 -                              IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 -                              IB_ACCESS_REMOTE_READ;
 -      mr = frmr->fr_mr;
 +      frmr->sg_nents = i;
 +
 +      dma_nents = ib_dma_map_sg(device, frmr->sg, frmr->sg_nents, direction);
 +      if (!dma_nents) {
 +              pr_err("RPC:       %s: failed to dma map sg %p sg_nents %u\n",
 +                     __func__, frmr->sg, frmr->sg_nents);
 +              return -ENOMEM;
 +      }
 +
 +      n = ib_map_mr_sg(mr, frmr->sg, frmr->sg_nents, PAGE_SIZE);
 +      if (unlikely(n != frmr->sg_nents)) {
 +              pr_err("RPC:       %s: failed to map mr %p (%u/%u)\n",
 +                     __func__, frmr->fr_mr, n, frmr->sg_nents);
 +              rc = n < 0 ? n : -EINVAL;
 +              goto out_senderr;
 +      }
 +
 +      dprintk("RPC:       %s: Using frmr %p to map %u segments (%u bytes)\n",
 +              __func__, mw, frmr->sg_nents, mr->length);
 +
        key = (u8)(mr->rkey & 0x000000FF);
        ib_update_fast_reg_key(mr, ++key);
 -      fastreg_wr.wr.fast_reg.rkey = mr->rkey;
 +
 +      reg_wr.wr.next = NULL;
 +      reg_wr.wr.opcode = IB_WR_REG_MR;
 +      reg_wr.wr.wr_id = (uintptr_t)mw;
 +      reg_wr.wr.num_sge = 0;
 +      reg_wr.wr.send_flags = 0;
 +      reg_wr.mr = mr;
 +      reg_wr.key = mr->rkey;
 +      reg_wr.access = writing ?
 +                      IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 +                      IB_ACCESS_REMOTE_READ;
  
        DECR_CQCOUNT(&r_xprt->rx_ep);
 -      rc = ib_post_send(ia->ri_id->qp, &fastreg_wr, &bad_wr);
 +      rc = ib_post_send(ia->ri_id->qp, &reg_wr.wr, &bad_wr);
        if (rc)
                goto out_senderr;
  
 +      seg1->mr_dir = direction;
        seg1->rl_mw = mw;
        seg1->mr_rkey = mr->rkey;
 -      seg1->mr_base = seg1->mr_dma + pageoff;
 -      seg1->mr_nsegs = i;
 -      seg1->mr_len = len;
 -      return i;
 +      seg1->mr_base = mr->iova;
 +      seg1->mr_nsegs = frmr->sg_nents;
 +      seg1->mr_len = mr->length;
 +
 +      return frmr->sg_nents;
  
  out_senderr:
        dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
 -      while (i--)
 -              rpcrdma_unmap_one(device, --seg);
 +      ib_dma_unmap_sg(device, frmr->sg, dma_nents, direction);
        __frwr_queue_recovery(mw);
        return rc;
  }
@@@ -419,22 -405,22 +422,22 @@@ frwr_op_unmap(struct rpcrdma_xprt *r_xp
        struct rpcrdma_mr_seg *seg1 = seg;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
        struct rpcrdma_mw *mw = seg1->rl_mw;
 +      struct rpcrdma_frmr *frmr = &mw->r.frmr;
        struct ib_send_wr invalidate_wr, *bad_wr;
        int rc, nsegs = seg->mr_nsegs;
  
        dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
  
        seg1->rl_mw = NULL;
 -      mw->r.frmr.fr_state = FRMR_IS_INVALID;
 +      frmr->fr_state = FRMR_IS_INVALID;
  
        memset(&invalidate_wr, 0, sizeof(invalidate_wr));
        invalidate_wr.wr_id = (unsigned long)(void *)mw;
        invalidate_wr.opcode = IB_WR_LOCAL_INV;
 -      invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;
 +      invalidate_wr.ex.invalidate_rkey = frmr->fr_mr->rkey;
        DECR_CQCOUNT(&r_xprt->rx_ep);
  
 -      while (seg1->mr_nsegs--)
 -              rpcrdma_unmap_one(ia->ri_device, seg++);
 +      ib_dma_unmap_sg(ia->ri_device, frmr->sg, frmr->sg_nents, seg1->mr_dir);
        read_lock(&ia->ri_qplock);
        rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
        read_unlock(&ia->ri_qplock);
index a266e870d870e8b2cf0d938c3009460b46fddc7b,a133b1e5b5f62a0543a7ab0c2e2fb362b726a5bd..b348b4adef29a48246709cc7f32cf576865753eb
@@@ -56,6 -56,7 +56,7 @@@
  
  #define RPCDBG_FACILITY       RPCDBG_SVCXPRT
  
+ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *, int);
  static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
                                        struct net *net,
                                        struct sockaddr *sa, int salen,
@@@ -95,6 -96,63 +96,63 @@@ struct svc_xprt_class svc_rdma_class = 
        .xcl_ident = XPRT_TRANSPORT_RDMA,
  };
  
+ #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *, struct net *,
+                                          struct sockaddr *, int, int);
+ static void svc_rdma_bc_detach(struct svc_xprt *);
+ static void svc_rdma_bc_free(struct svc_xprt *);
+ static struct svc_xprt_ops svc_rdma_bc_ops = {
+       .xpo_create = svc_rdma_bc_create,
+       .xpo_detach = svc_rdma_bc_detach,
+       .xpo_free = svc_rdma_bc_free,
+       .xpo_prep_reply_hdr = svc_rdma_prep_reply_hdr,
+       .xpo_secure_port = svc_rdma_secure_port,
+ };
+ struct svc_xprt_class svc_rdma_bc_class = {
+       .xcl_name = "rdma-bc",
+       .xcl_owner = THIS_MODULE,
+       .xcl_ops = &svc_rdma_bc_ops,
+       .xcl_max_payload = (1024 - RPCRDMA_HDRLEN_MIN)
+ };
+ static struct svc_xprt *svc_rdma_bc_create(struct svc_serv *serv,
+                                          struct net *net,
+                                          struct sockaddr *sa, int salen,
+                                          int flags)
+ {
+       struct svcxprt_rdma *cma_xprt;
+       struct svc_xprt *xprt;
+       cma_xprt = rdma_create_xprt(serv, 0);
+       if (!cma_xprt)
+               return ERR_PTR(-ENOMEM);
+       xprt = &cma_xprt->sc_xprt;
+       svc_xprt_init(net, &svc_rdma_bc_class, xprt, serv);
+       serv->sv_bc_xprt = xprt;
+       dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+       return xprt;
+ }
+ static void svc_rdma_bc_detach(struct svc_xprt *xprt)
+ {
+       dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+ }
+ static void svc_rdma_bc_free(struct svc_xprt *xprt)
+ {
+       struct svcxprt_rdma *rdma =
+               container_of(xprt, struct svcxprt_rdma, sc_xprt);
+       dprintk("svcrdma: %s(%p)\n", __func__, xprt);
+       if (xprt)
+               kfree(rdma);
+ }
+ #endif        /* CONFIG_SUNRPC_BACKCHANNEL */
  struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
  {
        struct svc_rdma_op_ctxt *ctxt;
@@@ -692,8 -750,8 +750,8 @@@ static struct svc_xprt *svc_rdma_create
        if (!cma_xprt)
                return ERR_PTR(-ENOMEM);
  
 -      listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP,
 -                                 IB_QPT_RC);
 +      listen_id = rdma_create_id(&init_net, rdma_listen_handler, cma_xprt,
 +                                 RDMA_PS_TCP, IB_QPT_RC);
        if (IS_ERR(listen_id)) {
                ret = PTR_ERR(listen_id);
                dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
  static struct svc_rdma_fastreg_mr *rdma_alloc_frmr(struct svcxprt_rdma *xprt)
  {
        struct ib_mr *mr;
 -      struct ib_fast_reg_page_list *pl;
 +      struct scatterlist *sg;
        struct svc_rdma_fastreg_mr *frmr;
        u32 num_sg;
  
        if (IS_ERR(mr))
                goto err_free_frmr;
  
 -      pl = ib_alloc_fast_reg_page_list(xprt->sc_cm_id->device,
 -                                       num_sg);
 -      if (IS_ERR(pl))
 +      sg = kcalloc(RPCSVC_MAXPAGES, sizeof(*sg), GFP_KERNEL);
 +      if (!sg)
                goto err_free_mr;
  
 +      sg_init_table(sg, RPCSVC_MAXPAGES);
 +
        frmr->mr = mr;
 -      frmr->page_list = pl;
 +      frmr->sg = sg;
        INIT_LIST_HEAD(&frmr->frmr_list);
        return frmr;
  
@@@ -772,8 -829,8 +830,8 @@@ static void rdma_dealloc_frmr_q(struct 
                frmr = list_entry(xprt->sc_frmr_q.next,
                                  struct svc_rdma_fastreg_mr, frmr_list);
                list_del_init(&frmr->frmr_list);
 +              kfree(frmr->sg);
                ib_dereg_mr(frmr->mr);
 -              ib_free_fast_reg_page_list(frmr->page_list);
                kfree(frmr);
        }
  }
@@@ -787,7 -844,8 +845,7 @@@ struct svc_rdma_fastreg_mr *svc_rdma_ge
                frmr = list_entry(rdma->sc_frmr_q.next,
                                  struct svc_rdma_fastreg_mr, frmr_list);
                list_del_init(&frmr->frmr_list);
 -              frmr->map_len = 0;
 -              frmr->page_list_len = 0;
 +              frmr->sg_nents = 0;
        }
        spin_unlock_bh(&rdma->sc_frmr_q_lock);
        if (frmr)
        return rdma_alloc_frmr(rdma);
  }
  
 -static void frmr_unmap_dma(struct svcxprt_rdma *xprt,
 -                         struct svc_rdma_fastreg_mr *frmr)
 -{
 -      int page_no;
 -      for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
 -              dma_addr_t addr = frmr->page_list->page_list[page_no];
 -              if (ib_dma_mapping_error(frmr->mr->device, addr))
 -                      continue;
 -              atomic_dec(&xprt->sc_dma_used);
 -              ib_dma_unmap_page(frmr->mr->device, addr, PAGE_SIZE,
 -                                frmr->direction);
 -      }
 -}
 -
  void svc_rdma_put_frmr(struct svcxprt_rdma *rdma,
                       struct svc_rdma_fastreg_mr *frmr)
  {
        if (frmr) {
 -              frmr_unmap_dma(rdma, frmr);
 +              ib_dma_unmap_sg(rdma->sc_cm_id->device,
 +                              frmr->sg, frmr->sg_nents, frmr->direction);
 +              atomic_dec(&rdma->sc_dma_used);
                spin_lock_bh(&rdma->sc_frmr_q_lock);
                WARN_ON_ONCE(!list_empty(&frmr->frmr_list));
                list_add(&frmr->frmr_list, &rdma->sc_frmr_q);
index f63369bd01c54b9d7df2a15511c1fa066289a91f,93883ffb86e0d0e6244f33d1558d11ac0f1d134b..eadd1655145a3bc5b81bdefb7015792fb3be566a
   * internal functions
   */
  
- /*
-  * handle replies in tasklet context, using a single, global list
-  * rdma tasklet function -- just turn around and call the func
-  * for all replies on the list
-  */
- static DEFINE_SPINLOCK(rpcrdma_tk_lock_g);
- static LIST_HEAD(rpcrdma_tasklets_g);
+ static struct workqueue_struct *rpcrdma_receive_wq;
  
- static void
- rpcrdma_run_tasklet(unsigned long data)
+ int
+ rpcrdma_alloc_wq(void)
  {
-       struct rpcrdma_rep *rep;
-       unsigned long flags;
-       data = data;
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       while (!list_empty(&rpcrdma_tasklets_g)) {
-               rep = list_entry(rpcrdma_tasklets_g.next,
-                                struct rpcrdma_rep, rr_list);
-               list_del(&rep->rr_list);
-               spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+       struct workqueue_struct *recv_wq;
  
-               rpcrdma_reply_handler(rep);
+       recv_wq = alloc_workqueue("xprtrdma_receive",
+                                 WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+                                 0);
+       if (!recv_wq)
+               return -ENOMEM;
  
-               spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       }
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
+       rpcrdma_receive_wq = recv_wq;
+       return 0;
  }
  
- static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
- static void
- rpcrdma_schedule_tasklet(struct list_head *sched_list)
+ void
+ rpcrdma_destroy_wq(void)
  {
-       unsigned long flags;
+       struct workqueue_struct *wq;
  
-       spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
-       list_splice_tail(sched_list, &rpcrdma_tasklets_g);
-       spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
-       tasklet_schedule(&rpcrdma_tasklet_g);
+       if (rpcrdma_receive_wq) {
+               wq = rpcrdma_receive_wq;
+               rpcrdma_receive_wq = NULL;
+               destroy_workqueue(wq);
+       }
  }
  
  static void
@@@ -158,63 -144,54 +144,54 @@@ rpcrdma_sendcq_process_wc(struct ib_wc 
        }
  }
  
- static int
- rpcrdma_sendcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+ /* The common case is a single send completion is waiting. By
+  * passing two WC entries to ib_poll_cq, a return code of 1
+  * means there is exactly one WC waiting and no more. We don't
+  * have to invoke ib_poll_cq again to know that the CQ has been
+  * properly drained.
+  */
+ static void
+ rpcrdma_sendcq_poll(struct ib_cq *cq)
  {
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[2];
+       int count, rc;
  
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_send_wcs;
+               pos = wcs;
  
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       return rc;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
  
                count = rc;
                while (count-- > 0)
-                       rpcrdma_sendcq_process_wc(wcs++);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       return 0;
+                       rpcrdma_sendcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
+       return;
  }
  
- /*
-  * Handle send, fast_reg_mr, and local_inv completions.
-  *
-  * Send events are typically suppressed and thus do not result
-  * in an upcall. Occasionally one is signaled, however. This
-  * prevents the provider's completion queue from wrapping and
-  * losing a completion.
+ /* Handle provider send completion upcalls.
   */
  static void
  rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
  {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-       rc = rpcrdma_sendcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+       do {
+               rpcrdma_sendcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
+ }
  
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
+ static void
+ rpcrdma_receive_worker(struct work_struct *work)
+ {
+       struct rpcrdma_rep *rep =
+                       container_of(work, struct rpcrdma_rep, rr_work);
  
-       rpcrdma_sendcq_poll(cq, ep);
+       rpcrdma_reply_handler(rep);
  }
  
  static void
- rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+ rpcrdma_recvcq_process_wc(struct ib_wc *wc)
  {
        struct rpcrdma_rep *rep =
                        (struct rpcrdma_rep *)(unsigned long)wc->wr_id;
        prefetch(rdmab_to_msg(rep->rr_rdmabuf));
  
  out_schedule:
-       list_add_tail(&rep->rr_list, sched_list);
+       queue_work(rpcrdma_receive_wq, &rep->rr_work);
        return;
  out_fail:
        if (wc->status != IB_WC_WR_FLUSH_ERR)
                pr_err("RPC:       %s: rep %p: %s\n",
                       __func__, rep, ib_wc_status_msg(wc->status));
-       rep->rr_len = ~0U;
+       rep->rr_len = RPCRDMA_BAD_LEN;
        goto out_schedule;
  }
  
- static int
- rpcrdma_recvcq_poll(struct ib_cq *cq, struct rpcrdma_ep *ep)
+ /* The wc array is on stack: automatic memory is always CPU-local.
+  *
+  * struct ib_wc is 64 bytes, making the poll array potentially
+  * large. But this is at the bottom of the call chain. Further
+  * substantial work is done in another thread.
+  */
+ static void
+ rpcrdma_recvcq_poll(struct ib_cq *cq)
  {
-       struct list_head sched_list;
-       struct ib_wc *wcs;
-       int budget, count, rc;
+       struct ib_wc *pos, wcs[4];
+       int count, rc;
  
-       INIT_LIST_HEAD(&sched_list);
-       budget = RPCRDMA_WC_BUDGET / RPCRDMA_POLLSIZE;
        do {
-               wcs = ep->rep_recv_wcs;
+               pos = wcs;
  
-               rc = ib_poll_cq(cq, RPCRDMA_POLLSIZE, wcs);
-               if (rc <= 0)
-                       goto out_schedule;
+               rc = ib_poll_cq(cq, ARRAY_SIZE(wcs), pos);
+               if (rc < 0)
+                       break;
  
                count = rc;
                while (count-- > 0)
-                       rpcrdma_recvcq_process_wc(wcs++, &sched_list);
-       } while (rc == RPCRDMA_POLLSIZE && --budget);
-       rc = 0;
- out_schedule:
-       rpcrdma_schedule_tasklet(&sched_list);
-       return rc;
+                       rpcrdma_recvcq_process_wc(pos++);
+       } while (rc == ARRAY_SIZE(wcs));
  }
  
- /*
-  * Handle receive completions.
-  *
-  * It is reentrant but processes single events in order to maintain
-  * ordering of receives to keep server credits.
-  *
-  * It is the responsibility of the scheduled tasklet to return
-  * recv buffers to the pool. NOTE: this affects synchronization of
-  * connection shutdown. That is, the structures required for
-  * the completion of the reply handler must remain intact until
-  * all memory has been reclaimed.
+ /* Handle provider receive completion upcalls.
   */
  static void
  rpcrdma_recvcq_upcall(struct ib_cq *cq, void *cq_context)
  {
-       struct rpcrdma_ep *ep = (struct rpcrdma_ep *)cq_context;
-       int rc;
-       rc = rpcrdma_recvcq_poll(cq, ep);
-       if (rc) {
-               dprintk("RPC:       %s: ib_poll_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-       rc = ib_req_notify_cq(cq,
-                       IB_CQ_NEXT_COMP | IB_CQ_REPORT_MISSED_EVENTS);
-       if (rc == 0)
-               return;
-       if (rc < 0) {
-               dprintk("RPC:       %s: ib_req_notify_cq failed: %i\n",
-                       __func__, rc);
-               return;
-       }
-       rpcrdma_recvcq_poll(cq, ep);
+       do {
+               rpcrdma_recvcq_poll(cq);
+       } while (ib_req_notify_cq(cq, IB_CQ_NEXT_COMP |
+                                 IB_CQ_REPORT_MISSED_EVENTS) > 0);
  }
  
  static void
  rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
  {
        struct ib_wc wc;
-       LIST_HEAD(sched_list);
  
        while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-               rpcrdma_recvcq_process_wc(&wc, &sched_list);
-       if (!list_empty(&sched_list))
-               rpcrdma_schedule_tasklet(&sched_list);
+               rpcrdma_recvcq_process_wc(&wc);
        while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
                rpcrdma_sendcq_process_wc(&wc);
  }
@@@ -432,8 -378,7 +378,8 @@@ rpcrdma_create_id(struct rpcrdma_xprt *
  
        init_completion(&ia->ri_done);
  
 -      id = rdma_create_id(rpcrdma_conn_upcall, xprt, RDMA_PS_TCP, IB_QPT_RC);
 +      id = rdma_create_id(&init_net, rpcrdma_conn_upcall, xprt, RDMA_PS_TCP,
 +                          IB_QPT_RC);
        if (IS_ERR(id)) {
                rc = PTR_ERR(id);
                dprintk("RPC:       %s: rdma_create_id() failed %i\n",
@@@ -623,6 -568,7 +569,7 @@@ rpcrdma_ep_create(struct rpcrdma_ep *ep
        struct ib_device_attr *devattr = &ia->ri_devattr;
        struct ib_cq *sendcq, *recvcq;
        struct ib_cq_init_attr cq_attr = {};
+       unsigned int max_qp_wr;
        int rc, err;
  
        if (devattr->max_sge < RPCRDMA_MAX_IOVS) {
                return -ENOMEM;
        }
  
+       if (devattr->max_qp_wr <= RPCRDMA_BACKWARD_WRS) {
+               dprintk("RPC:       %s: insufficient wqe's available\n",
+                       __func__);
+               return -ENOMEM;
+       }
+       max_qp_wr = devattr->max_qp_wr - RPCRDMA_BACKWARD_WRS;
        /* check provider's send/recv wr limits */
-       if (cdata->max_requests > devattr->max_qp_wr)
-               cdata->max_requests = devattr->max_qp_wr;
+       if (cdata->max_requests > max_qp_wr)
+               cdata->max_requests = max_qp_wr;
  
        ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
        ep->rep_attr.qp_context = ep;
        ep->rep_attr.srq = NULL;
        ep->rep_attr.cap.max_send_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_send_wr += RPCRDMA_BACKWARD_WRS;
        rc = ia->ri_ops->ro_open(ia, ep, cdata);
        if (rc)
                return rc;
        ep->rep_attr.cap.max_recv_wr = cdata->max_requests;
+       ep->rep_attr.cap.max_recv_wr += RPCRDMA_BACKWARD_WRS;
        ep->rep_attr.cap.max_send_sge = RPCRDMA_MAX_IOVS;
        ep->rep_attr.cap.max_recv_sge = 1;
        ep->rep_attr.cap.max_inline_data = 0;
  
        cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
        sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
-                             rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(sendcq)) {
                rc = PTR_ERR(sendcq);
                dprintk("RPC:       %s: failed to create send CQ: %i\n",
  
        cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
        recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
-                             rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+                             rpcrdma_cq_async_error_upcall, NULL, &cq_attr);
        if (IS_ERR(recvcq)) {
                rc = PTR_ERR(recvcq);
                dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@@ -886,7 -841,21 +842,21 @@@ retry
                }
                rc = ep->rep_connected;
        } else {
+               struct rpcrdma_xprt *r_xprt;
+               unsigned int extras;
                dprintk("RPC:       %s: connected\n", __func__);
+               r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+               extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+               if (extras) {
+                       rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+                       if (rc)
+                               pr_warn("%s: rpcrdma_ep_post_extra_recv: %i\n",
+                                       __func__, rc);
+                               rc = 0;
+               }
        }
  
  out:
@@@ -923,20 -892,25 +893,25 @@@ rpcrdma_ep_disconnect(struct rpcrdma_e
        }
  }
  
- static struct rpcrdma_req *
+ struct rpcrdma_req *
  rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
  {
+       struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
  
        req = kzalloc(sizeof(*req), GFP_KERNEL);
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
  
+       INIT_LIST_HEAD(&req->rl_free);
+       spin_lock(&buffer->rb_reqslock);
+       list_add(&req->rl_all, &buffer->rb_allreqs);
+       spin_unlock(&buffer->rb_reqslock);
        req->rl_buffer = &r_xprt->rx_buf;
        return req;
  }
  
- static struct rpcrdma_rep *
+ struct rpcrdma_rep *
  rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
  {
        struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
  
        rep->rr_device = ia->ri_device;
        rep->rr_rxprt = r_xprt;
+       INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
        return rep;
  
  out_free:
@@@ -971,44 -946,21 +947,21 @@@ rpcrdma_buffer_create(struct rpcrdma_xp
  {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-       struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
-       char *p;
-       size_t len;
        int i, rc;
  
-       buf->rb_max_requests = cdata->max_requests;
+       buf->rb_max_requests = r_xprt->rx_data.max_requests;
+       buf->rb_bc_srv_max_requests = 0;
        spin_lock_init(&buf->rb_lock);
  
-       /* Need to allocate:
-        *   1.  arrays for send and recv pointers
-        *   2.  arrays of struct rpcrdma_req to fill in pointers
-        *   3.  array of struct rpcrdma_rep for replies
-        * Send/recv buffers in req/rep need to be registered
-        */
-       len = buf->rb_max_requests *
-               (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-       p = kzalloc(len, GFP_KERNEL);
-       if (p == NULL) {
-               dprintk("RPC:       %s: req_t/rep_t/pad kzalloc(%zd) failed\n",
-                       __func__, len);
-               rc = -ENOMEM;
-               goto out;
-       }
-       buf->rb_pool = p;       /* for freeing it later */
-       buf->rb_send_bufs = (struct rpcrdma_req **) p;
-       p = (char *) &buf->rb_send_bufs[buf->rb_max_requests];
-       buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
-       p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
        rc = ia->ri_ops->ro_init(r_xprt);
        if (rc)
                goto out;
  
+       INIT_LIST_HEAD(&buf->rb_send_bufs);
+       INIT_LIST_HEAD(&buf->rb_allreqs);
+       spin_lock_init(&buf->rb_reqslock);
        for (i = 0; i < buf->rb_max_requests; i++) {
                struct rpcrdma_req *req;
-               struct rpcrdma_rep *rep;
  
                req = rpcrdma_create_req(r_xprt);
                if (IS_ERR(req)) {
                        rc = PTR_ERR(req);
                        goto out;
                }
-               buf->rb_send_bufs[i] = req;
+               req->rl_backchannel = false;
+               list_add(&req->rl_free, &buf->rb_send_bufs);
+       }
+       INIT_LIST_HEAD(&buf->rb_recv_bufs);
+       for (i = 0; i < buf->rb_max_requests + 2; i++) {
+               struct rpcrdma_rep *rep;
  
                rep = rpcrdma_create_rep(r_xprt);
                if (IS_ERR(rep)) {
                        rc = PTR_ERR(rep);
                        goto out;
                }
-               buf->rb_recv_bufs[i] = rep;
+               list_add(&rep->rr_list, &buf->rb_recv_bufs);
        }
  
        return 0;
@@@ -1035,22 -993,38 +994,38 @@@ out
        return rc;
  }
  
+ static struct rpcrdma_req *
+ rpcrdma_buffer_get_req_locked(struct rpcrdma_buffer *buf)
+ {
+       struct rpcrdma_req *req;
+       req = list_first_entry(&buf->rb_send_bufs,
+                              struct rpcrdma_req, rl_free);
+       list_del(&req->rl_free);
+       return req;
+ }
+ static struct rpcrdma_rep *
+ rpcrdma_buffer_get_rep_locked(struct rpcrdma_buffer *buf)
+ {
+       struct rpcrdma_rep *rep;
+       rep = list_first_entry(&buf->rb_recv_bufs,
+                              struct rpcrdma_rep, rr_list);
+       list_del(&rep->rr_list);
+       return rep;
+ }
  static void
  rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
  {
-       if (!rep)
-               return;
        rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
        kfree(rep);
  }
  
static void
+ void
  rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
  {
-       if (!req)
-               return;
        rpcrdma_free_regbuf(ia, req->rl_sendbuf);
        rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
        kfree(req);
  rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
  {
        struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-       int i;
  
-       /* clean up in reverse order from create
-        *   1.  recv mr memory (mr free, then kfree)
-        *   2.  send mr memory (mr free, then kfree)
-        *   3.  MWs
-        */
-       dprintk("RPC:       %s: entering\n", __func__);
+       while (!list_empty(&buf->rb_recv_bufs)) {
+               struct rpcrdma_rep *rep;
  
-       for (i = 0; i < buf->rb_max_requests; i++) {
-               if (buf->rb_recv_bufs)
-                       rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
-               if (buf->rb_send_bufs)
-                       rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
+               rep = rpcrdma_buffer_get_rep_locked(buf);
+               rpcrdma_destroy_rep(ia, rep);
        }
  
-       ia->ri_ops->ro_destroy(buf);
+       spin_lock(&buf->rb_reqslock);
+       while (!list_empty(&buf->rb_allreqs)) {
+               struct rpcrdma_req *req;
+               req = list_first_entry(&buf->rb_allreqs,
+                                      struct rpcrdma_req, rl_all);
+               list_del(&req->rl_all);
+               spin_unlock(&buf->rb_reqslock);
+               rpcrdma_destroy_req(ia, req);
+               spin_lock(&buf->rb_reqslock);
+       }
+       spin_unlock(&buf->rb_reqslock);
  
-       kfree(buf->rb_pool);
+       ia->ri_ops->ro_destroy(buf);
  }
  
  struct rpcrdma_mw *
@@@ -1110,53 -1088,34 +1089,34 @@@ rpcrdma_put_mw(struct rpcrdma_xprt *r_x
        spin_unlock(&buf->rb_mwlock);
  }
  
- static void
- rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
- {
-       buf->rb_send_bufs[--buf->rb_send_index] = req;
-       req->rl_niovs = 0;
-       if (req->rl_reply) {
-               buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-               req->rl_reply = NULL;
-       }
- }
  /*
   * Get a set of request/reply buffers.
   *
-  * Reply buffer (if needed) is attached to send buffer upon return.
-  * Rule:
-  *    rb_send_index and rb_recv_index MUST always be pointing to the
-  *    *next* available buffer (non-NULL). They are incremented after
-  *    removing buffers, and decremented *before* returning them.
+  * Reply buffer (if available) is attached to send buffer upon return.
   */
  struct rpcrdma_req *
  rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
  {
        struct rpcrdma_req *req;
-       unsigned long flags;
-       spin_lock_irqsave(&buffers->rb_lock, flags);
  
-       if (buffers->rb_send_index == buffers->rb_max_requests) {
-               spin_unlock_irqrestore(&buffers->rb_lock, flags);
-               dprintk("RPC:       %s: out of request buffers\n", __func__);
-               return ((struct rpcrdma_req *)NULL);
-       }
-       req = buffers->rb_send_bufs[buffers->rb_send_index];
-       if (buffers->rb_send_index < buffers->rb_recv_index) {
-               dprintk("RPC:       %s: %d extra receives outstanding (ok)\n",
-                       __func__,
-                       buffers->rb_recv_index - buffers->rb_send_index);
-               req->rl_reply = NULL;
-       } else {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
+       spin_lock(&buffers->rb_lock);
+       if (list_empty(&buffers->rb_send_bufs))
+               goto out_reqbuf;
+       req = rpcrdma_buffer_get_req_locked(buffers);
+       if (list_empty(&buffers->rb_recv_bufs))
+               goto out_repbuf;
+       req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
+       return req;
  
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+ out_reqbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of request buffers\n", __func__);
+       return NULL;
+ out_repbuf:
+       spin_unlock(&buffers->rb_lock);
+       pr_warn("RPC:       %s: out of reply buffers\n", __func__);
+       req->rl_reply = NULL;
        return req;
  }
  
  rpcrdma_buffer_put(struct rpcrdma_req *req)
  {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       unsigned long flags;
+       struct rpcrdma_rep *rep = req->rl_reply;
  
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       rpcrdma_buffer_put_sendbuf(req, buffers);
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       req->rl_niovs = 0;
+       req->rl_reply = NULL;
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&req->rl_free, &buffers->rb_send_bufs);
+       if (rep)
+               list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
  }
  
  /*
   * Recover reply buffers from pool.
-  * This happens when recovering from error conditions.
-  * Post-increment counter/array index.
+  * This happens when recovering from disconnect.
   */
  void
  rpcrdma_recv_buffer_get(struct rpcrdma_req *req)
  {
        struct rpcrdma_buffer *buffers = req->rl_buffer;
-       unsigned long flags;
  
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       if (buffers->rb_recv_index < buffers->rb_max_requests) {
-               req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
-               buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL;
-       }
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       if (!list_empty(&buffers->rb_recv_bufs))
+               req->rl_reply = rpcrdma_buffer_get_rep_locked(buffers);
+       spin_unlock(&buffers->rb_lock);
  }
  
  /*
  rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
  {
        struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
-       unsigned long flags;
  
-       spin_lock_irqsave(&buffers->rb_lock, flags);
-       buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
-       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       spin_lock(&buffers->rb_lock);
+       list_add_tail(&rep->rr_list, &buffers->rb_recv_bufs);
+       spin_unlock(&buffers->rb_lock);
  }
  
  /*
@@@ -1363,6 -1322,47 +1323,47 @@@ rpcrdma_ep_post_recv(struct rpcrdma_ia 
        return rc;
  }
  
+ /**
+  * rpcrdma_ep_post_extra_recv - Post buffers for incoming backchannel requests
+  * @r_xprt: transport associated with these backchannel resources
+  * @min_reqs: minimum number of incoming requests expected
+  *
+  * Returns zero if all requested buffers were posted, or a negative errno.
+  */
+ int
+ rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+ {
+       struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+       struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+       struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+       struct rpcrdma_rep *rep;
+       unsigned long flags;
+       int rc;
+       while (count--) {
+               spin_lock_irqsave(&buffers->rb_lock, flags);
+               if (list_empty(&buffers->rb_recv_bufs))
+                       goto out_reqbuf;
+               rep = rpcrdma_buffer_get_rep_locked(buffers);
+               spin_unlock_irqrestore(&buffers->rb_lock, flags);
+               rc = rpcrdma_ep_post_recv(ia, ep, rep);
+               if (rc)
+                       goto out_rc;
+       }
+       return 0;
+ out_reqbuf:
+       spin_unlock_irqrestore(&buffers->rb_lock, flags);
+       pr_warn("%s: no extra receive buffers\n", __func__);
+       return -ENOMEM;
+ out_rc:
+       rpcrdma_recv_buffer_put(rep);
+       return rc;
+ }
  /* How many chunk list items fit within our inline buffers?
   */
  unsigned int
index c82abf44e39db9954cd7e27fab2595b9552c9668,f8dd17be9f43cefe89470cd2c9c9954f76a93ca0..ac7f8d4f632a9e923fdcf3fdbd8f628ad044d34b
@@@ -77,9 -77,6 +77,6 @@@ struct rpcrdma_ia 
   * RDMA Endpoint -- one per transport instance
   */
  
- #define RPCRDMA_WC_BUDGET     (128)
- #define RPCRDMA_POLLSIZE      (16)
  struct rpcrdma_ep {
        atomic_t                rep_cqcount;
        int                     rep_cqinit;
@@@ -89,8 -86,6 +86,6 @@@
        struct rdma_conn_param  rep_remote_cma;
        struct sockaddr_storage rep_remote_addr;
        struct delayed_work     rep_connect_worker;
-       struct ib_wc            rep_send_wcs[RPCRDMA_POLLSIZE];
-       struct ib_wc            rep_recv_wcs[RPCRDMA_POLLSIZE];
  };
  
  /*
   */
  #define RPCRDMA_IGNORE_COMPLETION     (0ULL)
  
+ /* Pre-allocate extra Work Requests for handling backward receives
+  * and sends. This is a fixed value because the Work Queues are
+  * allocated when the forward channel is set up.
+  */
+ #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ #define RPCRDMA_BACKWARD_WRS          (8)
+ #else
+ #define RPCRDMA_BACKWARD_WRS          (0)
+ #endif
  /* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
   *
   * The below structure appears at the front of a large region of kmalloc'd
@@@ -169,10 -174,13 +174,13 @@@ struct rpcrdma_rep 
        unsigned int            rr_len;
        struct ib_device        *rr_device;
        struct rpcrdma_xprt     *rr_rxprt;
+       struct work_struct      rr_work;
        struct list_head        rr_list;
        struct rpcrdma_regbuf   *rr_rdmabuf;
  };
  
+ #define RPCRDMA_BAD_LEN               (~0U)
  /*
   * struct rpcrdma_mw - external memory region metadata
   *
@@@ -193,8 -201,7 +201,8 @@@ enum rpcrdma_frmr_state 
  };
  
  struct rpcrdma_frmr {
 -      struct ib_fast_reg_page_list    *fr_pgl;
 +      struct scatterlist              *sg;
 +      int                             sg_nents;
        struct ib_mr                    *fr_mr;
        enum rpcrdma_frmr_state         fr_state;
        struct work_struct              fr_work;
@@@ -256,6 -263,7 +264,7 @@@ struct rpcrdma_mr_seg {            /* chunk descr
  #define RPCRDMA_MAX_IOVS      (2)
  
  struct rpcrdma_req {
+       struct list_head        rl_free;
        unsigned int            rl_niovs;
        unsigned int            rl_nchunks;
        unsigned int            rl_connect_cookie;
        struct rpcrdma_regbuf   *rl_rdmabuf;
        struct rpcrdma_regbuf   *rl_sendbuf;
        struct rpcrdma_mr_seg   rl_segments[RPCRDMA_MAX_SEGS];
+       struct list_head        rl_all;
+       bool                    rl_backchannel;
  };
  
  static inline struct rpcrdma_req *
@@@ -289,12 -300,14 +301,14 @@@ struct rpcrdma_buffer 
        struct list_head        rb_all;
        char                    *rb_pool;
  
-       spinlock_t              rb_lock;        /* protect buf arrays */
+       spinlock_t              rb_lock;        /* protect buf lists */
+       struct list_head        rb_send_bufs;
+       struct list_head        rb_recv_bufs;
        u32                     rb_max_requests;
-       int                     rb_send_index;
-       int                     rb_recv_index;
-       struct rpcrdma_req      **rb_send_bufs;
-       struct rpcrdma_rep      **rb_recv_bufs;
+       u32                     rb_bc_srv_max_requests;
+       spinlock_t              rb_reqslock;    /* protect rb_allreqs */
+       struct list_head        rb_allreqs;
  };
  #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
  
@@@ -340,6 -353,7 +354,7 @@@ struct rpcrdma_stats 
        unsigned long           failed_marshal_count;
        unsigned long           bad_reply_count;
        unsigned long           nomsg_call_count;
+       unsigned long           bcall_count;
  };
  
  /*
@@@ -415,6 -429,9 +430,9 @@@ int rpcrdma_ep_post_recv(struct rpcrdma
  /*
   * Buffer calls - xprtrdma/verbs.c
   */
+ struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+ struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+ void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
  int rpcrdma_buffer_create(struct rpcrdma_xprt *);
  void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
  
@@@ -431,10 -448,14 +449,14 @@@ void rpcrdma_free_regbuf(struct rpcrdma
                         struct rpcrdma_regbuf *);
  
  unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+ int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
  
  int frwr_alloc_recovery_wq(void);
  void frwr_destroy_recovery_wq(void);
  
+ int rpcrdma_alloc_wq(void);
+ void rpcrdma_destroy_wq(void);
  /*
   * Wrappers for chunk registration, shared by read/write chunk code.
   */
@@@ -495,6 -516,18 +517,18 @@@ int rpcrdma_marshal_req(struct rpc_rqs
  int xprt_rdma_init(void);
  void xprt_rdma_cleanup(void);
  
+ /* Backchannel calls - xprtrdma/backchannel.c
+  */
+ #if defined(CONFIG_SUNRPC_BACKCHANNEL)
+ int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+ int xprt_rdma_bc_up(struct svc_serv *, struct net *);
+ int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+ void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
+ int rpcrdma_bc_marshal_reply(struct rpc_rqst *);
+ void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+ void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+ #endif        /* CONFIG_SUNRPC_BACKCHANNEL */
  /* Temporary NFS request map cache. Created in svc_rdma.c  */
  extern struct kmem_cache *svc_rdma_map_cachep;
  /* WR context cache. Created in svc_rdma.c  */