Merge tag 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/dledford/rdma
[firefly-linux-kernel-4.4.55.git] net/rds/ib_cm.c
index f5a98068faf07ddd6df72647dd9bd34876adc6d2..da5a7fb98c77abf0c43f0c4825657874eda89ba3 100644 (file)
@@ -216,6 +216,96 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
                 event->event, ib_event_msg(event->event), data);
 }
 
+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring.  Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_recv_tasklet);
+}
+
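+/* Drain a CQ in batches of up to RDS_IB_WC_MAX work completions per
+ * ib_poll_cq() call, looping until the CQ is empty.  Completions are
+ * dispatched on the RDS_IB_SEND_OP flag in wr_id: sends go to
+ * rds_ib_send_cqe_handler(), receives to rds_ib_recv_cqe_handler(), which
+ * also accumulates ack state for the caller to apply once polling is done.
+ *
+ * This assumes the send path tags its work request ids when posting, along
+ * the lines of (illustrative sketch only, the real posting code lives in
+ * ib_send.c):
+ *
+ *     send->s_wr.wr_id = pos | RDS_IB_SEND_OP;
+ *
+ * and that receive wr_ids never set that bit, so the flag alone tells the
+ * two completion types apart.
+ */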
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+                   struct ib_wc *wcs,
+                   struct rds_ib_ack_state *ack_state)
+{
+       int nr;
+       int i;
+       struct ib_wc *wc;
+
+       while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+               for (i = 0; i < nr; i++) {
+                       wc = wcs + i;
+                       rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+                                (unsigned long long)wc->wr_id, wc->status,
+                                wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+
+                       if (wc->wr_id & RDS_IB_SEND_OP)
+                               rds_ib_send_cqe_handler(ic, wc);
+                       else
+                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
+               }
+       }
+}
+
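+/* The send-side tasklet drains the send CQ, re-arms it, then drains it once
+ * more: a completion that lands between the first drain and
+ * ib_req_notify_cq() would otherwise raise no event and could linger until
+ * some later completion re-triggered the handler.  With ring space freed up,
+ * rds_send_xmit() is kicked if the connection is up and either the send ring
+ * was not marked full or a congestion-map update is queued.
+ */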
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state state;
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       memset(&state, 0, sizeof(state));
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+       ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+
+       if (rds_conn_up(conn) &&
+           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued)))
+               rds_send_xmit(ic->conn);
+}
+
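+/* The receive-side tasklet uses the same drain / re-arm / drain pattern on
+ * the recv CQ, but batches acks: rds_ib_recv_cqe_handler() records the next
+ * sequence to ack and the latest ack received from the peer in ack_state
+ * while polling, and both are applied only once per tasklet run (freeing
+ * acked sends via rds_send_drop_acked()) instead of once per completion.
+ */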
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+       struct rds_ib_ack_state state;
+
+       if (!rds_ibdev)
+               rds_conn_drop(conn);
+
+       rds_ib_stats_inc(s_ib_tasklet_call);
+
+       memset(&state, 0, sizeof(state));
+       poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+       ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+       poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+       if (state.ack_next_valid)
+               rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+       if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+               rds_send_drop_acked(conn, state.ack_recv, NULL);
+               ic->i_ack_recv = state.ack_recv;
+       }
+
+       if (rds_conn_up(conn))
+               rds_ib_attempt_ack(ic);
+}
+
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
        struct rds_connection *conn = data;
@@ -238,6 +328,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
        }
 }
 
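+/* Like the receive side, the send completion handler only schedules the send
+ * tasklet; the actual completion processing happens in
+ * rds_ib_tasklet_fn_send() above.
+ */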
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -271,7 +373,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        ic->i_pd = rds_ibdev->pd;
 
        cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-       ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+       ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_send_cq)) {
@@ -282,7 +385,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        }
 
        cq_attr.cqe = ic->i_recv_ring.w_nr;
-       ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+       ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_recv_cq)) {
@@ -637,6 +740,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_send_tasklet);
                tasklet_kill(&ic->i_recv_tasklet);
 
                /* first destroy the ib state that generates callbacks */
@@ -743,8 +847,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        }
 
        INIT_LIST_HEAD(&ic->ib_node);
-       tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
-                    (unsigned long) ic);
+       tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+                    (unsigned long)ic);
+       tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
+                    (unsigned long)ic);
        mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);