drbd: Move list of epochs from mdev to tconn
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_main.c
index ff75cce21f402ba4e86ad9625271b2c6e576306e..8b99f4e28ccc2cdc5ff03df07aad7853fda73eaa 100644 (file)
@@ -120,7 +120,6 @@ module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0
  */
 struct idr minors;
 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
-DEFINE_MUTEX(drbd_cfg_mutex);
 
 struct kmem_cache *drbd_request_cache;
 struct kmem_cache *drbd_ee_cache;      /* peer requests */
@@ -216,6 +215,7 @@ static int tl_init(struct drbd_tconn *tconn)
        tconn->oldest_tle = b;
        tconn->newest_tle = b;
        INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
+       INIT_LIST_HEAD(&tconn->barrier_acked_requests);
 
        return 1;
 }
@@ -316,11 +316,11 @@ void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
           These have been list_move'd to the out_of_sequence_requests list in
           _req_mod(, BARRIER_ACKED) above.
           */
-       list_del_init(&b->requests);
+       list_splice_init(&b->requests, &tconn->barrier_acked_requests);
        mdev = b->w.mdev;
 
        nob = b->next;
-       if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
+       if (test_and_clear_bit(CREATE_BARRIER, &tconn->flags)) {
                _tl_add_barrier(tconn, b);
                if (nob)
                        tconn->oldest_tle = nob;
@@ -368,8 +368,10 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
                        req = list_entry(le, struct drbd_request, tl_requests);
                        rv = _req_mod(req, what);
 
-                       n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
-                       n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
+                       if (rv & MR_WRITE)
+                               n_writes++;
+                       if (rv & MR_READ)
+                               n_reads++;
                }
                tmp = b->next;
 
@@ -379,7 +381,7 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
                                if (b->w.cb == NULL) {
                                        b->w.cb = w_send_barrier;
                                        inc_ap_pending(b->w.mdev);
-                                       set_bit(CREATE_BARRIER, &b->w.mdev->flags);
+                                       set_bit(CREATE_BARRIER, &tconn->flags);
                                }
 
                                drbd_queue_work(&tconn->data.work, &b->w);
@@ -418,8 +420,23 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
                b = tmp;
                list_splice(&carry_reads, &b->requests);
        }
-}
 
+       /* Actions operating on the disk state, also want to work on
+          requests that got barrier acked. */
+       switch (what) {
+       case FAIL_FROZEN_DISK_IO:
+       case RESTART_FROZEN_DISK_IO:
+               list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
+                       req = list_entry(le, struct drbd_request, tl_requests);
+                       _req_mod(req, what);
+               }
+       case CONNECTION_LOST_WHILE_PENDING:
+       case RESEND:
+               break;
+       default:
+               conn_err(tconn, "what = %d in _tl_restart()\n", what);
+       }
+}
 
 /**
  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
@@ -431,10 +448,8 @@ void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
  */
 void tl_clear(struct drbd_tconn *tconn)
 {
-       struct drbd_conf *mdev;
        struct list_head *le, *tle;
        struct drbd_request *r;
-       int vnr;
 
        spin_lock_irq(&tconn->req_lock);
 
@@ -453,8 +468,7 @@ void tl_clear(struct drbd_tconn *tconn)
        }
 
        /* ensure bit indicating barrier is required is clear */
-       idr_for_each_entry(&tconn->volumes, mdev, vnr)
-               clear_bit(CREATE_BARRIER, &mdev->flags);
+       clear_bit(CREATE_BARRIER, &tconn->flags);
 
        spin_unlock_irq(&tconn->req_lock);
 }
@@ -466,6 +480,41 @@ void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
        spin_unlock_irq(&tconn->req_lock);
 }
 
+/**
+ * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
+ * @mdev:      DRBD device.
+ */
+void tl_abort_disk_io(struct drbd_conf *mdev)
+{
+       struct drbd_tconn *tconn = mdev->tconn;
+       struct drbd_tl_epoch *b;
+       struct list_head *le, *tle;
+       struct drbd_request *req;
+
+       spin_lock_irq(&tconn->req_lock);
+       b = tconn->oldest_tle;
+       while (b) {
+               list_for_each_safe(le, tle, &b->requests) {
+                       req = list_entry(le, struct drbd_request, tl_requests);
+                       if (!(req->rq_state & RQ_LOCAL_PENDING))
+                               continue;
+                       if (req->w.mdev == mdev)
+                               _req_mod(req, ABORT_DISK_IO);
+               }
+               b = b->next;
+       }
+
+       list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
+               req = list_entry(le, struct drbd_request, tl_requests);
+               if (!(req->rq_state & RQ_LOCAL_PENDING))
+                       continue;
+               if (req->w.mdev == mdev)
+                       _req_mod(req, ABORT_DISK_IO);
+       }
+
+       spin_unlock_irq(&tconn->req_lock);
+}
+
 static int drbd_thread_setup(void *arg)
 {
        struct drbd_thread *thi = (struct drbd_thread *) arg;
@@ -501,12 +550,14 @@ restart:
        thi->task = NULL;
        thi->t_state = NONE;
        smp_mb();
-       complete(&thi->stop);
+       complete_all(&thi->stop);
        spin_unlock_irqrestore(&thi->t_lock, flags);
 
        conn_info(tconn, "Terminating %s\n", current->comm);
 
        /* Release mod reference taken when thread was started */
+
+       kref_put(&tconn->kref, &conn_destroy);
        module_put(THIS_MODULE);
        return retval;
 }
@@ -544,6 +595,8 @@ int drbd_thread_start(struct drbd_thread *thi)
                        return false;
                }
 
+               kref_get(&thi->tconn->kref);
+
                init_completion(&thi->stop);
                thi->reset_cpu_mask = 1;
                thi->t_state = RUNNING;
@@ -556,6 +609,7 @@ int drbd_thread_start(struct drbd_thread *thi)
                if (IS_ERR(nt)) {
                        conn_err(tconn, "Couldn't start thread\n");
 
+                       kref_put(&tconn->kref, &conn_destroy);
                        module_put(THIS_MODULE);
                        return false;
                }
@@ -634,13 +688,15 @@ char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *tas
 
 int conn_lowest_minor(struct drbd_tconn *tconn)
 {
-       int vnr = 0;
        struct drbd_conf *mdev;
+       int vnr = 0, m;
 
+       rcu_read_lock();
        mdev = idr_get_next(&tconn->volumes, &vnr);
-       if (!mdev)
-               return -1;
-       return mdev_to_minor(mdev);
+       m = mdev ? mdev_to_minor(mdev) : -1;
+       rcu_read_unlock();
+
+       return m;
 }
 
 #ifdef CONFIG_SMP
@@ -689,204 +745,305 @@ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
 }
 #endif
 
-static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
+/**
+ * drbd_header_size  -  size of a packet header
+ *
+ * The header size is a multiple of 8, so any payload following the header is
+ * word aligned on 64-bit architectures.  (The bitmap send and receive code
+ * relies on this.)
+ */
+unsigned int drbd_header_size(struct drbd_tconn *tconn)
+{
+       if (tconn->agreed_pro_version >= 100) {
+               BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
+               return sizeof(struct p_header100);
+       } else {
+               BUILD_BUG_ON(sizeof(struct p_header80) !=
+                            sizeof(struct p_header95));
+               BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
+               return sizeof(struct p_header80);
+       }
+}
+
+static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
 {
        h->magic   = cpu_to_be32(DRBD_MAGIC);
        h->command = cpu_to_be16(cmd);
        h->length  = cpu_to_be16(size);
+       return sizeof(struct p_header80);
 }
 
-static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
+static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
 {
        h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
        h->command = cpu_to_be16(cmd);
-       h->length  = cpu_to_be32(size);
+       h->length = cpu_to_be32(size);
+       return sizeof(struct p_header95);
+}
+
+static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
+                                     int size, int vnr)
+{
+       h->magic = cpu_to_be32(DRBD_MAGIC_100);
+       h->volume = cpu_to_be16(vnr);
+       h->command = cpu_to_be16(cmd);
+       h->length = cpu_to_be32(size);
+       h->pad = 0;
+       return sizeof(struct p_header100);
 }
 
-static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
-                           enum drbd_packet cmd, int size)
+static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
+                                  void *buffer, enum drbd_packet cmd, int size)
 {
-       if (tconn->agreed_pro_version >= 100 || size > DRBD_MAX_SIZE_H80_PACKET)
-               prepare_header95(&h->h95, cmd, size);
+       if (tconn->agreed_pro_version >= 100)
+               return prepare_header100(buffer, cmd, size, vnr);
+       else if (tconn->agreed_pro_version >= 95 &&
+                size > DRBD_MAX_SIZE_H80_PACKET)
+               return prepare_header95(buffer, cmd, size);
        else
-               prepare_header80(&h->h80, cmd, size);
+               return prepare_header80(buffer, cmd, size);
+}
+
+static void *__conn_prepare_command(struct drbd_tconn *tconn,
+                                   struct drbd_socket *sock)
+{
+       if (!sock->socket)
+               return NULL;
+       return sock->sbuf + drbd_header_size(tconn);
+}
+
+void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
+{
+       void *p;
+
+       mutex_lock(&sock->mutex);
+       p = __conn_prepare_command(tconn, sock);
+       if (!p)
+               mutex_unlock(&sock->mutex);
+
+       return p;
 }
 
-static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
-                          enum drbd_packet cmd, int size)
+void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
 {
-       _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
+       return conn_prepare_command(mdev->tconn, sock);
 }
 
-/* the appropriate socket mutex must be held already */
-int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
-                  enum drbd_packet cmd, struct p_header *h, size_t size,
-                  unsigned msg_flags)
+static int __send_command(struct drbd_tconn *tconn, int vnr,
+                         struct drbd_socket *sock, enum drbd_packet cmd,
+                         unsigned int header_size, void *data,
+                         unsigned int size)
 {
+       int msg_flags;
        int err;
 
-       _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
-       err = drbd_send_all(tconn, sock, h, size, msg_flags);
-       if (err && !signal_pending(current))
-               conn_warn(tconn, "short send %s size=%d\n",
-                         cmdname(cmd), (int)size);
+       /*
+        * Called with @data == NULL and the size of the data blocks in @size
+        * for commands that send data blocks.  For those commands, omit the
+        * MSG_MORE flag: this will increase the likelihood that data blocks
+        * which are page aligned on the sender will end up page aligned on the
+        * receiver.
+        */
+       msg_flags = data ? MSG_MORE : 0;
+
+       header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
+                                     header_size + size);
+       err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
+                           msg_flags);
+       if (data && !err)
+               err = drbd_send_all(tconn, sock->socket, data, size, 0);
        return err;
 }
 
-/* don't pass the socket. we may only look at it
- * when we hold the appropriate socket mutex.
- */
-int conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
-                 enum drbd_packet cmd, struct p_header *h, size_t size)
+static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
+                              enum drbd_packet cmd, unsigned int header_size,
+                              void *data, unsigned int size)
 {
-       int err = -EIO;
+       return __send_command(tconn, 0, sock, cmd, header_size, data, size);
+}
 
-       mutex_lock(&sock->mutex);
-       if (sock->socket)
-               err = _conn_send_cmd(tconn, vnr, sock->socket, cmd, h, size, 0);
+int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
+                     enum drbd_packet cmd, unsigned int header_size,
+                     void *data, unsigned int size)
+{
+       int err;
+
+       err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
        mutex_unlock(&sock->mutex);
        return err;
 }
 
-int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
-                  size_t size)
+int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
+                     enum drbd_packet cmd, unsigned int header_size,
+                     void *data, unsigned int size)
 {
-       struct p_header80 h;
        int err;
 
-       prepare_header80(&h, cmd, size);
-       err = drbd_get_data_sock(tconn);
-       if (!err) {
-               err = drbd_send_all(tconn, tconn->data.socket, &h, sizeof(h), 0);
-               if (!err)
-                       err = drbd_send_all(tconn, tconn->data.socket, data, size, 0);
-               drbd_put_data_sock(tconn);
-       }
+       err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
+                            data, size);
+       mutex_unlock(&sock->mutex);
        return err;
 }
 
+int drbd_send_ping(struct drbd_tconn *tconn)
+{
+       struct drbd_socket *sock;
+
+       sock = &tconn->meta;
+       if (!conn_prepare_command(tconn, sock))
+               return -EIO;
+       return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
+}
+
+int drbd_send_ping_ack(struct drbd_tconn *tconn)
+{
+       struct drbd_socket *sock;
+
+       sock = &tconn->meta;
+       if (!conn_prepare_command(tconn, sock))
+               return -EIO;
+       return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
+}
+
 int drbd_send_sync_param(struct drbd_conf *mdev)
 {
+       struct drbd_socket *sock;
        struct p_rs_param_95 *p;
-       struct socket *sock;
-       int size, err;
+       int size;
        const int apv = mdev->tconn->agreed_pro_version;
+       enum drbd_packet cmd;
+       struct net_conf *nc;
+       struct disk_conf *dc;
+
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+
+       rcu_read_lock();
+       nc = rcu_dereference(mdev->tconn->net_conf);
 
        size = apv <= 87 ? sizeof(struct p_rs_param)
                : apv == 88 ? sizeof(struct p_rs_param)
-                       + strlen(mdev->tconn->net_conf->verify_alg) + 1
+                       + strlen(nc->verify_alg) + 1
                : apv <= 94 ? sizeof(struct p_rs_param_89)
                : /* apv >= 95 */ sizeof(struct p_rs_param_95);
 
-       /* used from admin command context and receiver/worker context.
-        * to avoid kmalloc, grab the socket right here,
-        * then use the pre-allocated sbuf there */
-       mutex_lock(&mdev->tconn->data.mutex);
-       sock = mdev->tconn->data.socket;
-
-       if (likely(sock != NULL)) {
-               enum drbd_packet cmd =
-                       apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
-
-               p = &mdev->tconn->data.sbuf.rs_param_95;
-
-               /* initialize verify_alg and csums_alg */
-               memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
-
-               if (get_ldev(mdev)) {
-                       p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
-                       p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
-                       p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
-                       p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
-                       p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
-                       put_ldev(mdev);
-               } else {
-                       p->rate = cpu_to_be32(DRBD_RATE_DEF);
-                       p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
-                       p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
-                       p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
-                       p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
-               }
+       cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
 
-               if (apv >= 88)
-                       strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
-               if (apv >= 89)
-                       strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
+       /* initialize verify_alg and csums_alg */
+       memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
 
-               err = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
-       } else
-               err = -EIO;
+       if (get_ldev(mdev)) {
+               dc = rcu_dereference(mdev->ldev->disk_conf);
+               p->resync_rate = cpu_to_be32(dc->resync_rate);
+               p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
+               p->c_delay_target = cpu_to_be32(dc->c_delay_target);
+               p->c_fill_target = cpu_to_be32(dc->c_fill_target);
+               p->c_max_rate = cpu_to_be32(dc->c_max_rate);
+               put_ldev(mdev);
+       } else {
+               p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
+               p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
+               p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
+               p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
+               p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
+       }
 
-       mutex_unlock(&mdev->tconn->data.mutex);
+       if (apv >= 88)
+               strcpy(p->verify_alg, nc->verify_alg);
+       if (apv >= 89)
+               strcpy(p->csums_alg, nc->csums_alg);
+       rcu_read_unlock();
 
-       return err;
+       return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
 }
 
-int drbd_send_protocol(struct drbd_tconn *tconn)
+int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
 {
+       struct drbd_socket *sock;
        struct p_protocol *p;
-       int size, cf, err;
+       struct net_conf *nc;
+       int size, cf;
 
-       size = sizeof(struct p_protocol);
+       sock = &tconn->data;
+       p = __conn_prepare_command(tconn, sock);
+       if (!p)
+               return -EIO;
 
-       if (tconn->agreed_pro_version >= 87)
-               size += strlen(tconn->net_conf->integrity_alg) + 1;
+       rcu_read_lock();
+       nc = rcu_dereference(tconn->net_conf);
 
-       /* we must not recurse into our own queue,
-        * as that is blocked during handshake */
-       p = kmalloc(size, GFP_NOIO);
-       if (p == NULL)
-               return -ENOMEM;
+       if (nc->tentative && tconn->agreed_pro_version < 92) {
+               rcu_read_unlock();
+               mutex_unlock(&sock->mutex);
+               conn_err(tconn, "--dry-run is not supported by peer");
+               return -EOPNOTSUPP;
+       }
 
-       p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
-       p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
-       p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
-       p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
-       p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
+       size = sizeof(*p);
+       if (tconn->agreed_pro_version >= 87)
+               size += strlen(nc->integrity_alg) + 1;
 
+       p->protocol      = cpu_to_be32(nc->wire_protocol);
+       p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
+       p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
+       p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
+       p->two_primaries = cpu_to_be32(nc->two_primaries);
        cf = 0;
-       if (tconn->net_conf->want_lose)
-               cf |= CF_WANT_LOSE;
-       if (tconn->net_conf->dry_run) {
-               if (tconn->agreed_pro_version >= 92)
-                       cf |= CF_DRY_RUN;
-               else {
-                       conn_err(tconn, "--dry-run is not supported by peer");
-                       kfree(p);
-                       return -EOPNOTSUPP;
-               }
-       }
+       if (nc->discard_my_data)
+               cf |= CF_DISCARD_MY_DATA;
+       if (nc->tentative)
+               cf |= CF_DRY_RUN;
        p->conn_flags    = cpu_to_be32(cf);
 
        if (tconn->agreed_pro_version >= 87)
-               strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
+               strcpy(p->integrity_alg, nc->integrity_alg);
+       rcu_read_unlock();
+
+       return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
+}
+
+int drbd_send_protocol(struct drbd_tconn *tconn)
+{
+       int err;
+
+       mutex_lock(&tconn->data.mutex);
+       err = __drbd_send_protocol(tconn, P_PROTOCOL);
+       mutex_unlock(&tconn->data.mutex);
 
-       err = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
-       kfree(p);
        return err;
 }
 
 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
 {
-       struct p_uuids p;
+       struct drbd_socket *sock;
+       struct p_uuids *p;
        int i;
 
        if (!get_ldev_if_state(mdev, D_NEGOTIATING))
                return 0;
 
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p) {
+               put_ldev(mdev);
+               return -EIO;
+       }
        for (i = UI_CURRENT; i < UI_SIZE; i++)
-               p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
+               p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
 
        mdev->comm_bm_set = drbd_bm_total_weight(mdev);
-       p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
-       uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
+       p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
+       rcu_read_lock();
+       uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
+       rcu_read_unlock();
        uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
        uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
-       p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
+       p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
 
        put_ldev(mdev);
-
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_UUIDS, &p.head, sizeof(p));
+       return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
 }
 
 int drbd_send_uuids(struct drbd_conf *mdev)
@@ -919,30 +1076,42 @@ void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
 
 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
 {
-       struct p_rs_uuid p;
+       struct drbd_socket *sock;
+       struct p_rs_uuid *p;
        u64 uuid;
 
        D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
 
-       uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
+       uuid = mdev->ldev->md.uuid[UI_BITMAP];
+       if (uuid && uuid != UUID_JUST_CREATED)
+               uuid = uuid + UUID_NEW_BM_OFFSET;
+       else
+               get_random_bytes(&uuid, sizeof(u64));
        drbd_uuid_set(mdev, UI_BITMAP, uuid);
        drbd_print_uuids(mdev, "updated sync UUID");
        drbd_md_sync(mdev);
-       p.uuid = cpu_to_be64(uuid);
 
-       drbd_send_cmd(mdev, &mdev->tconn->data, P_SYNC_UUID, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (p) {
+               p->uuid = cpu_to_be64(uuid);
+               drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
+       }
 }
 
 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
 {
-       struct p_sizes p;
+       struct drbd_socket *sock;
+       struct p_sizes *p;
        sector_t d_size, u_size;
        int q_order_type, max_bio_size;
 
        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
                D_ASSERT(mdev->ldev->backing_bdev);
                d_size = drbd_get_max_capacity(mdev->ldev);
-               u_size = mdev->ldev->dc.disk_size;
+               rcu_read_lock();
+               u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
+               rcu_read_unlock();
                q_order_type = drbd_queue_order_type(mdev);
                max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
                max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
@@ -954,72 +1123,143 @@ int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags fl
                max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
        }
 
-       p.d_size = cpu_to_be64(d_size);
-       p.u_size = cpu_to_be64(u_size);
-       p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
-       p.max_bio_size = cpu_to_be32(max_bio_size);
-       p.queue_order_type = cpu_to_be16(q_order_type);
-       p.dds_flags = cpu_to_be16(flags);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+
+       if (mdev->tconn->agreed_pro_version <= 94)
+               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
+       else if (mdev->tconn->agreed_pro_version < 100)
+               max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_SIZES, &p.head, sizeof(p));
+       p->d_size = cpu_to_be64(d_size);
+       p->u_size = cpu_to_be64(u_size);
+       p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
+       p->max_bio_size = cpu_to_be32(max_bio_size);
+       p->queue_order_type = cpu_to_be16(q_order_type);
+       p->dds_flags = cpu_to_be16(flags);
+       return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
 }
 
 /**
- * drbd_send_state() - Sends the drbd state to the peer
+ * drbd_send_current_state() - Sends the drbd state to the peer
  * @mdev:      DRBD device.
  */
-int drbd_send_state(struct drbd_conf *mdev)
+int drbd_send_current_state(struct drbd_conf *mdev)
 {
-       struct socket *sock;
-       struct p_state p;
-       int err = -EIO;
+       struct drbd_socket *sock;
+       struct p_state *p;
 
-       mutex_lock(&mdev->tconn->data.mutex);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
+       return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
+}
 
-       p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
-       sock = mdev->tconn->data.socket;
+/**
+ * drbd_send_state() - After a state change, sends the new state to the peer
+ * @mdev:      DRBD device.
+ * @state:     the state to send, not necessarily the current state.
+ *
+ * Each state change queues an "after_state_ch" work, which will eventually
+ * send the resulting new state to the peer. If more state changes happen
+ * between queuing and processing of the after_state_ch work, we still
+ * want to send each intermediary state in the order it occurred.
+ */
+int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
+{
+       struct drbd_socket *sock;
+       struct p_state *p;
 
-       if (likely(sock != NULL))
-               err = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->state = cpu_to_be32(state.i); /* Within the send mutex */
+       return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
+}
 
-       mutex_unlock(&mdev->tconn->data.mutex);
+int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
+{
+       struct drbd_socket *sock;
+       struct p_req_state *p;
 
-       return err;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->mask = cpu_to_be32(mask.i);
+       p->val = cpu_to_be32(val.i);
+       return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
 }
 
-int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
-                        union drbd_state mask, union drbd_state val)
+int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
 {
-       struct p_req_state p;
+       enum drbd_packet cmd;
+       struct drbd_socket *sock;
+       struct p_req_state *p;
 
-       p.mask    = cpu_to_be32(mask.i);
-       p.val     = cpu_to_be32(val.i);
-
-       return conn_send_cmd(tconn, vnr, &tconn->data, cmd, &p.head, sizeof(p));
+       cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
+       sock = &tconn->data;
+       p = conn_prepare_command(tconn, sock);
+       if (!p)
+               return -EIO;
+       p->mask = cpu_to_be32(mask.i);
+       p->val = cpu_to_be32(val.i);
+       return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
 {
-       struct p_req_state_reply p;
+       struct drbd_socket *sock;
+       struct p_req_state_reply *p;
 
-       p.retcode    = cpu_to_be32(retcode);
-
-       drbd_send_cmd(mdev, &mdev->tconn->meta, P_STATE_CHG_REPLY, &p.head, sizeof(p));
+       sock = &mdev->tconn->meta;
+       p = drbd_prepare_command(mdev, sock);
+       if (p) {
+               p->retcode = cpu_to_be32(retcode);
+               drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
+       }
 }
 
-int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
+void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
 {
-       struct p_req_state_reply p;
+       struct drbd_socket *sock;
+       struct p_req_state_reply *p;
        enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
 
-       p.retcode    = cpu_to_be32(retcode);
+       sock = &tconn->meta;
+       p = conn_prepare_command(tconn, sock);
+       if (p) {
+               p->retcode = cpu_to_be32(retcode);
+               conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
+       }
+}
+
+static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
+{
+       BUG_ON(code & ~0xf);
+       p->encoding = (p->encoding & ~0xf) | code;
+}
+
+static void dcbp_set_start(struct p_compressed_bm *p, int set)
+{
+       p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
+}
 
-       return !conn_send_cmd(tconn, 0, &tconn->meta, cmd, &p.head, sizeof(p));
+static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
+{
+       BUG_ON(n & ~0x7);
+       p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
 }
 
 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
-       struct p_compressed_bm *p,
-       struct bm_xfer_ctx *c)
+                        struct p_compressed_bm *p,
+                        unsigned int size,
+                        struct bm_xfer_ctx *c)
 {
        struct bitstream bs;
        unsigned long plain_bits;
@@ -1027,19 +1267,21 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
        unsigned long rl;
        unsigned len;
        unsigned toggle;
-       int bits;
+       int bits, use_rle;
 
        /* may we use this feature? */
-       if ((mdev->tconn->net_conf->use_rle == 0) ||
-               (mdev->tconn->agreed_pro_version < 90))
-                       return 0;
+       rcu_read_lock();
+       use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
+       rcu_read_unlock();
+       if (!use_rle || mdev->tconn->agreed_pro_version < 90)
+               return 0;
 
        if (c->bit_offset >= c->bm_bits)
                return 0; /* nothing to do. */
 
        /* use at most thus many bytes */
-       bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
-       memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
+       bitstream_init(&bs, p->code, size, 0);
+       memset(p->code, 0, size);
        /* plain bits covered in this code string */
        plain_bits = 0;
 
@@ -1061,12 +1303,12 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
                        if (rl == 0) {
                                /* the first checked bit was set,
                                 * store start value, */
-                               DCBP_set_start(p, 1);
+                               dcbp_set_start(p, 1);
                                /* but skip encoding of zero run length */
                                toggle = !toggle;
                                continue;
                        }
-                       DCBP_set_start(p, 0);
+                       dcbp_set_start(p, 0);
                }
 
                /* paranoia: catch zero runlength.
@@ -1106,7 +1348,7 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
        bm_xfer_ctx_bit_to_word_offset(c);
 
        /* store pad_bits */
-       DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
+       dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
 
        return len;
 }
@@ -1118,48 +1360,52 @@ int fill_bitmap_rle_bits(struct drbd_conf *mdev,
  * code upon failure.
  */
 static int
-send_bitmap_rle_or_plain(struct drbd_conf *mdev,
-                        struct p_header *h, struct bm_xfer_ctx *c)
+send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
 {
-       struct p_compressed_bm *p = (void*)h;
-       unsigned long num_words;
-       int len;
-       int ok;
-
-       len = fill_bitmap_rle_bits(mdev, p, c);
+       struct drbd_socket *sock = &mdev->tconn->data;
+       unsigned int header_size = drbd_header_size(mdev->tconn);
+       struct p_compressed_bm *p = sock->sbuf + header_size;
+       int len, err;
 
+       len = fill_bitmap_rle_bits(mdev, p,
+                       DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
        if (len < 0)
                return -EIO;
 
        if (len) {
-               DCBP_set_code(p, RLE_VLI_Bits);
-               ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_COMPRESSED_BITMAP, h,
-                                    sizeof(*p) + len, 0);
-
+               dcbp_set_code(p, RLE_VLI_Bits);
+               err = __send_command(mdev->tconn, mdev->vnr, sock,
+                                    P_COMPRESSED_BITMAP, sizeof(*p) + len,
+                                    NULL, 0);
                c->packets[0]++;
-               c->bytes[0] += sizeof(*p) + len;
+               c->bytes[0] += header_size + sizeof(*p) + len;
 
                if (c->bit_offset >= c->bm_bits)
                        len = 0; /* DONE */
        } else {
                /* was not compressible.
                 * send a buffer full of plain text bits instead. */
-               num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
-               len = num_words * sizeof(long);
+               unsigned int data_size;
+               unsigned long num_words;
+               unsigned long *p = sock->sbuf + header_size;
+
+               data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
+               num_words = min_t(size_t, data_size / sizeof(*p),
+                                 c->bm_words - c->word_offset);
+               len = num_words * sizeof(*p);
                if (len)
-                       drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
-               ok = !_drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
-                                    h, sizeof(struct p_header80) + len, 0);
+                       drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
+               err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
                c->word_offset += num_words;
                c->bit_offset = c->word_offset * BITS_PER_LONG;
 
                c->packets[1]++;
-               c->bytes[1] += sizeof(struct p_header80) + len;
+               c->bytes[1] += header_size + len;
 
                if (c->bit_offset > c->bm_bits)
                        c->bit_offset = c->bm_bits;
        }
-       if (ok) {
+       if (!err) {
                if (len == 0) {
                        INFO_bm_xfer_stats(mdev, "send", c);
                        return 0;
@@ -1170,23 +1416,14 @@ send_bitmap_rle_or_plain(struct drbd_conf *mdev,
 }
 
 /* See the comment at receive_bitmap() */
-int _drbd_send_bitmap(struct drbd_conf *mdev)
+static int _drbd_send_bitmap(struct drbd_conf *mdev)
 {
        struct bm_xfer_ctx c;
-       struct p_header *p;
        int err;
 
        if (!expect(mdev->bitmap))
                return false;
 
-       /* maybe we should use some per thread scratch page,
-        * and allocate that during initial device creation? */
-       p = (struct p_header *) __get_free_page(GFP_NOIO);
-       if (!p) {
-               dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
-               return false;
-       }
-
        if (get_ldev(mdev)) {
                if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
                        dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
@@ -1210,32 +1447,39 @@ int _drbd_send_bitmap(struct drbd_conf *mdev)
        };
 
        do {
-               err = send_bitmap_rle_or_plain(mdev, p, &c);
+               err = send_bitmap_rle_or_plain(mdev, &c);
        } while (err > 0);
 
-       free_page((unsigned long) p);
        return err == 0;
 }
 
 int drbd_send_bitmap(struct drbd_conf *mdev)
 {
-       int err;
+       struct drbd_socket *sock = &mdev->tconn->data;
+       int err = -1;
 
-       if (drbd_get_data_sock(mdev->tconn))
-               return -1;
-       err = !_drbd_send_bitmap(mdev);
-       drbd_put_data_sock(mdev->tconn);
+       mutex_lock(&sock->mutex);
+       if (sock->socket)
+               err = !_drbd_send_bitmap(mdev);
+       mutex_unlock(&sock->mutex);
        return err;
 }
+
 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
 {
-       struct p_barrier_ack p;
+       struct drbd_socket *sock;
+       struct p_barrier_ack *p;
 
-       p.barrier  = barrier_nr;
-       p.set_size = cpu_to_be32(set_size);
+       if (mdev->state.conn < C_CONNECTED)
+               return;
 
-       if (mdev->state.conn >= C_CONNECTED)
-               drbd_send_cmd(mdev, &mdev->tconn->meta, P_BARRIER_ACK, &p.head, sizeof(p));
+       sock = &mdev->tconn->meta;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return;
+       p->barrier = barrier_nr;
+       p->set_size = cpu_to_be32(set_size);
+       drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
 }
 
 /**
@@ -1249,16 +1493,21 @@ void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
                          u64 sector, u32 blksize, u64 block_id)
 {
-       struct p_block_ack p;
+       struct drbd_socket *sock;
+       struct p_block_ack *p;
 
-       p.sector   = sector;
-       p.block_id = block_id;
-       p.blksize  = blksize;
-       p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
+       if (mdev->state.conn < C_CONNECTED)
+               return -EIO;
 
-       if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
+       sock = &mdev->tconn->meta;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
                return -EIO;
-       return drbd_send_cmd(mdev, &mdev->tconn->meta, cmd, &p.head, sizeof(p));
+       p->sector = sector;
+       p->block_id = block_id;
+       p->blksize = blksize;
+       p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 /* dp->sector and dp->block_id already/still in network byte order,
@@ -1267,8 +1516,8 @@ static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
                      struct p_data *dp, int data_size)
 {
-       data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
+       if (mdev->tconn->peer_integrity_tfm)
+               data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
        _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
                       dp->block_id);
 }
@@ -1308,43 +1557,51 @@ int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
                       sector_t sector, int size, u64 block_id)
 {
-       struct p_block_req p;
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = block_id;
-       p.blksize  = cpu_to_be32(size);
-
-       return drbd_send_cmd(mdev, &mdev->tconn->data, cmd, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = block_id;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
 }
 
 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
                            void *digest, int digest_size, enum drbd_packet cmd)
 {
-       int err;
-       struct p_block_req p;
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = ID_SYNCER /* unused */;
-       p.blksize  = cpu_to_be32(size);
+       /* FIXME: Put the digest into the preallocated socket buffer.  */
 
-       mutex_lock(&mdev->tconn->data.mutex);
-       err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0);
-       if (!err)
-               err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0);
-       mutex_unlock(&mdev->tconn->data.mutex);
-       return err;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = ID_SYNCER /* unused */;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, cmd, sizeof(*p),
+                                digest, digest_size);
 }
 
 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
 {
-       struct p_block_req p;
-
-       p.sector   = cpu_to_be64(sector);
-       p.block_id = ID_SYNCER /* unused */;
-       p.blksize  = cpu_to_be32(size);
+       struct drbd_socket *sock;
+       struct p_block_req *p;
 
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_OV_REQUEST, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(sector);
+       p->block_id = ID_SYNCER /* unused */;
+       p->blksize = cpu_to_be32(size);
+       return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
 }
 
 /* called on sndtimeo
@@ -1421,9 +1678,10 @@ static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
                    int offset, size_t size, unsigned msg_flags)
 {
+       struct socket *socket = mdev->tconn->data.socket;
        mm_segment_t oldfs = get_fs();
-       int sent, ok;
        int len = size;
+       int err = -EIO;
 
        /* e.g. XFS meta- & log-data is in slab pages, which have a
         * page_count of 0 and/or have PageSlab() set.
@@ -1432,25 +1690,25 @@ static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
         * __page_cache_release a page that would actually still be referenced
         * by someone, leading to some obscure delayed Oops somewhere else. */
        if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
-               return !_drbd_no_send_page(mdev, page, offset, size, msg_flags);
+               return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
 
        msg_flags |= MSG_NOSIGNAL;
        drbd_update_congested(mdev->tconn);
        set_fs(KERNEL_DS);
        do {
-               sent = mdev->tconn->data.socket->ops->sendpage(mdev->tconn->data.socket, page,
-                                                       offset, len,
-                                                       msg_flags);
-               if (sent == -EAGAIN) {
-                       if (we_should_drop_the_connection(mdev->tconn,
-                                                         mdev->tconn->data.socket))
-                               break;
-                       else
-                               continue;
-               }
+               int sent;
+
+               sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
                if (sent <= 0) {
+                       if (sent == -EAGAIN) {
+                               if (we_should_drop_the_connection(mdev->tconn, socket))
+                                       break;
+                               continue;
+                       }
                        dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
                             __func__, (int)size, len, sent);
+                       if (sent < 0)
+                               err = sent;
                        break;
                }
                len    -= sent;
@@ -1459,10 +1717,11 @@ static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
        set_fs(oldfs);
        clear_bit(NET_CONGESTED, &mdev->tconn->flags);
 
-       ok = (len == 0);
-       if (likely(ok))
-               mdev->send_cnt += size>>9;
-       return ok;
+       if (len == 0) {
+               err = 0;
+               mdev->send_cnt += size >> 9;
+       }
+       return err;
 }
 
 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
@@ -1471,12 +1730,15 @@ static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
        int i;
        /* hint all but last page with MSG_MORE */
        __bio_for_each_segment(bvec, bio, i, 0) {
-               if (_drbd_no_send_page(mdev, bvec->bv_page,
-                                      bvec->bv_offset, bvec->bv_len,
-                                      i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
-                       return 0;
+               int err;
+
+               err = _drbd_no_send_page(mdev, bvec->bv_page,
+                                        bvec->bv_offset, bvec->bv_len,
+                                        i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
+               if (err)
+                       return err;
        }
-       return 1;
+       return 0;
 }
 
 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
@@ -1485,12 +1747,15 @@ static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
        int i;
        /* hint all but last page with MSG_MORE */
        __bio_for_each_segment(bvec, bio, i, 0) {
-               if (!_drbd_send_page(mdev, bvec->bv_page,
-                                    bvec->bv_offset, bvec->bv_len,
-                                    i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
-                       return 0;
+               int err;
+
+               err = _drbd_send_page(mdev, bvec->bv_page,
+                                     bvec->bv_offset, bvec->bv_len,
+                                     i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
+               if (err)
+                       return err;
        }
-       return 1;
+       return 0;
 }
 
 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
@@ -1498,16 +1763,19 @@ static int _drbd_send_zc_ee(struct drbd_conf *mdev,
 {
        struct page *page = peer_req->pages;
        unsigned len = peer_req->i.size;
+       int err;
 
        /* hint all but last page with MSG_MORE */
        page_chain_for_each(page) {
                unsigned l = min_t(unsigned, len, PAGE_SIZE);
-               if (!_drbd_send_page(mdev, page, 0, l,
-                               page_chain_next(page) ? MSG_MORE : 0))
-                       return 0;
+
+               err = _drbd_send_page(mdev, page, 0, l,
+                                     page_chain_next(page) ? MSG_MORE : 0);
+               if (err)
+                       return err;
                len -= l;
        }
-       return 1;
+       return 0;
 }
 
 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
@@ -1526,39 +1794,36 @@ static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
  */
 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
 {
-       int ok = 1;
-       struct p_data p;
+       struct drbd_socket *sock;
+       struct p_data *p;
        unsigned int dp_flags = 0;
-       void *dgb;
        int dgs;
+       int err;
 
-       if (drbd_get_data_sock(mdev->tconn))
-               return 0;
-
-       dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
-
-       prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
-       p.sector   = cpu_to_be64(req->i.sector);
-       p.block_id = (unsigned long)req;
-       p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
 
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(req->i.sector);
+       p->block_id = (unsigned long)req;
+       p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
        dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
-
        if (mdev->state.conn >= C_SYNC_SOURCE &&
            mdev->state.conn <= C_PAUSED_SYNC_T)
                dp_flags |= DP_MAY_SET_IN_SYNC;
-
-       p.dp_flags = cpu_to_be32(dp_flags);
-       set_bit(UNPLUG_REMOTE, &mdev->flags);
-       ok = (sizeof(p) ==
-               drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
-       if (ok && dgs) {
-               dgb = mdev->tconn->int_dig_out;
-               drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
-               ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
-       }
-       if (ok) {
+       if (mdev->tconn->agreed_pro_version >= 100) {
+               if (req->rq_state & RQ_EXP_RECEIVE_ACK)
+                       dp_flags |= DP_SEND_RECEIVE_ACK;
+               if (req->rq_state & RQ_EXP_WRITE_ACK)
+                       dp_flags |= DP_SEND_WRITE_ACK;
+       }
+       p->dp_flags = cpu_to_be32(dp_flags);
+       if (dgs)
+               drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
+       err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
+       if (!err) {
                /* For protocol A, we have to memcpy the payload into
                 * socket buffers, as we may complete right away
                 * as soon as we handed it over to tcp, at which point the data
@@ -1570,18 +1835,18 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                 * out ok after sending on this side, but does not fit on the
                 * receiving side, we sure have detected corruption elsewhere.
                 */
-               if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
-                       ok = _drbd_send_bio(mdev, req->master_bio);
+               if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
+                       err = _drbd_send_bio(mdev, req->master_bio);
                else
-                       ok = _drbd_send_zc_bio(mdev, req->master_bio);
+                       err = _drbd_send_zc_bio(mdev, req->master_bio);
 
                /* double check digest, sometimes buffers have been modified in flight. */
                if (dgs > 0 && dgs <= 64) {
                        /* 64 byte, 512 bit, is the largest digest size
                         * currently supported in kernel crypto. */
                        unsigned char digest[64];
-                       drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
-                       if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
+                       drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
+                       if (memcmp(p + 1, digest, dgs)) {
                                dev_warn(DEV,
                                        "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
                                        (unsigned long long)req->i.sector, req->i.size);
@@ -1590,10 +1855,9 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
                     ... Be noisy about digest too large ...
                } */
        }
+       mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
 
-       drbd_put_data_sock(mdev->tconn);
-
-       return ok;
+       return err;
 }
 
 /* answer packet, used to send data back for read requests:
@@ -1603,50 +1867,43 @@ int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
                    struct drbd_peer_request *peer_req)
 {
-       int ok;
-       struct p_data p;
-       void *dgb;
+       struct drbd_socket *sock;
+       struct p_data *p;
+       int err;
        int dgs;
 
-       dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
-               crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
-
-       prepare_header(mdev, &p.head, cmd, sizeof(p) -
-                                          sizeof(struct p_header80) +
-                                          dgs + peer_req->i.size);
-       p.sector   = cpu_to_be64(peer_req->i.sector);
-       p.block_id = peer_req->block_id;
-       p.seq_num = 0;  /* unused */
-
-       /* Only called by our kernel thread.
-        * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
-        * in response to admin command or module unload.
-        */
-       if (drbd_get_data_sock(mdev->tconn))
-               return 0;
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
 
-       ok = sizeof(p) == drbd_send(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
-       if (ok && dgs) {
-               dgb = mdev->tconn->int_dig_out;
-               drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
-               ok = dgs == drbd_send(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
-       }
-       if (ok)
-               ok = _drbd_send_zc_ee(mdev, peer_req);
+       dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
 
-       drbd_put_data_sock(mdev->tconn);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(peer_req->i.sector);
+       p->block_id = peer_req->block_id;
+       p->seq_num = 0;  /* unused */
+       if (dgs)
+               drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
+       err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
+       if (!err)
+               err = _drbd_send_zc_ee(mdev, peer_req);
+       mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
 
-       return ok;
+       return err;
 }
 
-int drbd_send_oos(struct drbd_conf *mdev, struct drbd_request *req)
+int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
 {
-       struct p_block_desc p;
+       struct drbd_socket *sock;
+       struct p_block_desc *p;
 
-       p.sector  = cpu_to_be64(req->i.sector);
-       p.blksize = cpu_to_be32(req->i.size);
-
-       return drbd_send_cmd(mdev, &mdev->tconn->data, P_OUT_OF_SYNC, &p.head, sizeof(p));
+       sock = &mdev->tconn->data;
+       p = drbd_prepare_command(mdev, sock);
+       if (!p)
+               return -EIO;
+       p->sector = cpu_to_be64(req->i.sector);
+       p->blksize = cpu_to_be32(req->i.size);
+       return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
 }
 
 /*
@@ -1687,7 +1944,9 @@ int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
        msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
 
        if (sock == tconn->data.socket) {
-               tconn->ko_count = tconn->net_conf->ko_count;
+               rcu_read_lock();
+               tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
+               rcu_read_unlock();
                drbd_update_congested(tconn);
        }
        do {
@@ -1791,15 +2050,12 @@ static void drbd_set_defaults(struct drbd_conf *mdev)
 {
        /* Beware! The actual layout differs
         * between big endian and little endian */
-       mdev->state = (union drbd_state) {
+       mdev->state = (union drbd_dev_state) {
                { .role = R_SECONDARY,
                  .peer = R_UNKNOWN,
                  .conn = C_STANDALONE,
                  .disk = D_DISKLESS,
                  .pdsk = D_UNKNOWN,
-                 .susp = 0,
-                 .susp_nod = 0,
-                 .susp_fen = 0
                } };
 }
 
@@ -1815,19 +2071,17 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        atomic_set(&mdev->rs_pending_cnt, 0);
        atomic_set(&mdev->unacked_cnt, 0);
        atomic_set(&mdev->local_cnt, 0);
-       atomic_set(&mdev->pp_in_use, 0);
        atomic_set(&mdev->pp_in_use_by_net, 0);
        atomic_set(&mdev->rs_sect_in, 0);
        atomic_set(&mdev->rs_sect_ev, 0);
        atomic_set(&mdev->ap_in_flight, 0);
+       atomic_set(&mdev->md_io_in_use, 0);
 
-       mutex_init(&mdev->md_io_mutex);
        mutex_init(&mdev->own_state_mutex);
        mdev->state_mutex = &mdev->own_state_mutex;
 
        spin_lock_init(&mdev->al_lock);
        spin_lock_init(&mdev->peer_seq_lock);
-       spin_lock_init(&mdev->epoch_lock);
 
        INIT_LIST_HEAD(&mdev->active_ee);
        INIT_LIST_HEAD(&mdev->sync_ee);
@@ -1875,8 +2129,6 @@ void drbd_init_set_defaults(struct drbd_conf *mdev)
        init_waitqueue_head(&mdev->al_wait);
        init_waitqueue_head(&mdev->seq_wait);
 
-       /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
-       mdev->write_ordering = WO_bdev_flush;
        mdev->resync_wenr = LC_FREE;
        mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
        mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
@@ -1889,9 +2141,6 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
                dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
                                mdev->tconn->receiver.t_state);
 
-       /* no need to lock it, I'm the only thread alive */
-       if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
-               dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
        mdev->al_writ_cnt  =
        mdev->bm_writ_cnt  =
        mdev->read_cnt     =
@@ -1917,13 +2166,11 @@ void drbd_mdev_cleanup(struct drbd_conf *mdev)
                drbd_bm_cleanup(mdev);
        }
 
-       drbd_free_resources(mdev);
+       drbd_free_bc(mdev->ldev);
+       mdev->ldev = NULL;
+
        clear_bit(AL_SUSPENDED, &mdev->flags);
 
-       /*
-        * currently we drbd_init_ee only on module load, so
-        * we may do drbd_release_ee only on module unload!
-        */
        D_ASSERT(list_empty(&mdev->active_ee));
        D_ASSERT(list_empty(&mdev->sync_ee));
        D_ASSERT(list_empty(&mdev->done_ee));
@@ -2071,59 +2318,54 @@ static struct notifier_block drbd_notifier = {
        .notifier_call = drbd_notify_sys,
 };
 
-static void drbd_release_ee_lists(struct drbd_conf *mdev)
+static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
 {
        int rr;
 
-       rr = drbd_release_ee(mdev, &mdev->active_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
        if (rr)
                dev_err(DEV, "%d EEs in active list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->sync_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
        if (rr)
                dev_err(DEV, "%d EEs in sync list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->read_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
        if (rr)
                dev_err(DEV, "%d EEs in read list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->done_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
        if (rr)
                dev_err(DEV, "%d EEs in done list found!\n", rr);
 
-       rr = drbd_release_ee(mdev, &mdev->net_ee);
+       rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
        if (rr)
                dev_err(DEV, "%d EEs in net list found!\n", rr);
 }
 
 /* caution. no locking. */
-void drbd_delete_device(unsigned int minor)
+void drbd_minor_destroy(struct kref *kref)
 {
-       struct drbd_conf *mdev = minor_to_mdev(minor);
+       struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
+       struct drbd_tconn *tconn = mdev->tconn;
 
-       if (!mdev)
-               return;
-
-       idr_remove(&mdev->tconn->volumes, mdev->vnr);
-       idr_remove(&minors, minor);
-       synchronize_rcu();
+       del_timer_sync(&mdev->request_timer);
 
        /* paranoia asserts */
        D_ASSERT(mdev->open_cnt == 0);
        D_ASSERT(list_empty(&mdev->tconn->data.work.q));
        /* end paranoia asserts */
 
-       del_gendisk(mdev->vdisk);
-
        /* cleanup stuff that may have been allocated during
         * device (re-)configuration or state changes */
 
        if (mdev->this_bdev)
                bdput(mdev->this_bdev);
 
-       drbd_free_resources(mdev);
+       drbd_free_bc(mdev->ldev);
+       mdev->ldev = NULL;
 
-       drbd_release_ee_lists(mdev);
+       drbd_release_all_peer_reqs(mdev);
 
        lc_destroy(mdev->act_log);
        lc_destroy(mdev->resync);
@@ -2131,16 +2373,22 @@ void drbd_delete_device(unsigned int minor)
        kfree(mdev->p_uuid);
        /* mdev->p_uuid = NULL; */
 
-       /* cleanup the rest that has been
-        * allocated from drbd_new_device
-        * and actually free the mdev itself */
-       drbd_free_mdev(mdev);
+       if (mdev->bitmap) /* should no longer be there. */
+               drbd_bm_cleanup(mdev);
+       __free_page(mdev->md_io_page);
+       put_disk(mdev->vdisk);
+       blk_cleanup_queue(mdev->rq_queue);
+       kfree(mdev->rs_plan_s);
+       kfree(mdev);
+
+       kref_put(&tconn->kref, &conn_destroy);
 }
 
 static void drbd_cleanup(void)
 {
        unsigned int i;
        struct drbd_conf *mdev;
+       struct drbd_tconn *tconn, *tmp;
 
        unregister_reboot_notifier(&drbd_notifier);
 
@@ -2157,8 +2405,21 @@ static void drbd_cleanup(void)
 
        drbd_genl_unregister();
 
-       idr_for_each_entry(&minors, mdev, i)
-               drbd_delete_device(i);
+       idr_for_each_entry(&minors, mdev, i) {
+               idr_remove(&minors, mdev_to_minor(mdev));
+               idr_remove(&mdev->tconn->volumes, mdev->vnr);
+               del_gendisk(mdev->vdisk);
+               /* synchronize_rcu(); No other threads running at this point */
+               kref_put(&mdev->kref, &drbd_minor_destroy);
+       }
+
+       /* not _rcu since, no other updater anymore. Genl already unregistered */
+       list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
+               list_del(&tconn->all_tconn); /* not _rcu no proc, not other threads */
+               /* synchronize_rcu(); */
+               kref_put(&tconn->kref, &conn_destroy);
+       }
+
        drbd_destroy_mempools();
        unregister_blkdev(DRBD_MAJOR, "drbd");
 
@@ -2213,25 +2474,126 @@ static void drbd_init_workqueue(struct drbd_work_queue* wq)
        INIT_LIST_HEAD(&wq->q);
 }
 
-struct drbd_tconn *conn_by_name(const char *name)
+struct drbd_tconn *conn_get_by_name(const char *name)
 {
        struct drbd_tconn *tconn;
 
        if (!name || !name[0])
                return NULL;
 
-       mutex_lock(&drbd_cfg_mutex);
-       list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
-               if (!strcmp(tconn->name, name))
+       rcu_read_lock();
+       list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
+               if (!strcmp(tconn->name, name)) {
+                       kref_get(&tconn->kref);
+                       goto found;
+               }
+       }
+       tconn = NULL;
+found:
+       rcu_read_unlock();
+       return tconn;
+}
+
+struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
+                                    void *peer_addr, int peer_addr_len)
+{
+       struct drbd_tconn *tconn;
+
+       rcu_read_lock();
+       list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
+               if (tconn->my_addr_len == my_addr_len &&
+                   tconn->peer_addr_len == peer_addr_len &&
+                   !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
+                   !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
+                       kref_get(&tconn->kref);
                        goto found;
+               }
        }
        tconn = NULL;
 found:
-       mutex_unlock(&drbd_cfg_mutex);
+       rcu_read_unlock();
        return tconn;
 }
 
-struct drbd_tconn *drbd_new_tconn(const char *name)
+static int drbd_alloc_socket(struct drbd_socket *socket)
+{
+       socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
+       if (!socket->rbuf)
+               return -ENOMEM;
+       socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
+       if (!socket->sbuf)
+               return -ENOMEM;
+       return 0;
+}
+
+static void drbd_free_socket(struct drbd_socket *socket)
+{
+       free_page((unsigned long) socket->sbuf);
+       free_page((unsigned long) socket->rbuf);
+}
+
+void conn_free_crypto(struct drbd_tconn *tconn)
+{
+       drbd_free_sock(tconn);
+
+       crypto_free_hash(tconn->csums_tfm);
+       crypto_free_hash(tconn->verify_tfm);
+       crypto_free_hash(tconn->cram_hmac_tfm);
+       crypto_free_hash(tconn->integrity_tfm);
+       crypto_free_hash(tconn->peer_integrity_tfm);
+       kfree(tconn->int_dig_in);
+       kfree(tconn->int_dig_vv);
+
+       tconn->csums_tfm = NULL;
+       tconn->verify_tfm = NULL;
+       tconn->cram_hmac_tfm = NULL;
+       tconn->integrity_tfm = NULL;
+       tconn->peer_integrity_tfm = NULL;
+       tconn->int_dig_in = NULL;
+       tconn->int_dig_vv = NULL;
+}
+
+int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
+{
+       cpumask_var_t new_cpu_mask;
+       int err;
+
+       if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
+               return -ENOMEM;
+               /*
+               retcode = ERR_NOMEM;
+               drbd_msg_put_info("unable to allocate cpumask");
+               */
+
+       /* silently ignore cpu mask on UP kernel */
+       if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
+               /* FIXME: Get rid of constant 32 here */
+               err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
+                               cpumask_bits(new_cpu_mask), nr_cpu_ids);
+               if (err) {
+                       conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
+                       /* retcode = ERR_CPU_MASK_PARSE; */
+                       goto fail;
+               }
+       }
+       tconn->res_opts = *res_opts;
+       if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
+               cpumask_copy(tconn->cpu_mask, new_cpu_mask);
+               drbd_calc_cpu_mask(tconn);
+               tconn->receiver.reset_cpu_mask = 1;
+               tconn->asender.reset_cpu_mask = 1;
+               tconn->worker.reset_cpu_mask = 1;
+       }
+       err = 0;
+
+fail:
+       free_cpumask_var(new_cpu_mask);
+       return err;
+
+}
+
+/* caller must be under genl_lock() */
+struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
 {
        struct drbd_tconn *tconn;
 
@@ -2243,17 +2605,32 @@ struct drbd_tconn *drbd_new_tconn(const char *name)
        if (!tconn->name)
                goto fail;
 
+       if (drbd_alloc_socket(&tconn->data))
+               goto fail;
+       if (drbd_alloc_socket(&tconn->meta))
+               goto fail;
+
        if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
                goto fail;
 
+       if (set_resource_options(tconn, res_opts))
+               goto fail;
+
        if (!tl_init(tconn))
                goto fail;
 
+       tconn->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
+       if (!tconn->current_epoch)
+               goto fail;
+       INIT_LIST_HEAD(&tconn->current_epoch->list);
+       tconn->epochs = 1;
+       spin_lock_init(&tconn->epoch_lock);
+       tconn->write_ordering = WO_bdev_flush;
+
        tconn->cstate = C_STANDALONE;
        mutex_init(&tconn->cstate_mutex);
        spin_lock_init(&tconn->req_lock);
-       atomic_set(&tconn->net_cnt, 0);
-       init_waitqueue_head(&tconn->net_cnt_wait);
+       mutex_init(&tconn->conf_update);
        init_waitqueue_head(&tconn->ping_wait);
        idr_init(&tconn->volumes);
 
@@ -2267,34 +2644,37 @@ struct drbd_tconn *drbd_new_tconn(const char *name)
        drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
        drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
 
-       tconn->res_opts = (struct res_opts) {
-               {}, 0, /* cpu_mask */
-               DRBD_ON_NO_DATA_DEF, /* on_no_data */
-       };
-
-       mutex_lock(&drbd_cfg_mutex);
-       list_add_tail(&tconn->all_tconn, &drbd_tconns);
-       mutex_unlock(&drbd_cfg_mutex);
+       kref_init(&tconn->kref);
+       list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
 
        return tconn;
 
 fail:
+       kfree(tconn->current_epoch);
        tl_cleanup(tconn);
        free_cpumask_var(tconn->cpu_mask);
+       drbd_free_socket(&tconn->meta);
+       drbd_free_socket(&tconn->data);
        kfree(tconn->name);
        kfree(tconn);
 
        return NULL;
 }
 
-void drbd_free_tconn(struct drbd_tconn *tconn)
+void conn_destroy(struct kref *kref)
 {
-       list_del(&tconn->all_tconn);
+       struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
+
+       if (atomic_read(&tconn->current_epoch->epoch_size) !=  0)
+               conn_err(tconn, "epoch_size:%d\n", atomic_read(&tconn->current_epoch->epoch_size));
+       kfree(tconn->current_epoch);
+
        idr_destroy(&tconn->volumes);
 
        free_cpumask_var(tconn->cpu_mask);
+       drbd_free_socket(&tconn->meta);
+       drbd_free_socket(&tconn->data);
        kfree(tconn->name);
-       kfree(tconn->int_dig_out);
        kfree(tconn->int_dig_in);
        kfree(tconn->int_dig_vv);
        kfree(tconn);
@@ -2318,7 +2698,9 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
        if (!mdev)
                return ERR_NOMEM;
 
+       kref_get(&tconn->kref);
        mdev->tconn = tconn;
+
        mdev->minor = minor;
        mdev->vnr = vnr;
 
@@ -2368,13 +2750,6 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
        mdev->read_requests = RB_ROOT;
        mdev->write_requests = RB_ROOT;
 
-       mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
-       if (!mdev->current_epoch)
-               goto out_no_epoch;
-
-       INIT_LIST_HEAD(&mdev->current_epoch->list);
-       mdev->epochs = 1;
-
        if (!idr_pre_get(&minors, GFP_KERNEL))
                goto out_no_minor_idr;
        if (idr_get_new_above(&minors, mdev, minor, &minor_got))
@@ -2395,11 +2770,12 @@ enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor,
                goto out_idr_remove_vol;
        }
        add_disk(disk);
+       kref_init(&mdev->kref); /* one ref for both idrs and the the add_disk */
 
        /* inherit the connection state */
        mdev->state.conn = tconn->cstate;
        if (mdev->state.conn == C_WF_REPORT_PARAMS)
-               drbd_connected(vnr, mdev, tconn);
+               drbd_connected(mdev);
 
        return NO_ERROR;
 
@@ -2409,8 +2785,6 @@ out_idr_remove_minor:
        idr_remove(&minors, minor_got);
        synchronize_rcu();
 out_no_minor_idr:
-       kfree(mdev->current_epoch);
-out_no_epoch:
        drbd_bm_cleanup(mdev);
 out_no_bitmap:
        __free_page(mdev->md_io_page);
@@ -2420,37 +2794,21 @@ out_no_disk:
        blk_cleanup_queue(q);
 out_no_q:
        kfree(mdev);
+       kref_put(&tconn->kref, &conn_destroy);
        return err;
 }
 
-/* counterpart of drbd_new_device.
- * last part of drbd_delete_device. */
-void drbd_free_mdev(struct drbd_conf *mdev)
-{
-       kfree(mdev->current_epoch);
-       if (mdev->bitmap) /* should no longer be there. */
-               drbd_bm_cleanup(mdev);
-       __free_page(mdev->md_io_page);
-       put_disk(mdev->vdisk);
-       blk_cleanup_queue(mdev->rq_queue);
-       kfree(mdev);
-}
-
-
 int __init drbd_init(void)
 {
        int err;
 
-       BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
-       BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
-
        if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
                printk(KERN_ERR
                       "drbd: invalid minor_count (%d)\n", minor_count);
 #ifdef MODULE
                return -EINVAL;
 #else
-               minor_count = 8;
+               minor_count = DRBD_MINOR_COUNT_DEF;
 #endif
        }
 
@@ -2542,27 +2900,6 @@ void drbd_free_sock(struct drbd_tconn *tconn)
        }
 }
 
-
-void drbd_free_resources(struct drbd_conf *mdev)
-{
-       crypto_free_hash(mdev->tconn->csums_tfm);
-       mdev->tconn->csums_tfm = NULL;
-       crypto_free_hash(mdev->tconn->verify_tfm);
-       mdev->tconn->verify_tfm = NULL;
-       crypto_free_hash(mdev->tconn->cram_hmac_tfm);
-       mdev->tconn->cram_hmac_tfm = NULL;
-       crypto_free_hash(mdev->tconn->integrity_w_tfm);
-       mdev->tconn->integrity_w_tfm = NULL;
-       crypto_free_hash(mdev->tconn->integrity_r_tfm);
-       mdev->tconn->integrity_r_tfm = NULL;
-
-       drbd_free_sock(mdev->tconn);
-
-       __no_warn(local,
-                 drbd_free_bc(mdev->ldev);
-                 mdev->ldev = NULL;);
-}
-
 /* meta data management */
 
 struct meta_data_on_disk {
@@ -2603,15 +2940,17 @@ void drbd_md_sync(struct drbd_conf *mdev)
        if (!get_ldev_if_state(mdev, D_FAILED))
                return;
 
-       mutex_lock(&mdev->md_io_mutex);
-       buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
+       buffer = drbd_md_get_buffer(mdev);
+       if (!buffer)
+               goto out;
+
        memset(buffer, 0, 512);
 
        buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
        for (i = UI_CURRENT; i < UI_SIZE; i++)
                buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
        buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
-       buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
+       buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
 
        buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
        buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
@@ -2625,7 +2964,7 @@ void drbd_md_sync(struct drbd_conf *mdev)
        D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
        sector = mdev->ldev->md.md_offset;
 
-       if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
+       if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
                /* this was a try anyways ... */
                dev_err(DEV, "meta data update failed!\n");
                drbd_chk_io_error(mdev, 1, true);
@@ -2635,7 +2974,8 @@ void drbd_md_sync(struct drbd_conf *mdev)
         * since we updated it on metadata. */
        mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
 
-       mutex_unlock(&mdev->md_io_mutex);
+       drbd_md_put_buffer(mdev);
+out:
        put_ldev(mdev);
 }
 
@@ -2645,20 +2985,22 @@ void drbd_md_sync(struct drbd_conf *mdev)
  * @bdev:      Device from which the meta data should be read in.
  *
  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
- * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
+ * something goes wrong.
  */
 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
 {
        struct meta_data_on_disk *buffer;
+       u32 magic, flags;
        int i, rv = NO_ERROR;
 
        if (!get_ldev_if_state(mdev, D_ATTACHING))
                return ERR_IO_MD_DISK;
 
-       mutex_lock(&mdev->md_io_mutex);
-       buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
+       buffer = drbd_md_get_buffer(mdev);
+       if (!buffer)
+               goto out;
 
-       if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
+       if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
                /* NOTE: can't do normal error processing here as this is
                   called BEFORE disk is attached */
                dev_err(DEV, "Error while reading metadata.\n");
@@ -2666,8 +3008,20 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
                goto err;
        }
 
-       if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
-               dev_err(DEV, "Error while reading metadata, magic not found.\n");
+       magic = be32_to_cpu(buffer->magic);
+       flags = be32_to_cpu(buffer->flags);
+       if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
+           (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
+                       /* btw: that's Activity Log clean, not "all" clean. */
+               dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
+               rv = ERR_MD_UNCLEAN;
+               goto err;
+       }
+       if (magic != DRBD_MD_MAGIC_08) {
+               if (magic == DRBD_MD_MAGIC_07)
+                       dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
+               else
+                       dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
                rv = ERR_MD_INVALID;
                goto err;
        }
@@ -2701,7 +3055,6 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        for (i = UI_CURRENT; i < UI_SIZE; i++)
                bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
        bdev->md.flags = be32_to_cpu(buffer->flags);
-       bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
        bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
 
        spin_lock_irq(&mdev->tconn->req_lock);
@@ -2713,11 +3066,9 @@ int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
        }
        spin_unlock_irq(&mdev->tconn->req_lock);
 
-       if (bdev->dc.al_extents < 7)
-               bdev->dc.al_extents = 127;
-
  err:
-       mutex_unlock(&mdev->md_io_mutex);
+       drbd_md_put_buffer(mdev);
+ out:
        put_ldev(mdev);
 
        return rv;
@@ -2897,7 +3248,7 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
        work->why = NULL;
        work->flags = 0;
 
-       return 1;
+       return 0;
 }
 
 void drbd_ldev_destroy(struct drbd_conf *mdev)
@@ -2923,7 +3274,7 @@ static int w_go_diskless(struct drbd_work *w, int unused)
         * the protected members anymore, though, so once put_ldev reaches zero
         * again, it will be safe to free them. */
        drbd_force_state(mdev, NS(disk, D_DISKLESS));
-       return 1;
+       return 0;
 }
 
 void drbd_go_diskless(struct drbd_conf *mdev)
@@ -3039,7 +3390,7 @@ static int w_md_sync(struct drbd_work *w, int unused)
                mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
 #endif
        drbd_md_sync(mdev);
-       return 1;
+       return 0;
 }
 
 const char *cmdname(enum drbd_packet cmd)
@@ -3088,14 +3439,25 @@ const char *cmdname(enum drbd_packet cmd)
                [P_DELAY_PROBE]         = "DelayProbe",
                [P_OUT_OF_SYNC]         = "OutOfSync",
                [P_RETRY_WRITE]         = "RetryWrite",
+               [P_RS_CANCEL]           = "RSCancel",
+               [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
+               [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
+               [P_RETRY_WRITE]         = "retry_write",
+               [P_PROTOCOL_UPDATE]     = "protocol_update",
+
+               /* enum drbd_packet, but not commands - obsoleted flags:
+                *      P_MAY_IGNORE
+                *      P_MAX_OPT_CMD
+                */
        };
 
-       if (cmd == P_HAND_SHAKE_M)
-               return "HandShakeM";
-       if (cmd == P_HAND_SHAKE_S)
-               return "HandShakeS";
-       if (cmd == P_HAND_SHAKE)
-               return "HandShake";
+       /* too big for the array: 0xfffX */
+       if (cmd == P_INITIAL_META)
+               return "InitialMeta";
+       if (cmd == P_INITIAL_DATA)
+               return "InitialData";
+       if (cmd == P_CONNECTION_FEATURES)
+               return "ConnectionFeatures";
        if (cmd >= ARRAY_SIZE(cmdnames))
                return "Unknown";
        return cmdnames[cmd];
@@ -3109,15 +3471,18 @@ const char *cmdname(enum drbd_packet cmd)
  */
 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
 {
-       struct net_conf *net_conf = mdev->tconn->net_conf;
+       struct net_conf *nc;
        DEFINE_WAIT(wait);
        long timeout;
 
-       if (!net_conf)
+       rcu_read_lock();
+       nc = rcu_dereference(mdev->tconn->net_conf);
+       if (!nc) {
+               rcu_read_unlock();
                return -ETIMEDOUT;
-       timeout = MAX_SCHEDULE_TIMEOUT;
-       if (net_conf->ko_count)
-               timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
+       }
+       timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
+       rcu_read_unlock();
 
        /* Indicate to wake up mdev->misc_wait on progress.  */
        i->waiting = true;