   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.
   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
#include <linux/module.h>

#include <asm/uaccess.h>

#include <linux/drbd.h>

#include <linux/file.h>

#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
static int drbd_do_features(struct drbd_tconn *tconn);
static int drbd_do_auth(struct drbd_tconn *tconn);
static int drbd_disconnected(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 * some helper functions to deal with singly linked page lists,
 * page->private being our "next" pointer.
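/* Illustrative sketch (an assumption, not part of this file): the page chain
 * accessors used below come from drbd's internal headers and amount to
 * treating page->private as the link field, roughly:
 *
 *	static inline struct page *page_chain_next(struct page *page)
 *	{
 *		return (struct page *)page_private(page);
 *	}
 *	#define page_chain_for_each(page) \
 *		for (; page; page = page_chain_next(page))
 *
 * So a chain p1 -> p2 -> p3 has p1->private == p2, p2->private == p3, and
 * p3->private == 0, the end-of-list marker set via set_page_private(p, 0). */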
/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
		tmp = page_chain_next(page);
			break; /* found sufficient pages */

	/* insufficient pages, don't use any of them. */

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
	while ((tmp = page_chain_next(page)))

static int page_chain_free(struct page *page)
	page_chain_for_each_safe(page, tmp) {

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
	struct page *page = NULL;
	struct page *tmp = NULL;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		set_page_private(tmp, (unsigned long)page);

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);
static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
					   struct list_head *to_be_freed)
	struct drbd_peer_request *peer_req;
	struct list_head *le, *tle;
	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not finished,
	   we can stop examining the list... */
	list_for_each_safe(le, tle, &mdev->net_ee) {
		peer_req = list_entry(le, struct drbd_peer_request, w.list);
		if (drbd_peer_req_has_active_page(peer_req))
		list_move(le, to_be_freed);

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);
/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @mdev:	DRBD device.
 * @number:	number of pages requested
 * @retry:	whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
	struct page *page = NULL;

	/* Yes, we may run up to @number over max_buffers. If we
	 * follow it strictly, the admin will get it wrong anyways. */
	nc = rcu_dereference(mdev->tconn->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;

	if (atomic_read(&mdev->pp_in_use) < mxb)
		page = __drbd_alloc_pages(mdev, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(mdev);

		if (atomic_read(&mdev->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(mdev, number);

		if (signal_pending(current)) {
			dev_warn(DEV, "drbd_alloc_pages interrupted!\n");

	finish_wait(&drbd_pp_wait, &wait);

	atomic_add(number, &mdev->pp_in_use);
/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
	atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		spin_unlock(&drbd_pp_lock);
	i = atomic_sub_return(i, a);
		dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
			 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_finish_peer_reqs()
 drbd_wait_ee_list_empty()
*/
struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
		    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
	struct drbd_peer_request *peer_req;
	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

	if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
		if (!(gfp_mask & __GFP_NOWARN))
			dev_err(DEV, "%s: allocation failed\n", __func__);

	page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));

	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->i.local = false;
	peer_req->i.waiting = false;

	peer_req->epoch = NULL;
	peer_req->w.mdev = mdev;
	peer_req->pages = page;
	atomic_set(&peer_req->pending_bios, 0);
	/*
	 * The block_id is opaque to the receiver. It is not endianness-converted,
	 * and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	mempool_free(peer_req, drbd_ee_mempool);
void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(mdev, peer_req->pages, is_net);
	D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(drbd_interval_empty(&peer_req->i));
	mempool_free(peer_req, drbd_ee_mempool);

int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int is_net = list == &mdev->net_ee;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(mdev, peer_req, is_net);

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&mdev->tconn->req_lock);
	reclaim_finished_net_peer_reqs(mdev, &reclaimed);
	list_splice_init(&mdev->done_ee, &work_list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(mdev, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_discard_write.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		drbd_free_peer_req(mdev, peer_req);
	wake_up(&mdev->ee_wait);

static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				     struct list_head *head)
	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&mdev->tconn->req_lock);
		finish_wait(&mdev->ee_wait, &wait);
		spin_lock_irq(&mdev->tconn->req_lock);

static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
				    struct list_head *head)
	spin_lock_irq(&mdev->tconn->req_lock);
	_drbd_wait_ee_list_empty(mdev, head);
	spin_unlock_irq(&mdev->tconn->req_lock);
static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)

	rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);

static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
	struct msghdr msg = {
		.msg_iov = (struct iovec *)&iov,
		.msg_flags = MSG_WAITALL | MSG_NOSIGNAL

	rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);

	 * ECONNRESET	other side closed the connection
	 * ERESTARTSYS	(on sock) we got a signal

		if (rv == -ECONNRESET)
			conn_info(tconn, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		conn_info(tconn, "sock was shut down by peer\n");

		/* signal came in, or peer/link went down,
		 * after we read a partial message
		 */
		/* D_ASSERT(signal_pending(current)); */

		conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);

static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
	err = drbd_recv(tconn, buf, size);

static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
	err = drbd_recv_all(tconn, buf, size);
	if (err && !signal_pending(current))
		conn_warn(tconn, "short read (expected size %d)\n", (int)size);

/*
 * On individual connections, the socket buffer size must be set prior to the
 * listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
	/* open coded SO_SNDBUF, SO_RCVBUF */
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
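/* For orientation (an illustrative note, not part of this file): the
 * assignments above are the in-kernel equivalent of what a userspace caller
 * would achieve with setsockopt(2), roughly:
 *
 *	int snd = 131072;
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *
 * Setting SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK keeps TCP buffer auto-tuning
 * from later overriding the explicitly configured sizes, just as a real
 * setsockopt() call does. */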
static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	nc = rcu_dereference(tconn->net_conf);
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &tconn->my_addr, my_addr_len);

	if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so Linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

		/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
		/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
		conn_err(tconn, "%s failed, err = %d\n", what, err);
		if (disconnect_on_error)
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);

static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
	int timeo, err, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	struct socket *s_estab = NULL, *s_listen;
	struct sockaddr_in6 my_addr;

	nc = rcu_dereference(tconn->net_conf);
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;

	my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &tconn->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);

	timeo = connect_int * HZ;
	timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
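	/* Worked example (illustrative): with connect_int = 10 and HZ = 1000,
	 * timeo starts at 10000 jiffies and becomes either 11428 or 8571,
	 * i.e. +/- 1/7 (~14.3%) around the configured value, for a total
	 * spread of ~28.5%. The jitter keeps the two peers' alternating
	 * connect/listen attempts from staying in lockstep forever. */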
	s_listen->sk->sk_reuse = 1; /* SO_REUSEADDR */
	s_listen->sk->sk_rcvtimeo = timeo;
	s_listen->sk->sk_sndtimeo = timeo;
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);

	err = s_listen->ops->listen(s_listen, 5);

	err = kernel_accept(s_listen, &s_estab, 0);

		sock_release(s_listen);
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			conn_err(tconn, "%s failed, err = %d\n", what, err);
			conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);

static int decode_header(struct drbd_tconn *, void *, struct packet_info *);

static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
			     enum drbd_packet cmd)
	if (!conn_prepare_command(tconn, sock))
	return conn_send_command(tconn, sock, cmd, 0, NULL, 0);

static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
	unsigned int header_size = drbd_header_size(tconn);
	struct packet_info pi;

	err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
	if (err != header_size) {

	err = decode_header(tconn, tconn->data.rbuf, &pi);

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:	pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
/* Gets called if a connection is established, or if a new minor gets created
   in an already established connection */
int drbd_connected(struct drbd_conf *mdev)
	atomic_set(&mdev->packet_seq, 0);

	mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
		&mdev->tconn->cstate_mutex :
		&mdev->own_state_mutex;

	err = drbd_send_sync_param(mdev);
		err = drbd_send_sizes(mdev, 0, 0);
		err = drbd_send_uuids(mdev);
		err = drbd_send_current_state(mdev);
	clear_bit(USE_DEGR_WFC_T, &mdev->flags);
	clear_bit(RESIZE_PENDING, &mdev->flags);
	mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_tconn *tconn)
	struct drbd_socket sock, msock;
	struct drbd_conf *mdev;
	int vnr, timeout, try, h, ok;
	bool discard_my_data;
	enum drbd_state_rv rv;

	if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)

	mutex_init(&sock.mutex);
	sock.sbuf = tconn->data.sbuf;
	sock.rbuf = tconn->data.rbuf;

	mutex_init(&msock.mutex);
	msock.sbuf = tconn->meta.sbuf;
	msock.rbuf = tconn->meta.rbuf;

	clear_bit(DISCARD_CONCURRENT, &tconn->flags);

	/* Assume that the peer only understands protocol 80 until we know better. */
	tconn->agreed_pro_version = 80;

		/* 3 tries, this should take less than a second! */
		s = drbd_try_connect(tconn);
			/* give the other side time to call bind() & listen() */
			schedule_timeout_interruptible(HZ / 10);

				send_first_packet(tconn, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				send_first_packet(tconn, &msock, P_INITIAL_META);
				conn_err(tconn, "Logic error in conn_connect()\n");
				goto out_release_sockets;

		if (sock.socket && msock.socket) {
			nc = rcu_dereference(tconn->net_conf);
			timeout = nc->ping_timeo * HZ / 10;
			schedule_timeout_interruptible(timeout);
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;

		s = drbd_wait_for_connect(tconn);
			try = receive_first_packet(tconn, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
					conn_warn(tconn, "initial packet S crossed\n");
					sock_release(sock.socket);
					conn_warn(tconn, "initial packet M crossed\n");
					sock_release(msock.socket);
				set_bit(DISCARD_CONCURRENT, &tconn->flags);
				conn_warn(tconn, "Error receiving initial packet\n");

		if (tconn->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(&tconn->receiver) == EXITING)
				goto out_release_sockets;
		if (sock.socket && msock.socket) {
			ok = drbd_socket_okay(&sock.socket);
			ok = drbd_socket_okay(&msock.socket) && ok;

	sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/*
	 * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */

	nc = rcu_dereference(tconn->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	tconn->data.socket = sock.socket;
	tconn->meta.socket = msock.socket;
	tconn->last_received = jiffies;

	h = drbd_do_features(tconn);

	if (tconn->cram_hmac_tfm) {
		/* drbd_request_state(mdev, NS(conn, WFAuth)); */
		switch (drbd_do_auth(tconn)) {
			conn_err(tconn, "Authentication of peer failed\n");
			conn_err(tconn, "Authentication of peer failed, trying again.\n");

	tconn->data.socket->sk->sk_sndtimeo = timeout;
	tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(tconn) == -EOPNOTSUPP)

	set_bit(STATE_SENT, &tconn->flags);

	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
		kref_get(&mdev->kref);
			set_bit(DISCARD_MY_DATA, &mdev->flags);
			clear_bit(DISCARD_MY_DATA, &mdev->flags);
		drbd_connected(mdev);
		kref_put(&mdev->kref, &drbd_minor_destroy);

	rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS) {
		clear_bit(STATE_SENT, &tconn->flags);

	drbd_thread_start(&tconn->asender);

	mutex_lock(&tconn->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	tconn->net_conf->discard_my_data = 0;
	mutex_unlock(&tconn->conf_update);

out_release_sockets:
		sock_release(sock.socket);
		sock_release(msock.socket);
static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
	unsigned int header_size = drbd_header_size(tconn);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
			conn_err(tconn, "Header padding is not zero\n");
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 tconn->agreed_pro_version);
	pi->data = header + header_size;
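/* For orientation (a summary inferred from the decoding above; the exact
 * struct layouts live in drbd's protocol headers and the field widths here
 * are assumptions based on the be16/be32 conversions used):
 *
 *   p_header80  (oldest):  u32 magic (DRBD_MAGIC),     u16 command, u16 length
 *   p_header95:            u16 magic (DRBD_MAGIC_BIG), u16 command, u32 length
 *   p_header100 (newest):  u32 magic (DRBD_MAGIC_100), u16 volume,
 *                          u16 command, u32 length, plus padding that must be
 *                          zero (checked above)
 *
 * All fields are big-endian on the wire, hence the be*_to_cpu() conversions;
 * only the protocol-100 header carries a volume number, so pi->vnr is left
 * alone for the older formats. */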
static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
	void *buffer = tconn->data.rbuf;

	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));

	err = decode_header(tconn, buffer, pi);
	tconn->last_received = jiffies;

static void drbd_flush(struct drbd_tconn *tconn)
	struct drbd_conf *mdev;

	if (tconn->write_ordering >= WO_bdev_flush) {
		idr_for_each_entry(&tconn->volumes, mdev, vnr) {
			if (!get_ldev(mdev))
			kref_get(&mdev->kref);

			rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
				dev_info(DEV, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(tconn, WO_drain_io);
			kref_put(&mdev->kref, &drbd_minor_destroy);
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it.
 * @tconn:	DRBD connection.
 * @epoch:	Epoch object.
static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&tconn->epoch_lock);
		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
			atomic_dec(&epoch->active);
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
		case EV_BECAME_LAST:

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&tconn->epoch_lock);
				drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
				spin_lock(&tconn->epoch_lock);

			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->tconn);

			if (tconn->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				if (rv == FE_STILL_LIVE)
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)

	spin_unlock(&tconn->epoch_lock);
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @tconn:	DRBD connection.
 * @wo:		Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
	struct disk_conf *dc;
	struct drbd_conf *mdev;
	enum write_ordering_e pwo;
	static char *write_ordering_str[] = {
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",

	pwo = tconn->write_ordering;
	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
		if (!get_ldev_if_state(mdev, D_ATTACHING))
		dc = rcu_dereference(mdev->ldev->disk_conf);

		if (wo == WO_bdev_flush && !dc->disk_flushes)
		if (wo == WO_drain_io && !dc->disk_drain)

	tconn->write_ordering = wo;
	if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
		conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
 * drbd_submit_peer_request()
 * @mdev:	DRBD device.
 * @peer_req:	peer request
 * @rw:		flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_conf *mdev,
			     struct drbd_peer_request *peer_req,
			     const unsigned rw, const int fault_type)
	struct bio *bios = NULL;
	struct page *page = peer_req->pages;
	sector_t sector = peer_req->i.sector;
	unsigned ds = peer_req->i.size;
	unsigned n_bios = 0;
	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

	/* In most cases, we will only need one bio. But in case the lower
	 * level restrictions happen to be different at this offset on this
	 * side than those of the sending peer, we may need to submit the
	 * request in more than one bio.
	 *
	 * Plain bio_alloc is good enough here, this is no DRBD internally
	 * generated bio, but a bio allocated on behalf of the peer.
	 */
	bio = bio_alloc(GFP_NOIO, nr_pages);
		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
	/* > peer_req->i.sector, unless this is the first bio */
	bio->bi_sector = sector;
	bio->bi_bdev = mdev->ldev->backing_bdev;
	bio->bi_private = peer_req;
	bio->bi_end_io = drbd_peer_request_endio;

	bio->bi_next = bios;

	page_chain_for_each(page) {
		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
		if (!bio_add_page(bio, page, len, 0)) {
			/* A single page must always be possible!
			 * But in case it fails anyways,
			 * we deal with it, and complain (below). */
			if (bio->bi_vcnt == 0) {
					"bio_add_page failed for len=%u, "
					"bi_vcnt=0 (bi_sector=%llu)\n",
					len, (unsigned long long)bio->bi_sector);
	D_ASSERT(page == NULL);

	atomic_set(&peer_req->pending_bios, n_bios);
		bios = bios->bi_next;
		bio->bi_next = NULL;

		drbd_generic_make_request(mdev, fault_type, bio);

		bios = bios->bi_next;
static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
					     struct drbd_peer_request *peer_req)
	struct drbd_interval *i = &peer_req->i;

	drbd_remove_interval(&mdev->write_requests, i);
	drbd_clear_interval(i);

	/* Wake up any processes waiting for this peer request to complete. */
		wake_up(&mdev->misc_wait);

void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
	struct drbd_conf *mdev;

	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
		kref_get(&mdev->kref);
		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
		kref_put(&mdev->kref, &drbd_minor_destroy);

static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
	struct p_barrier *p = pi->data;
	struct drbd_epoch *epoch;

	/* FIXME these are unacked on connection,
	 * not a specific (peer)device.
	 */
	tconn->current_epoch->barrier_nr = p->barrier;
	tconn->current_epoch->tconn = tconn;
	rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);

	/* P_BARRIER_ACK may imply that the corresponding extent is dropped from
	 * the activity log, which means it would not be resynced in case the
	 * R_PRIMARY crashes now.
	 * Therefore we must send the barrier_ack after the barrier request was
	 * completed. */
	switch (tconn->write_ordering) {
		if (rv == FE_RECYCLED)

		/* receiver context, in the writeout path of the other node.
		 * avoid potential distributed deadlock */
		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
			conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
			conn_wait_active_ee_empty(tconn);
		if (atomic_read(&tconn->current_epoch->epoch_size)) {
			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
		conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);

	atomic_set(&epoch->epoch_size, 0);
	atomic_set(&epoch->active, 0);

	spin_lock(&tconn->epoch_lock);
	if (atomic_read(&tconn->current_epoch->epoch_size)) {
		list_add(&epoch->list, &tconn->current_epoch->list);
		tconn->current_epoch = epoch;
		/* The current_epoch got recycled while we allocated this one... */
	spin_unlock(&tconn->epoch_lock);
/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_peer_request *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
	      int data_size) __must_hold(local)
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	struct drbd_peer_request *peer_req;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;
	unsigned long *data;

	if (mdev->tconn->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
		/*
		 * FIXME: Receive the incoming digest into the receive buffer
		 * here, together with its struct p_data?
		 */
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	if (!expect(data_size != 0))
	if (!expect(IS_ALIGNED(data_size, 512)))
	if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
	/* even though we trust our peer,
	 * we sometimes have to double check. */
	if (sector + (data_size>>9) > capacity) {
		dev_err(DEV, "request from peer beyond end of local disk: "
			"capacity: %llus < sector: %llus + size: %u\n",
			(unsigned long long)capacity,
			(unsigned long long)sector, data_size);

	/* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place. */
	peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);

	page = peer_req->pages;
	page_chain_for_each(page) {
		unsigned len = min_t(int, ds, PAGE_SIZE);
		err = drbd_recv_all_warn(mdev->tconn, data, len);
		if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
			data[0] = data[0] ^ (unsigned long)-1;
			drbd_free_peer_req(mdev, peer_req);

		drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
				(unsigned long long)sector, data_size);
			drbd_free_peer_req(mdev, peer_req);

	mdev->recv_cnt += data_size>>9;

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
	page = drbd_alloc_pages(mdev, 1, 1);

		unsigned int len = min_t(int, data_size, PAGE_SIZE);
		err = drbd_recv_all_warn(mdev->tconn, data, len);

	drbd_free_pages(mdev, page, 0);

static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
			   sector_t sector, int data_size)
	struct bio_vec *bvec;
	int dgs, err, i, expect;
	void *dig_in = mdev->tconn->int_dig_in;
	void *dig_vv = mdev->tconn->int_dig_vv;

	if (mdev->tconn->peer_integrity_tfm) {
		dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
		err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);

	/* optimistically update recv_cnt. if receiving fails below,
	 * we disconnect anyways, and counters will be reset. */
	mdev->recv_cnt += data_size>>9;

	bio = req->master_bio;
	D_ASSERT(sector == bio->bi_sector);

	bio_for_each_segment(bvec, bio, i) {
		void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
		expect = min_t(int, data_size, bvec->bv_len);
		err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
		kunmap(bvec->bv_page);
		data_size -= expect;

		drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
		if (memcmp(dig_in, dig_vv, dgs)) {
			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");

	D_ASSERT(data_size == 0);

/*
 * e_end_resync_block() is called in asender context via
 * drbd_finish_peer_reqs().
 */
static int e_end_resync_block(struct drbd_work *w, int unused)
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	D_ASSERT(drbd_interval_empty(&peer_req->i));

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		drbd_set_in_sync(mdev, sector, peer_req->i.size);
		err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
		/* Record failure to sync */
		drbd_rs_failed_io(mdev, sector, peer_req->i.size);

		err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
	struct drbd_peer_request *peer_req;

	peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);

	dec_rs_pending(mdev);

	/* corresponding dec_unacked() in e_end_resync_block()
	 * respective _drbd_clear_done_ee */

	peer_req->w.cb = e_end_resync_block;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->sync_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(data_size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_peer_req(mdev, peer_req);

static struct drbd_request *
find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
	     sector_t sector, bool missing_ok, const char *func)
	struct drbd_request *req;

	/* Request object according to our peer */
	req = (struct drbd_request *)(unsigned long)id;
	if (drbd_contains_interval(root, sector, &req->i) && req->i.local)

		dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
			(unsigned long)id, (unsigned long long)sector);

static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct drbd_request *req;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);

	spin_lock_irq(&mdev->tconn->req_lock);
	req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
	spin_unlock_irq(&mdev->tconn->req_lock);

	/* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
	 * special casing it there for the various failure cases.
	 * still no race with drbd_fail_pending_reads */
	err = recv_dless_read(mdev, req, sector, pi->size);
		req_mod(req, DATA_RECEIVED);
	/* else: nothing. handled from drbd_disconnect...
	 * I don't think we may complete this just yet
	 * in case we are "on-disconnect: freeze" */
static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct p_data *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);

	sector = be64_to_cpu(p->sector);
	D_ASSERT(p->block_id == ID_SYNCER);

	if (get_ldev(mdev)) {
		/* data is submitted to disk within recv_resync_read.
		 * corresponding put_ldev done below on error,
		 * or in drbd_peer_request_endio. */
		err = recv_resync_read(mdev, sector, pi->size);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not write resync data to local disk.\n");

		err = drbd_drain_block(mdev, pi->size);

		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);

	atomic_add(pi->size >> 9, &mdev->rs_sect_in);

static void restart_conflicting_writes(struct drbd_conf *mdev,
				       sector_t sector, int size)
	struct drbd_interval *i;
	struct drbd_request *req;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		req = container_of(i, struct drbd_request, i);
		if (req->rq_state & RQ_LOCAL_PENDING ||
		    !(req->rq_state & RQ_POSTPONED))
		/* as it is RQ_POSTPONED, this will cause it to
		 * be queued on the retry workqueue. */
		__req_mod(req, DISCARD_WRITE, NULL);

/*
 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
 */
static int e_end_block(struct drbd_work *w, int cancel)
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;

	if (peer_req->flags & EE_SEND_WRITE_ACK) {
		if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
				mdev->state.conn <= C_PAUSED_SYNC_T &&
				peer_req->flags & EE_MAY_SET_IN_SYNC) ?
				P_RS_WRITE_ACK : P_WRITE_ACK;
			err = drbd_send_ack(mdev, pcmd, peer_req);
			if (pcmd == P_RS_WRITE_ACK)
				drbd_set_in_sync(mdev, sector, peer_req->i.size);
			err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
			/* we expect it to be marked out of sync anyways...
			 * maybe assert this? */

	/* we delete from the conflict detection hash _after_ we sent out the
	 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
	if (peer_req->flags & EE_IN_INTERVAL_TREE) {
		spin_lock_irq(&mdev->tconn->req_lock);
		D_ASSERT(!drbd_interval_empty(&peer_req->i));
		drbd_remove_epoch_entry_interval(mdev, peer_req);
		if (peer_req->flags & EE_RESTART_REQUESTS)
			restart_conflicting_writes(mdev, sector, peer_req->i.size);
		spin_unlock_irq(&mdev->tconn->req_lock);
		D_ASSERT(drbd_interval_empty(&peer_req->i));

	drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));

static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
	struct drbd_conf *mdev = w->mdev;
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);

	err = drbd_send_ack(mdev, ack, peer_req);

static int e_send_discard_write(struct drbd_work *w, int unused)
	return e_send_ack(w, P_DISCARD_WRITE);

static int e_send_retry_write(struct drbd_work *w, int unused)
	struct drbd_tconn *tconn = w->mdev->tconn;

	return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
			  P_RETRY_WRITE : P_DISCARD_WRITE);
static bool seq_greater(u32 a, u32 b)
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *     a <<= 8; b <<= 8;
	 */
	return (s32)a - (s32)b > 0;
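/* Worked example (illustrative): seq_greater(1, 0xffffffff) is true, because
 * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0; the sequence counter is
 * treated as having wrapped rather than gone backwards. Conversely,
 * seq_greater(0, 0x7fffffff) is false, since the signed difference is
 * negative. The comparison is therefore only meaningful while the two
 * counters stay within 2^31 of each other. */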
static u32 seq_max(u32 a, u32 b)
	return seq_greater(a, b) ? a : b;

static bool need_peer_seq(struct drbd_conf *mdev)
	struct drbd_tconn *tconn = mdev->tconn;

	/*
	 * We only need to keep track of the last packet_seq number of our peer
	 * if we are in dual-primary mode and we have the discard flag set; see
	 * handle_write_conflicts().
	 */
	tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;

	return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);

static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
	unsigned int newest_peer_seq;

	if (need_peer_seq(mdev)) {
		spin_lock(&mdev->peer_seq_lock);
		newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
		mdev->peer_seq = newest_peer_seq;
		spin_unlock(&mdev->peer_seq_lock);
		/* wake up only if we actually changed mdev->peer_seq */
		if (peer_seq == newest_peer_seq)
			wake_up(&mdev->seq_wait);

static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
	return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
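/* Worked example (illustrative): l1 and l2 are byte lengths, converted to
 * sectors by the >>9 shift. With s1 = 0, l1 = 4096 (sectors 0..7) and
 * s2 = 7, l2 = 512 (sector 7), neither "one ends before the other starts"
 * clause holds (0 + 8 <= 7 is false, 0 >= 7 + 1 is false), so the extents
 * overlap and the function returns 1. Moving s2 to 8 makes the first clause
 * true, and the result is 0. */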
/* maybe change sync_ee into interval trees as well? */
static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
	struct drbd_peer_request *rs_req;

	spin_lock_irq(&mdev->tconn->req_lock);
	list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
		if (overlaps(peer_req->i.sector, peer_req->i.size,
			     rs_req->i.sector, rs_req->i.size)) {
	spin_unlock_irq(&mdev->tconn->req_lock);

/* Called from receive_Data.
 * Synchronize packets on sock with packets on msock.
 *
 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
 * packet traveling on msock, they are still processed in the order they have
 * been sent.
 *
 * Note: we don't care for Ack packets overtaking P_DATA packets.
 *
 * In case packet_seq is larger than mdev->peer_seq number, there are
 * outstanding packets on the msock. We wait for them to arrive.
 * In case we are the logically next packet, we update mdev->peer_seq
 * ourselves. Correctly handles 32bit wrap around.
 *
 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1903 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
 *
 * returns 0 if we may process the packet,
 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
	if (!need_peer_seq(mdev))

	spin_lock(&mdev->peer_seq_lock);
		if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
			mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
		if (signal_pending(current)) {
		prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
		spin_unlock(&mdev->peer_seq_lock);
		timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
		timeout = schedule_timeout(timeout);
		spin_lock(&mdev->peer_seq_lock);
			dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
	spin_unlock(&mdev->peer_seq_lock);
	finish_wait(&mdev->seq_wait, &wait);
/* see also bio_flags_to_wire()
 * DRBD_REQ_*, because we need to semantically map the flags to data packet
 * flags and back. We may replicate to other kernel versions. */
static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
	return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
		(dpf & DP_FUA ? REQ_FUA : 0) |
		(dpf & DP_FLUSH ? REQ_FLUSH : 0) |
		(dpf & DP_DISCARD ? REQ_DISCARD : 0);
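/* For orientation (a hedged sketch, not a verbatim quote of drbd's code):
 * the sending side performs the inverse mapping in bio_flags_to_wire(),
 * turning bio REQ_* flags into the DP_* wire flags decoded above, along the
 * lines of:
 *
 *	(bi_rw & REQ_SYNC    ? DP_RW_SYNC : 0) |
 *	(bi_rw & REQ_FUA     ? DP_FUA     : 0) |
 *	(bi_rw & REQ_FLUSH   ? DP_FLUSH   : 0) |
 *	(bi_rw & REQ_DISCARD ? DP_DISCARD : 0)
 *
 * Keeping the two mappings symmetric is what lets a write replicate with the
 * same ordering and durability semantics on both nodes. */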
static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
	struct drbd_interval *i;

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		struct drbd_request *req;
		struct bio_and_error m;

		req = container_of(i, struct drbd_request, i);
		if (!(req->rq_state & RQ_POSTPONED))
		req->rq_state &= ~RQ_POSTPONED;
		__req_mod(req, NEG_ACKED, &m);
		spin_unlock_irq(&mdev->tconn->req_lock);
			complete_master_bio(mdev, &m);
		spin_lock_irq(&mdev->tconn->req_lock);

static int handle_write_conflicts(struct drbd_conf *mdev,
				  struct drbd_peer_request *peer_req)
	struct drbd_tconn *tconn = mdev->tconn;
	bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
	sector_t sector = peer_req->i.sector;
	const unsigned int size = peer_req->i.size;
	struct drbd_interval *i;

	/*
	 * Inserting the peer request into the write_requests tree will prevent
	 * new conflicting local requests from being added.
	 */
	drbd_insert_interval(&mdev->write_requests, &peer_req->i);

	drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
		if (i == &peer_req->i)

			/*
			 * Our peer has sent a conflicting remote request; this
			 * should not happen in a two-node setup. Wait for the
			 * earlier peer request to complete.
			 */
			err = drbd_wait_misc(mdev, i);

		equal = i->sector == sector && i->size == size;
		if (resolve_conflicts) {
			/*
			 * If the peer request is fully contained within the
			 * overlapping request, it can be discarded; otherwise,
			 * it will be retried once all overlapping requests
			 * have completed.
			 */
			bool discard = i->sector <= sector && i->sector +
				       (i->size >> 9) >= sector + (size >> 9);

				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u, "
					       "assuming %s came first\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size,
					  discard ? "local" : "remote");

			peer_req->w.cb = discard ? e_send_discard_write :
			list_add_tail(&peer_req->w.list, &mdev->done_ee);
			wake_asender(mdev->tconn);

			struct drbd_request *req =
				container_of(i, struct drbd_request, i);

				dev_alert(DEV, "Concurrent writes detected: "
					       "local=%llus +%u, remote=%llus +%u\n",
					  (unsigned long long)i->sector, i->size,
					  (unsigned long long)sector, size);

			if (req->rq_state & RQ_LOCAL_PENDING ||
			    !(req->rq_state & RQ_POSTPONED)) {
				/*
				 * Wait for the node with the discard flag to
				 * decide if this request will be discarded or
				 * retried. Requests that are discarded will
				 * disappear from the write_requests tree.
				 *
				 * In addition, wait for the conflicting
				 * request to finish locally before submitting
				 * the conflicting peer request.
				 */
				err = drbd_wait_misc(mdev, &req->i);
					_conn_request_state(mdev->tconn,
							    NS(conn, C_TIMEOUT),
					fail_postponed_requests(mdev, sector, size);

			/*
			 * Remember to restart the conflicting requests after
			 * the new peer request has completed.
			 */
			peer_req->flags |= EE_RESTART_REQUESTS;

	drbd_remove_epoch_entry_interval(mdev, peer_req);

/* mirrored write */
static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct drbd_peer_request *peer_req;
	struct p_data *p = pi->data;
	u32 peer_seq = be32_to_cpu(p->seq_num);

	mdev = vnr_to_mdev(tconn, pi->vnr);

	if (!get_ldev(mdev)) {
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
		drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
		atomic_inc(&tconn->current_epoch->epoch_size);
		err2 = drbd_drain_block(mdev, pi->size);

	/*
	 * Corresponding put_ldev done either below (on various errors), or in
	 * drbd_peer_request_endio, if we successfully submit the data at the
	 * end of this function.
	 */
	sector = be64_to_cpu(p->sector);
	peer_req = read_in_block(mdev, p->block_id, sector, pi->size);

	peer_req->w.cb = e_end_block;

	dp_flags = be32_to_cpu(p->dp_flags);
	rw |= wire_flags_to_bio(mdev, dp_flags);

	if (dp_flags & DP_MAY_SET_IN_SYNC)
		peer_req->flags |= EE_MAY_SET_IN_SYNC;

	spin_lock(&tconn->epoch_lock);
	peer_req->epoch = tconn->current_epoch;
	atomic_inc(&peer_req->epoch->epoch_size);
	atomic_inc(&peer_req->epoch->active);
	spin_unlock(&tconn->epoch_lock);

	tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
		peer_req->flags |= EE_IN_INTERVAL_TREE;
		err = wait_for_and_update_peer_seq(mdev, peer_seq);
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
		err = handle_write_conflicts(mdev, peer_req);
			spin_unlock_irq(&mdev->tconn->req_lock);
			if (err == -ENOENT) {
			goto out_interrupted;
		spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->active_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (mdev->state.conn == C_SYNC_TARGET)
		wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));

	if (mdev->tconn->agreed_pro_version < 100) {
		switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
			dp_flags |= DP_SEND_WRITE_ACK;
			dp_flags |= DP_SEND_RECEIVE_ACK;

	if (dp_flags & DP_SEND_WRITE_ACK) {
		peer_req->flags |= EE_SEND_WRITE_ACK;
		/* corresponding dec_unacked() in e_end_block()
		 * respective _drbd_clear_done_ee */

	if (dp_flags & DP_SEND_RECEIVE_ACK) {
		/* I really don't like it that the receiver thread
		 * sends on the msock, but anyways */
		drbd_send_ack(mdev, P_RECV_ACK, peer_req);

	if (mdev->state.pdsk < D_INCONSISTENT) {
		/* In case we have the only disk of the cluster, */
		drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
		peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
		peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
		drbd_al_begin_io(mdev, &peer_req->i);

	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);

	/* don't care for the reason here */
	dev_err(DEV, "submit failed, triggering re-connect\n");
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	drbd_remove_epoch_entry_interval(mdev, peer_req);
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
		drbd_al_complete_io(mdev, &peer_req->i);

	drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
	drbd_free_peer_req(mdev, peer_req);
/* We may throttle resync, if the lower device seems to be busy,
 * and current sync rate is above c_min_rate.
 *
 * To decide whether or not the lower device is busy, we use a scheme similar
 * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
 * amount of activity (more than 64 sectors) that we cannot account for with
 * our own resync activity, it obviously is "busy".
 *
 * The current sync rate used here uses only the most recent two step marks,
 * to have a short time average so we can react faster.
 */
int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
	struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
	unsigned long db, dt, dbdt;
	struct lc_element *tmp;
	unsigned int c_min_rate;

	c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;

	/* feature disabled? */
	if (c_min_rate == 0)

	spin_lock_irq(&mdev->al_lock);
	tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
		struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
		if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
			spin_unlock_irq(&mdev->al_lock);
		/* Do not slow down if app IO is already waiting for this extent */
	spin_unlock_irq(&mdev->al_lock);

	curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
		      (int)part_stat_read(&disk->part0, sectors[1]) -
		      atomic_read(&mdev->rs_sect_ev);

	if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
		unsigned long rs_left;

		mdev->rs_last_events = curr_events;

		/* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
		i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;

		if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
			rs_left = mdev->ov_left;
			rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;

		dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
		db = mdev->rs_mark_left[i] - rs_left;
		dbdt = Bit2KB(db/dt);

		if (dbdt > c_min_rate)
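/* Worked arithmetic (illustrative; assumes, as in drbd's bitmap code, that
 * rs_mark_left[] counts bitmap bits of 4 KiB each, making Bit2KB() a multiply
 * by 4): if the mark taken roughly 2*DRBD_SYNC_MARK_STEP ago shows
 * db = 1024 bits resynced over dt = 2 seconds, then db/dt = 512 bits/s and
 * dbdt = 2048 KiB/s. With, say, c_min_rate = 250 KiB/s, the resync is running
 * well above the floor, so a busy backing device would cause throttling. */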
static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
	struct drbd_conf *mdev;
	struct drbd_peer_request *peer_req;
	struct digest_info *di = NULL;
	unsigned int fault_type;
	struct p_block_req *p = pi->data;

	mdev = vnr_to_mdev(tconn, pi->vnr);
	capacity = drbd_get_capacity(mdev->this_bdev);

	sector = be64_to_cpu(p->sector);
	size   = be32_to_cpu(p->blksize);

	if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);
	if (sector + (size>>9) > capacity) {
		dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
			(unsigned long long)sector, size);

	if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
		case P_DATA_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
		case P_RS_DATA_REQUEST:
		case P_CSUM_RS_REQUEST:
			drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
			dec_rs_pending(mdev);
			drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
		if (verb && __ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Can not satisfy peer's read request, "
				"no local data.\n");
		/* drain the possibly remaining payload */
2350 return drbd_drain_block(mdev, pi->size);
2353 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2354 * "criss-cross" setup, that might cause write-out on some other DRBD,
2355 * which in turn might block on the other node at this very place. */
2356 peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2363 case P_DATA_REQUEST:
2364 peer_req->w.cb = w_e_end_data_req;
2365 fault_type = DRBD_FAULT_DT_RD;
2366 /* application IO, don't drbd_rs_begin_io */
2369 case P_RS_DATA_REQUEST:
2370 peer_req->w.cb = w_e_end_rsdata_req;
2371 fault_type = DRBD_FAULT_RS_RD;
2372 /* used in the sector offset progress display */
2373 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2377 case P_CSUM_RS_REQUEST:
2378 fault_type = DRBD_FAULT_RS_RD;
2379 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2383 di->digest_size = pi->size;
2384 di->digest = (((char *)di)+sizeof(struct digest_info));
2386 peer_req->digest = di;
2387 peer_req->flags |= EE_HAS_DIGEST;
2389 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2392 if (pi->cmd == P_CSUM_RS_REQUEST) {
2393 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2394 peer_req->w.cb = w_e_end_csum_rs_req;
2395 /* used in the sector offset progress display */
2396 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2397 } else if (pi->cmd == P_OV_REPLY) {
2398 /* track progress, we may need to throttle */
2399 atomic_add(size >> 9, &mdev->rs_sect_in);
2400 peer_req->w.cb = w_e_end_ov_reply;
2401 dec_rs_pending(mdev);
2402 /* drbd_rs_begin_io done when we sent this request,
2403 * but accounting still needs to be done. */
2404 goto submit_for_resync;
2409 if (mdev->ov_start_sector == ~(sector_t)0 &&
2410 mdev->tconn->agreed_pro_version >= 90) {
2411 unsigned long now = jiffies;
2413 mdev->ov_start_sector = sector;
2414 mdev->ov_position = sector;
2415 mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2416 mdev->rs_total = mdev->ov_left;
2417 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2418 mdev->rs_mark_left[i] = mdev->ov_left;
2419 mdev->rs_mark_time[i] = now;
2421 dev_info(DEV, "Online Verify start sector: %llu\n",
2422 (unsigned long long)sector);
2424 peer_req->w.cb = w_e_end_ov_req;
2425 fault_type = DRBD_FAULT_RS_RD;
2432 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2433 * wrt the receiver, but it is not as straightforward as it may seem.
2434 * Various places in the resync start and stop logic assume resync
2435 * requests are processed in order, requeuing this on the worker thread
2436 * introduces a bunch of new code for synchronization between threads.
2438 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2439 * "forever", throttling after drbd_rs_begin_io will lock that extent
2440 * for application writes for the same time. For now, just throttle
2441 * here, where the rest of the code expects the receiver to sleep for a while anyway. */
2445 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2446 * this defers syncer requests for some time, before letting at least
2447 * one request through. The resync controller on the receiving side
2448 * will adapt to the incoming rate accordingly.
2450 * We cannot throttle here if remote is Primary/SyncTarget:
2451 * we would also throttle its application reads.
2452 * In that case, throttling is done on the SyncTarget only.
2454 if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2455 schedule_timeout_uninterruptible(HZ/10);
2456 if (drbd_rs_begin_io(mdev, sector))
2460 atomic_add(size >> 9, &mdev->rs_sect_ev);
2464 spin_lock_irq(&mdev->tconn->req_lock);
2465 list_add_tail(&peer_req->w.list, &mdev->read_ee);
2466 spin_unlock_irq(&mdev->tconn->req_lock);
2468 if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2471 /* don't care for the reason here */
2472 dev_err(DEV, "submit failed, triggering re-connect\n");
2473 spin_lock_irq(&mdev->tconn->req_lock);
2474 list_del(&peer_req->w.list);
2475 spin_unlock_irq(&mdev->tconn->req_lock);
2476 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2480 drbd_free_peer_req(mdev, peer_req);
2484 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2486 int self, peer, rv = -100;
2487 unsigned long ch_self, ch_peer;
2488 enum drbd_after_sb_p after_sb_0p;
2490 self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2491 peer = mdev->p_uuid[UI_BITMAP] & 1;
2493 ch_peer = mdev->p_uuid[UI_SIZE];
2494 ch_self = mdev->comm_bm_set;
2497 after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2499 switch (after_sb_0p) {
2501 case ASB_DISCARD_SECONDARY:
2502 case ASB_CALL_HELPER:
2504 dev_err(DEV, "Configuration error.\n");
2506 case ASB_DISCONNECT:
2508 case ASB_DISCARD_YOUNGER_PRI:
2509 if (self == 0 && peer == 1) {
2513 if (self == 1 && peer == 0) {
2517 /* Else fall through to one of the other strategies... */
2518 case ASB_DISCARD_OLDER_PRI:
2519 if (self == 0 && peer == 1) {
2523 if (self == 1 && peer == 0) {
2527 /* Else fall through to one of the other strategies... */
2528 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2529 "Using discard-least-changes instead\n");
2530 case ASB_DISCARD_ZERO_CHG:
2531 if (ch_peer == 0 && ch_self == 0) {
2532 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2536 if (ch_peer == 0) { rv = 1; break; }
2537 if (ch_self == 0) { rv = -1; break; }
2539 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2541 case ASB_DISCARD_LEAST_CHG:
2542 if (ch_self < ch_peer)
2544 else if (ch_self > ch_peer)
2546 else /* ( ch_self == ch_peer ) */
2547 /* Well, then use something else. */
2548 rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2551 case ASB_DISCARD_LOCAL:
2554 case ASB_DISCARD_REMOTE:
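/*
 * Editorial sketch (not driver code): the "discard least changes" branch
 * above reduces to a pure function over the two out-of-sync block counts.
 * Return convention as in drbd_asb_recover_0p(): -1 discards the local
 * data (this node becomes SyncTarget), 1 discards the peer's data (this
 * node becomes SyncSource).
 */
static int asb_least_changes_sketch(unsigned long ch_self, unsigned long ch_peer,
				    int discard_concurrent_flag)
{
	if (ch_self < ch_peer)
		return -1;	/* we changed less, so our data is discarded */
	if (ch_self > ch_peer)
		return 1;	/* the peer changed less, its data is discarded */
	/* tie: fall back to the arbitrary but cluster-symmetric flag */
	return discard_concurrent_flag ? -1 : 1;
}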
2561 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2564 enum drbd_after_sb_p after_sb_1p;
2567 after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2569 switch (after_sb_1p) {
2570 case ASB_DISCARD_YOUNGER_PRI:
2571 case ASB_DISCARD_OLDER_PRI:
2572 case ASB_DISCARD_LEAST_CHG:
2573 case ASB_DISCARD_LOCAL:
2574 case ASB_DISCARD_REMOTE:
2575 case ASB_DISCARD_ZERO_CHG:
2576 dev_err(DEV, "Configuration error.\n");
2578 case ASB_DISCONNECT:
2581 hg = drbd_asb_recover_0p(mdev);
2582 if (hg == -1 && mdev->state.role == R_SECONDARY)
2584 if (hg == 1 && mdev->state.role == R_PRIMARY)
2588 rv = drbd_asb_recover_0p(mdev);
2590 case ASB_DISCARD_SECONDARY:
2591 return mdev->state.role == R_PRIMARY ? 1 : -1;
2592 case ASB_CALL_HELPER:
2593 hg = drbd_asb_recover_0p(mdev);
2594 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2595 enum drbd_state_rv rv2;
2597 drbd_set_role(mdev, R_SECONDARY, 0);
2598 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2599 * we might be here in C_WF_REPORT_PARAMS which is transient.
2600 * we do not need to wait for the after state change work either. */
2601 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2602 if (rv2 != SS_SUCCESS) {
2603 drbd_khelper(mdev, "pri-lost-after-sb");
2605 dev_warn(DEV, "Successfully gave up primary role.\n");
2615 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2618 enum drbd_after_sb_p after_sb_2p;
2621 after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2623 switch (after_sb_2p) {
2624 case ASB_DISCARD_YOUNGER_PRI:
2625 case ASB_DISCARD_OLDER_PRI:
2626 case ASB_DISCARD_LEAST_CHG:
2627 case ASB_DISCARD_LOCAL:
2628 case ASB_DISCARD_REMOTE:
2630 case ASB_DISCARD_SECONDARY:
2631 case ASB_DISCARD_ZERO_CHG:
2632 dev_err(DEV, "Configuration error.\n");
2635 rv = drbd_asb_recover_0p(mdev);
2637 case ASB_DISCONNECT:
2639 case ASB_CALL_HELPER:
2640 hg = drbd_asb_recover_0p(mdev);
2642 enum drbd_state_rv rv2;
2644 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2645 * we might be here in C_WF_REPORT_PARAMS which is transient.
2646 * we do not need to wait for the after state change work either. */
2647 rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2648 if (rv2 != SS_SUCCESS) {
2649 drbd_khelper(mdev, "pri-lost-after-sb");
2651 dev_warn(DEV, "Successfully gave up primary role.\n");
2661 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2662 u64 bits, u64 flags)
2665 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2668 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2670 (unsigned long long)uuid[UI_CURRENT],
2671 (unsigned long long)uuid[UI_BITMAP],
2672 (unsigned long long)uuid[UI_HISTORY_START],
2673 (unsigned long long)uuid[UI_HISTORY_END],
2674 (unsigned long long)bits,
2675 (unsigned long long)flags);
2679 100 after split brain try auto recover
2680 2 C_SYNC_SOURCE set BitMap
2681 1 C_SYNC_SOURCE use BitMap
2682 0 no Sync
2683 -1 C_SYNC_TARGET use BitMap
2684 -2 C_SYNC_TARGET set BitMap
2685 -100 after split brain, disconnect
2686 -1000 unrelated data
2687 -1091 requires proto 91
2688 -1096 requires proto 96
2690 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2695 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2696 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2699 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2703 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2704 peer != UUID_JUST_CREATED)
2708 if (self != UUID_JUST_CREATED &&
2709 (peer == UUID_JUST_CREATED || peer == (u64)0))
2713 int rct, dc; /* roles at crash time */
2715 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2717 if (mdev->tconn->agreed_pro_version < 91)
2720 if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2721 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2722 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2723 drbd_uuid_set_bm(mdev, 0UL);
2725 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2726 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2729 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2736 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2738 if (mdev->tconn->agreed_pro_version < 91)
2741 if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2742 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2743 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2745 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2746 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2747 mdev->p_uuid[UI_BITMAP] = 0UL;
2749 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2752 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2759 /* Common power [off|failure] */
2760 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2761 (mdev->p_uuid[UI_FLAGS] & 2);
2762 /* lowest bit is set when we were primary,
2763 * next bit (weight 2) is set when peer was primary */
2767 case 0: /* !self_pri && !peer_pri */ return 0;
2768 case 1: /* self_pri && !peer_pri */ return 1;
2769 case 2: /* !self_pri && peer_pri */ return -1;
2770 case 3: /* self_pri && peer_pri */
2771 dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2777 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2782 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2784 if (mdev->tconn->agreed_pro_version < 96 ?
2785 (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2786 (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2787 peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2788 /* The last P_SYNC_UUID did not get through. Undo the modifications
2789 the peer made to its UUIDs at the last start of resync as sync source. */
2791 if (mdev->tconn->agreed_pro_version < 91)
2794 mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2795 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2797 dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2798 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2805 self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2806 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2807 peer = mdev->p_uuid[i] & ~((u64)1);
2813 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2814 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2819 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2821 if (mdev->tconn->agreed_pro_version < 96 ?
2822 (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2823 (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2824 self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2825 /* The last P_SYNC_UUID did not get through. Undo the modifications
2826 we made to our own UUIDs at the last start of resync as sync source. */
2828 if (mdev->tconn->agreed_pro_version < 91)
2831 _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2832 _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2834 dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2835 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2836 mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2844 peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2845 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2846 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2852 self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2853 peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2854 if (self == peer && self != ((u64)0))
2858 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2859 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2860 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2861 peer = mdev->p_uuid[j] & ~((u64)1);
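/*
 * Editorial sketch (not driver code): the first rules of the comparison
 * above, restated over the current UUIDs with the low flag bit masked
 * off.  The value 4 for UUID_JUST_CREATED is an assumption taken from
 * the public headers; return codes follow the table before
 * drbd_uuid_compare().
 */
static int uuid_first_rules_sketch(unsigned long long self, unsigned long long peer)
{
	const unsigned long long just_created = 4; /* UUID_JUST_CREATED, assumption */

	self &= ~1ULL;
	peer &= ~1ULL;
	if (self == just_created && peer == just_created)
		return 0;	/* both devices brand new: nothing to sync */
	if ((self == just_created || self == 0) && peer != just_created)
		return -2;	/* we are blank: full sync, we become target */
	if (self != just_created && (peer == just_created || peer == 0))
		return 2;	/* peer is blank: full sync, we become source */
	return 1000;	/* not a real code: the deeper history/bitmap rules decide */
}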
2870 /* drbd_sync_handshake() returns the new conn state on success, or
2871 C_MASK on failure. */
2873 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2874 enum drbd_disk_state peer_disk) __must_hold(local)
2876 enum drbd_conns rv = C_MASK;
2877 enum drbd_disk_state mydisk;
2878 struct net_conf *nc;
2879 int hg, rule_nr, rr_conflict, tentative;
2881 mydisk = mdev->state.disk;
2882 if (mydisk == D_NEGOTIATING)
2883 mydisk = mdev->new_state_tmp.disk;
2885 dev_info(DEV, "drbd_sync_handshake:\n");
2886 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2887 drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2888 mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2890 hg = drbd_uuid_compare(mdev, &rule_nr);
2892 dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2895 dev_alert(DEV, "Unrelated data, aborting!\n");
2899 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2903 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2904 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
2905 int f = (hg == -100) || abs(hg) == 2;
2906 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2909 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2910 hg > 0 ? "source" : "target");
2914 drbd_khelper(mdev, "initial-split-brain");
2917 nc = rcu_dereference(mdev->tconn->net_conf);
2919 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2920 int pcount = (mdev->state.role == R_PRIMARY)
2921 + (peer_role == R_PRIMARY);
2922 int forced = (hg == -100);
2926 hg = drbd_asb_recover_0p(mdev);
2929 hg = drbd_asb_recover_1p(mdev);
2932 hg = drbd_asb_recover_2p(mdev);
2935 if (abs(hg) < 100) {
2936 dev_warn(DEV, "Split-Brain detected, %d primaries, "
2937 "automatically solved. Sync from %s node\n",
2938 pcount, (hg < 0) ? "peer" : "this");
2940 dev_warn(DEV, "Doing a full sync, since"
2941 " UUIDs were ambiguous.\n");
2948 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2950 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2954 dev_warn(DEV, "Split-Brain detected, manually solved. "
2955 "Sync from %s node\n",
2956 (hg < 0) ? "peer" : "this");
2958 rr_conflict = nc->rr_conflict;
2959 tentative = nc->tentative;
2963 /* FIXME this log message is not correct if we end up here
2964 * after an attempted attach on a diskless node.
2965 * We just refuse to attach -- well, we drop the "connection"
2966 * to that disk, in a way... */
2967 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2968 drbd_khelper(mdev, "split-brain");
2972 if (hg > 0 && mydisk <= D_INCONSISTENT) {
2973 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2977 if (hg < 0 && /* by intention we do not use mydisk here. */
2978 mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2979 switch (rr_conflict) {
2980 case ASB_CALL_HELPER:
2981 drbd_khelper(mdev, "pri-lost");
2983 case ASB_DISCONNECT:
2984 dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2987 dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2992 if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2994 dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2996 dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2997 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2998 abs(hg) >= 2 ? "full" : "bit-map based");
3003 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3004 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3005 BM_LOCKED_SET_ALLOWED))
3009 if (hg > 0) { /* become sync source. */
3011 } else if (hg < 0) { /* become sync target */
3015 if (drbd_bm_total_weight(mdev)) {
3016 dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3017 drbd_bm_total_weight(mdev));
3024 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3026 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3027 if (peer == ASB_DISCARD_REMOTE)
3028 return ASB_DISCARD_LOCAL;
3030 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3031 if (peer == ASB_DISCARD_LOCAL)
3032 return ASB_DISCARD_REMOTE;
3034 /* everything else is valid if they are equal on both sides. */
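/*
 * Example: a peer configured with "after-sb-0pri discard-remote" announces
 * ASB_DISCARD_REMOTE.  Seen from this node the same policy is
 * ASB_DISCARD_LOCAL, so receive_protocol() below compares the converted
 * value against the local configuration.
 */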
3038 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3040 struct p_protocol *p = pi->data;
3041 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3042 int p_proto, p_discard_my_data, p_two_primaries, cf;
3043 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3044 char integrity_alg[SHARED_SECRET_MAX] = "";
3045 struct crypto_hash *peer_integrity_tfm = NULL;
3046 void *int_dig_in = NULL, *int_dig_vv = NULL;
3048 p_proto = be32_to_cpu(p->protocol);
3049 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3050 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3051 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3052 p_two_primaries = be32_to_cpu(p->two_primaries);
3053 cf = be32_to_cpu(p->conn_flags);
3054 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3056 if (tconn->agreed_pro_version >= 87) {
3059 if (pi->size > sizeof(integrity_alg))
3061 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3064 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3067 if (pi->cmd != P_PROTOCOL_UPDATE) {
3068 clear_bit(CONN_DRY_RUN, &tconn->flags);
3070 if (cf & CF_DRY_RUN)
3071 set_bit(CONN_DRY_RUN, &tconn->flags);
3074 nc = rcu_dereference(tconn->net_conf);
3076 if (p_proto != nc->wire_protocol) {
3077 conn_err(tconn, "incompatible %s settings\n", "protocol");
3078 goto disconnect_rcu_unlock;
3081 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3082 conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3083 goto disconnect_rcu_unlock;
3086 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3087 conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3088 goto disconnect_rcu_unlock;
3091 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3092 conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3093 goto disconnect_rcu_unlock;
3096 if (p_discard_my_data && nc->discard_my_data) {
3097 conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3098 goto disconnect_rcu_unlock;
3101 if (p_two_primaries != nc->two_primaries) {
3102 conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3103 goto disconnect_rcu_unlock;
3106 if (strcmp(integrity_alg, nc->integrity_alg)) {
3107 conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3108 goto disconnect_rcu_unlock;
3114 if (integrity_alg[0]) {
3118 * We can only change the peer data integrity algorithm
3119 * here. Changing our own data integrity algorithm
3120 * requires that we send a P_PROTOCOL_UPDATE packet at
3121 * the same time; otherwise, the peer has no way to
3122 * tell between which packets the algorithm should change. */
3126 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3127 if (!peer_integrity_tfm) {
3128 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3133 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3134 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3135 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3136 if (!(int_dig_in && int_dig_vv)) {
3137 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3142 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3143 if (!new_net_conf) {
3144 conn_err(tconn, "Allocation of new net_conf failed\n");
3148 mutex_lock(&tconn->data.mutex);
3149 mutex_lock(&tconn->conf_update);
3150 old_net_conf = tconn->net_conf;
3151 *new_net_conf = *old_net_conf;
3153 new_net_conf->wire_protocol = p_proto;
3154 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3155 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3156 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3157 new_net_conf->two_primaries = p_two_primaries;
3159 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3160 mutex_unlock(&tconn->conf_update);
3161 mutex_unlock(&tconn->data.mutex);
3163 crypto_free_hash(tconn->peer_integrity_tfm);
3164 kfree(tconn->int_dig_in);
3165 kfree(tconn->int_dig_vv);
3166 tconn->peer_integrity_tfm = peer_integrity_tfm;
3167 tconn->int_dig_in = int_dig_in;
3168 tconn->int_dig_vv = int_dig_vv;
3170 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3171 conn_info(tconn, "peer data-integrity-alg: %s\n",
3172 integrity_alg[0] ? integrity_alg : "(none)");
3175 kfree(old_net_conf);
3178 disconnect_rcu_unlock:
3181 crypto_free_hash(peer_integrity_tfm);
3184 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3189 * input: alg name, feature name
3190 * return: NULL (alg name was "")
3191 * ERR_PTR(error) if something goes wrong
3192 * or the crypto hash ptr, if it worked out ok. */
3193 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3194 const char *alg, const char *name)
3196 struct crypto_hash *tfm;
3201 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3203 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3204 alg, name, PTR_ERR(tfm));
3210 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3212 void *buffer = tconn->data.rbuf;
3213 int size = pi->size;
3216 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3217 s = drbd_recv(tconn, buffer, s);
3231 * config_unknown_volume - device configuration command for unknown volume
3233 * When a device is added to an existing connection, the node on which the
3234 * device is added first will send configuration commands to its peer but the
3235 * peer will not know about the device yet. It will warn and ignore these
3236 * commands. Once the device is added on the second node, the second node will
3237 * send the same device configuration commands, but in the other direction.
3239 * (We can also end up here if drbd is misconfigured.)
3241 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3243 conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3244 cmdname(pi->cmd), pi->vnr);
3245 return ignore_remaining_packet(tconn, pi);
3248 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3250 struct drbd_conf *mdev;
3251 struct p_rs_param_95 *p;
3252 unsigned int header_size, data_size, exp_max_sz;
3253 struct crypto_hash *verify_tfm = NULL;
3254 struct crypto_hash *csums_tfm = NULL;
3255 struct net_conf *old_net_conf, *new_net_conf = NULL;
3256 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3257 const int apv = tconn->agreed_pro_version;
3258 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3262 mdev = vnr_to_mdev(tconn, pi->vnr);
3264 return config_unknown_volume(tconn, pi);
3266 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3267 : apv == 88 ? sizeof(struct p_rs_param)
3269 : apv <= 94 ? sizeof(struct p_rs_param_89)
3270 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3272 if (pi->size > exp_max_sz) {
3273 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3274 pi->size, exp_max_sz);
3279 header_size = sizeof(struct p_rs_param);
3280 data_size = pi->size - header_size;
3281 } else if (apv <= 94) {
3282 header_size = sizeof(struct p_rs_param_89);
3283 data_size = pi->size - header_size;
3284 D_ASSERT(data_size == 0);
3286 header_size = sizeof(struct p_rs_param_95);
3287 data_size = pi->size - header_size;
3288 D_ASSERT(data_size == 0);
3291 /* initialize verify_alg and csums_alg */
3293 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3295 err = drbd_recv_all(mdev->tconn, p, header_size);
3299 mutex_lock(&mdev->tconn->conf_update);
3300 old_net_conf = mdev->tconn->net_conf;
3301 if (get_ldev(mdev)) {
3302 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3303 if (!new_disk_conf) {
3305 mutex_unlock(&mdev->tconn->conf_update);
3306 dev_err(DEV, "Allocation of new disk_conf failed\n");
3310 old_disk_conf = mdev->ldev->disk_conf;
3311 *new_disk_conf = *old_disk_conf;
3313 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3318 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3319 dev_err(DEV, "verify-alg of wrong size, "
3320 "peer wants %u, accepting only up to %u byte\n",
3321 data_size, SHARED_SECRET_MAX);
3326 err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3329 /* we expect NUL terminated string */
3330 /* but just in case someone tries to be evil */
3331 D_ASSERT(p->verify_alg[data_size-1] == 0);
3332 p->verify_alg[data_size-1] = 0;
3334 } else /* apv >= 89 */ {
3335 /* we still expect NUL terminated strings */
3336 /* but just in case someone tries to be evil */
3337 D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3338 D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3339 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3340 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3343 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3344 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3345 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3346 old_net_conf->verify_alg, p->verify_alg);
3349 verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3350 p->verify_alg, "verify-alg");
3351 if (IS_ERR(verify_tfm)) {
3357 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3358 if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3359 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3360 old_net_conf->csums_alg, p->csums_alg);
3363 csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3364 p->csums_alg, "csums-alg");
3365 if (IS_ERR(csums_tfm)) {
3371 if (apv > 94 && new_disk_conf) {
3372 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3373 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3374 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3375 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3377 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3378 if (fifo_size != mdev->rs_plan_s->size) {
3379 new_plan = fifo_alloc(fifo_size);
3381 dev_err(DEV, "fifo_alloc of fifo_buffer failed\n");
3388 if (verify_tfm || csums_tfm) {
3389 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3390 if (!new_net_conf) {
3391 dev_err(DEV, "Allocation of new net_conf failed\n");
3395 *new_net_conf = *old_net_conf;
3398 strcpy(new_net_conf->verify_alg, p->verify_alg);
3399 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3400 crypto_free_hash(mdev->tconn->verify_tfm);
3401 mdev->tconn->verify_tfm = verify_tfm;
3402 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3405 strcpy(new_net_conf->csums_alg, p->csums_alg);
3406 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3407 crypto_free_hash(mdev->tconn->csums_tfm);
3408 mdev->tconn->csums_tfm = csums_tfm;
3409 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3411 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3415 if (new_disk_conf) {
3416 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3421 old_plan = mdev->rs_plan_s;
3422 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3425 mutex_unlock(&mdev->tconn->conf_update);
3428 kfree(old_net_conf);
3429 kfree(old_disk_conf);
3435 if (new_disk_conf) {
3437 kfree(new_disk_conf);
3439 mutex_unlock(&mdev->tconn->conf_update);
3444 if (new_disk_conf) {
3446 kfree(new_disk_conf);
3448 mutex_unlock(&mdev->tconn->conf_update);
3449 /* just for completeness: actually not needed,
3450 * as this is not reached if csums_tfm was ok. */
3451 crypto_free_hash(csums_tfm);
3452 /* but free the verify_tfm again, if csums_tfm did not work out */
3453 crypto_free_hash(verify_tfm);
3454 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3458 /* warn if the arguments differ by more than 12.5% */
3459 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3460 const char *s, sector_t a, sector_t b)
3463 if (a == 0 || b == 0)
3465 d = (a > b) ? (a - b) : (b - a);
3466 if (d > (a>>3) || d > (b>>3))
3467 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3468 (unsigned long long)a, (unsigned long long)b);
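/*
 * Worked example: a>>3 is a/8, i.e. 12.5%.  For a = 1000 and b = 860
 * sectors, d = 140 > 125 = a>>3, so the difference is logged; for
 * b = 900, d = 100 exceeds neither a>>3 = 125 nor b>>3 = 112, so it
 * stays quiet.
 */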
3471 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3473 struct drbd_conf *mdev;
3474 struct p_sizes *p = pi->data;
3475 enum determine_dev_size dd = unchanged;
3476 sector_t p_size, p_usize, my_usize;
3477 int ldsc = 0; /* local disk size changed */
3478 enum dds_flags ddsf;
3480 mdev = vnr_to_mdev(tconn, pi->vnr);
3482 return config_unknown_volume(tconn, pi);
3484 p_size = be64_to_cpu(p->d_size);
3485 p_usize = be64_to_cpu(p->u_size);
3487 /* just store the peer's disk size for now.
3488 * we still need to figure out whether we accept that. */
3489 mdev->p_size = p_size;
3491 if (get_ldev(mdev)) {
3493 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3496 warn_if_differ_considerably(mdev, "lower level device sizes",
3497 p_size, drbd_get_max_capacity(mdev->ldev));
3498 warn_if_differ_considerably(mdev, "user requested size",
3501 /* if this is the first connect, or an otherwise expected
3502 * param exchange, choose the minimum */
3503 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3504 p_usize = min_not_zero(my_usize, p_usize);
3506 /* Never shrink a device with usable data during connect.
3507 But allow online shrinking if we are connected. */
3508 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3509 drbd_get_capacity(mdev->this_bdev) &&
3510 mdev->state.disk >= D_OUTDATED &&
3511 mdev->state.conn < C_CONNECTED) {
3512 dev_err(DEV, "The peer's disk size is too small!\n");
3513 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3518 if (my_usize != p_usize) {
3519 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3521 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3522 if (!new_disk_conf) {
3523 dev_err(DEV, "Allocation of new disk_conf failed\n");
3528 mutex_lock(&mdev->tconn->conf_update);
3529 old_disk_conf = mdev->ldev->disk_conf;
3530 *new_disk_conf = *old_disk_conf;
3531 new_disk_conf->disk_size = p_usize;
3533 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3534 mutex_unlock(&mdev->tconn->conf_update);
3536 kfree(old_disk_conf);
3538 dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3539 (unsigned long)my_usize);
3545 ddsf = be16_to_cpu(p->dds_flags);
3546 if (get_ldev(mdev)) {
3547 dd = drbd_determine_dev_size(mdev, ddsf);
3549 if (dd == dev_size_error)
3553 /* I am diskless, need to accept the peer's size. */
3554 drbd_set_my_capacity(mdev, p_size);
3557 mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3558 drbd_reconsider_max_bio_size(mdev);
3560 if (get_ldev(mdev)) {
3561 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3562 mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3569 if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3570 if (be64_to_cpu(p->c_size) !=
3571 drbd_get_capacity(mdev->this_bdev) || ldsc) {
3572 /* we have different sizes, probably peer
3573 * needs to know my new size... */
3574 drbd_send_sizes(mdev, 0, ddsf);
3576 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3577 (dd == grew && mdev->state.conn == C_CONNECTED)) {
3578 if (mdev->state.pdsk >= D_INCONSISTENT &&
3579 mdev->state.disk >= D_INCONSISTENT) {
3580 if (ddsf & DDSF_NO_RESYNC)
3581 dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3583 resync_after_online_grow(mdev);
3585 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3592 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3594 struct drbd_conf *mdev;
3595 struct p_uuids *p = pi->data;
3597 int i, updated_uuids = 0;
3599 mdev = vnr_to_mdev(tconn, pi->vnr);
3601 return config_unknown_volume(tconn, pi);
3603 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3605 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3606 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3608 kfree(mdev->p_uuid);
3609 mdev->p_uuid = p_uuid;
3611 if (mdev->state.conn < C_CONNECTED &&
3612 mdev->state.disk < D_INCONSISTENT &&
3613 mdev->state.role == R_PRIMARY &&
3614 (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3615 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3616 (unsigned long long)mdev->ed_uuid);
3617 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3621 if (get_ldev(mdev)) {
3622 int skip_initial_sync =
3623 mdev->state.conn == C_CONNECTED &&
3624 mdev->tconn->agreed_pro_version >= 90 &&
3625 mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3626 (p_uuid[UI_FLAGS] & 8);
3627 if (skip_initial_sync) {
3628 dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3629 drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3630 "clear_n_write from receive_uuids",
3631 BM_LOCKED_TEST_ALLOWED);
3632 _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3633 _drbd_uuid_set(mdev, UI_BITMAP, 0);
3634 _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3640 } else if (mdev->state.disk < D_INCONSISTENT &&
3641 mdev->state.role == R_PRIMARY) {
3642 /* I am a diskless primary, the peer just created a new current UUID for me. */
3644 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3647 /* Before we test for the disk state, we should wait until a possibly
3648 ongoing cluster-wide state change is finished. That is important if
3649 we are primary and are detaching from our disk. We need to see the
3650 new disk state... */
3651 mutex_lock(mdev->state_mutex);
3652 mutex_unlock(mdev->state_mutex);
3653 if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3654 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3657 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3663 * convert_state() - Converts the peer's view of the cluster state to our point of view
3664 * @ps: The state as seen by the peer.
3666 static union drbd_state convert_state(union drbd_state ps)
3668 union drbd_state ms;
3670 static enum drbd_conns c_tab[] = {
3671 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3672 [C_CONNECTED] = C_CONNECTED,
3674 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3675 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3676 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3677 [C_VERIFY_S] = C_VERIFY_T,
3683 ms.conn = c_tab[ps.conn];
3688 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
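/*
 * Example: if the peer reports conn = C_STARTING_SYNC_S ("I start as sync
 * source"), our view of the same cluster state is C_STARTING_SYNC_T;
 * symmetric states such as C_CONNECTED map to themselves.  Role, disk and
 * peer-disk swap analogously: the peer's "self" fields become our "peer"
 * fields.
 */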
3693 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3695 struct drbd_conf *mdev;
3696 struct p_req_state *p = pi->data;
3697 union drbd_state mask, val;
3698 enum drbd_state_rv rv;
3700 mdev = vnr_to_mdev(tconn, pi->vnr);
3704 mask.i = be32_to_cpu(p->mask);
3705 val.i = be32_to_cpu(p->val);
3707 if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3708 mutex_is_locked(mdev->state_mutex)) {
3709 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3713 mask = convert_state(mask);
3714 val = convert_state(val);
3716 rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3717 drbd_send_sr_reply(mdev, rv);
3724 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3726 struct p_req_state *p = pi->data;
3727 union drbd_state mask, val;
3728 enum drbd_state_rv rv;
3730 mask.i = be32_to_cpu(p->mask);
3731 val.i = be32_to_cpu(p->val);
3733 if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3734 mutex_is_locked(&tconn->cstate_mutex)) {
3735 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3739 mask = convert_state(mask);
3740 val = convert_state(val);
3742 rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3743 conn_send_sr_reply(tconn, rv);
3748 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3750 struct drbd_conf *mdev;
3751 struct p_state *p = pi->data;
3752 union drbd_state os, ns, peer_state;
3753 enum drbd_disk_state real_peer_disk;
3754 enum chg_state_flags cs_flags;
3757 mdev = vnr_to_mdev(tconn, pi->vnr);
3759 return config_unknown_volume(tconn, pi);
3761 peer_state.i = be32_to_cpu(p->state);
3763 real_peer_disk = peer_state.disk;
3764 if (peer_state.disk == D_NEGOTIATING) {
3765 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3766 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3769 spin_lock_irq(&mdev->tconn->req_lock);
3771 os = ns = drbd_read_state(mdev);
3772 spin_unlock_irq(&mdev->tconn->req_lock);
3774 /* If some other part of the code (asender thread, timeout)
3775 * already decided to close the connection again,
3776 * we must not "re-establish" it here. */
3777 if (os.conn <= C_TEAR_DOWN)
3780 /* If this is the "end of sync" confirmation, usually the peer disk
3781 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3782 * set) resync started in PausedSyncT, or if the timing of pause-/
3783 * unpause-sync events has been "just right", the peer disk may
3784 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3786 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3787 real_peer_disk == D_UP_TO_DATE &&
3788 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3789 /* If we are (becoming) SyncSource, but peer is still in sync
3790 * preparation, ignore its uptodate-ness to avoid flapping, it
3791 * will change to inconsistent once the peer reaches active
3793 * It may have changed syncer-paused flags, however, so we
3794 * cannot ignore this completely. */
3795 if (peer_state.conn > C_CONNECTED &&
3796 peer_state.conn < C_SYNC_SOURCE)
3797 real_peer_disk = D_INCONSISTENT;
3799 /* if peer_state changes to connected at the same time,
3800 * it explicitly notifies us that it finished resync.
3801 * Maybe we should finish it up, too? */
3802 else if (os.conn >= C_SYNC_SOURCE &&
3803 peer_state.conn == C_CONNECTED) {
3804 if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3805 drbd_resync_finished(mdev);
3810 /* peer says his disk is inconsistent, while we think it is uptodate,
3811 * and this happens while the peer still thinks we have a sync going on,
3812 * but we think we are already done with the sync.
3813 * We ignore this to avoid flapping pdsk.
3814 * This should not happen, if the peer is a recent version of drbd. */
3815 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3816 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3817 real_peer_disk = D_UP_TO_DATE;
3819 if (ns.conn == C_WF_REPORT_PARAMS)
3820 ns.conn = C_CONNECTED;
3822 if (peer_state.conn == C_AHEAD)
3825 if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3826 get_ldev_if_state(mdev, D_NEGOTIATING)) {
3827 int cr; /* consider resync */
3829 /* if we established a new connection */
3830 cr = (os.conn < C_CONNECTED);
3831 /* if we had an established connection
3832 * and one of the nodes newly attaches a disk */
3833 cr |= (os.conn == C_CONNECTED &&
3834 (peer_state.disk == D_NEGOTIATING ||
3835 os.disk == D_NEGOTIATING));
3836 /* if we have both been inconsistent, and the peer has been
3837 * forced to be UpToDate with --overwrite-data */
3838 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3839 /* if we had been plain connected, and the admin requested to
3840 * start a sync by "invalidate" or "invalidate-remote" */
3841 cr |= (os.conn == C_CONNECTED &&
3842 (peer_state.conn >= C_STARTING_SYNC_S &&
3843 peer_state.conn <= C_WF_BITMAP_T));
3846 ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3849 if (ns.conn == C_MASK) {
3850 ns.conn = C_CONNECTED;
3851 if (mdev->state.disk == D_NEGOTIATING) {
3852 drbd_force_state(mdev, NS(disk, D_FAILED));
3853 } else if (peer_state.disk == D_NEGOTIATING) {
3854 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3855 peer_state.disk = D_DISKLESS;
3856 real_peer_disk = D_DISKLESS;
3858 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3860 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3861 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3867 spin_lock_irq(&mdev->tconn->req_lock);
3868 if (os.i != drbd_read_state(mdev).i)
3870 clear_bit(CONSIDER_RESYNC, &mdev->flags);
3871 ns.peer = peer_state.role;
3872 ns.pdsk = real_peer_disk;
3873 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3874 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3875 ns.disk = mdev->new_state_tmp.disk;
3876 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3877 if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3878 test_bit(NEW_CUR_UUID, &mdev->flags)) {
3879 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3880 for temporary network outages! */
3881 spin_unlock_irq(&mdev->tconn->req_lock);
3882 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3883 tl_clear(mdev->tconn);
3884 drbd_uuid_new_current(mdev);
3885 clear_bit(NEW_CUR_UUID, &mdev->flags);
3886 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3889 rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3890 ns = drbd_read_state(mdev);
3891 spin_unlock_irq(&mdev->tconn->req_lock);
3893 if (rv < SS_SUCCESS) {
3894 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3898 if (os.conn > C_WF_REPORT_PARAMS) {
3899 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3900 peer_state.disk != D_NEGOTIATING ) {
3901 /* we want resync, peer has not yet decided to sync... */
3902 /* Nowadays only used when forcing a node into primary role and
3903 setting its disk to UpToDate with that */
3904 drbd_send_uuids(mdev);
3905 drbd_send_current_state(mdev);
3909 clear_bit(DISCARD_MY_DATA, &mdev->flags);
3911 drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3916 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3918 struct drbd_conf *mdev;
3919 struct p_rs_uuid *p = pi->data;
3921 mdev = vnr_to_mdev(tconn, pi->vnr);
3925 wait_event(mdev->misc_wait,
3926 mdev->state.conn == C_WF_SYNC_UUID ||
3927 mdev->state.conn == C_BEHIND ||
3928 mdev->state.conn < C_CONNECTED ||
3929 mdev->state.disk < D_NEGOTIATING);
3931 /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3933 /* Here the _drbd_uuid_ functions are right, current should
3934 _not_ be rotated into the history */
3935 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3936 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3937 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3939 drbd_print_uuids(mdev, "updated sync uuid");
3940 drbd_start_resync(mdev, C_SYNC_TARGET);
3944 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3950 * receive_bitmap_plain
3952 * Return 0 when done, 1 when another iteration is needed, and a negative error
3953 * code upon failure.
3956 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3957 unsigned long *p, struct bm_xfer_ctx *c)
3959 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3960 drbd_header_size(mdev->tconn);
3961 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3962 c->bm_words - c->word_offset);
3963 unsigned int want = num_words * sizeof(*p);
3967 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3972 err = drbd_recv_all(mdev->tconn, p, want);
3976 drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3978 c->word_offset += num_words;
3979 c->bit_offset = c->word_offset * BITS_PER_LONG;
3980 if (c->bit_offset > c->bm_bits)
3981 c->bit_offset = c->bm_bits;
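/*
 * Worked example (assuming DRBD_SOCKET_BUFFER_SIZE = 4096, an 8-byte
 * header and 8-byte longs): each plain bitmap packet carries at most
 * (4096 - 8) / 8 = 511 words, i.e. 511 * 64 = 32704 bits, which is why
 * a large bitmap needs many iterations of receive_bitmap_plain() above.
 */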
3986 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3988 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3991 static int dcbp_get_start(struct p_compressed_bm *p)
3993 return (p->encoding & 0x80) != 0;
3996 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3998 return (p->encoding >> 4) & 0x7;
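/*
 * Reading aid for the three accessors above -- layout of the "encoding"
 * byte of a compressed bitmap packet:
 *
 *   bit  7     start value of the first run      (dcbp_get_start)
 *   bits 6..4  number of trailing pad bits       (dcbp_get_pad_bits)
 *   bits 3..0  encoding code, e.g. RLE_VLI_Bits  (dcbp_get_code)
 */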
4004 * Return 0 when done, 1 when another iteration is needed, and a negative error
4005 * code upon failure.
4008 recv_bm_rle_bits(struct drbd_conf *mdev,
4009 struct p_compressed_bm *p,
4010 struct bm_xfer_ctx *c,
4013 struct bitstream bs;
4017 unsigned long s = c->bit_offset;
4019 int toggle = dcbp_get_start(p);
4023 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4025 bits = bitstream_get_bits(&bs, &look_ahead, 64);
4029 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4030 bits = vli_decode_bits(&rl, look_ahead);
4036 if (e >= c->bm_bits) {
4037 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4040 _drbd_bm_set_bits(mdev, s, e);
4044 dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4045 have, bits, look_ahead,
4046 (unsigned int)(bs.cur.b - p->code),
4047 (unsigned int)bs.buf_len);
4050 look_ahead >>= bits;
4053 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4056 look_ahead |= tmp << have;
4061 bm_xfer_ctx_bit_to_word_offset(c);
4063 return (s != c->bm_bits);
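/*
 * Editorial sketch (not driver code): stripped of the VLI bitstream
 * handling, the decoder above walks alternating run lengths of clear and
 * set bits, and only the "set" runs touch the bitmap.  Rendered over a
 * plain byte array:
 */
static void rle_apply_runs_sketch(unsigned char *bm, const unsigned long *runs,
				  int nruns, int first_run_is_set)
{
	unsigned long s = 0, b;
	int i, toggle = first_run_is_set;

	for (i = 0; i < nruns; i++, toggle = !toggle) {
		if (toggle)	/* a run of set bits */
			for (b = s; b < s + runs[i]; b++)
				bm[b / 8] |= 1 << (b % 8);
		s += runs[i];	/* runs of clear bits are simply skipped */
	}
}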
4069 * Return 0 when done, 1 when another iteration is needed, and a negative error
4070 * code upon failure.
4073 decode_bitmap_c(struct drbd_conf *mdev,
4074 struct p_compressed_bm *p,
4075 struct bm_xfer_ctx *c,
4078 if (dcbp_get_code(p) == RLE_VLI_Bits)
4079 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4081 /* other variants had been implemented for evaluation,
4082 * but have been dropped as this one turned out to be "best"
4083 * during all our tests. */
4085 dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4086 conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4090 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4091 const char *direction, struct bm_xfer_ctx *c)
4093 /* what would it take to transfer it "plaintext" */
4094 unsigned int header_size = drbd_header_size(mdev->tconn);
4095 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4096 unsigned int plain =
4097 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4098 c->bm_words * sizeof(unsigned long);
4099 unsigned int total = c->bytes[0] + c->bytes[1];
4102 /* total cannot be zero, but just in case: */
4106 /* don't report if not compressed */
4110 /* total < plain. check for overflow, still */
4111 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4112 : (1000 * total / plain);
4118 dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4119 "total %u; compression: %u.%u%%\n",
4121 c->bytes[1], c->packets[1],
4122 c->bytes[0], c->packets[0],
4123 total, r/10, r % 10);
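/*
 * Worked example with made-up numbers: for plain = 1048576 bytes and
 * total = 131072 bytes actually transferred, r = 1000 * 131072 / 1048576
 * = 125, so the message above reads "compression: 12.5%" (r/10 = 12,
 * r%10 = 5).
 */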
4126 /* Since we are processing the bitfield from lower addresses to higher,
4127 it does not matter whether we process it in 32 bit chunks or 64 bit
4128 chunks as long as it is little endian. (Understand it as a byte stream,
4129 beginning with the lowest byte...) If we used big endian,
4130 we would need to process it from the highest address to the lowest,
4131 in order to be agnostic to the 32 vs 64 bits issue.
4133 returns 0 on failure, 1 if we successfully received it. */
4134 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4136 struct drbd_conf *mdev;
4137 struct bm_xfer_ctx c;
4140 mdev = vnr_to_mdev(tconn, pi->vnr);
4144 drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4145 /* you are supposed to send additional out-of-sync information
4146 * if you actually set bits during this phase */
4148 c = (struct bm_xfer_ctx) {
4149 .bm_bits = drbd_bm_bits(mdev),
4150 .bm_words = drbd_bm_words(mdev),
4154 if (pi->cmd == P_BITMAP)
4155 err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4156 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4157 /* MAYBE: sanity check that we speak proto >= 90,
4158 * and the feature is enabled! */
4159 struct p_compressed_bm *p = pi->data;
4161 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4162 dev_err(DEV, "ReportCBitmap packet too large\n");
4166 if (pi->size <= sizeof(*p)) {
4167 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4171 err = drbd_recv_all(mdev->tconn, p, pi->size);
4174 err = decode_bitmap_c(mdev, p, &c, pi->size);
4176 dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4181 c.packets[pi->cmd == P_BITMAP]++;
4182 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4189 err = drbd_recv_header(mdev->tconn, pi);
4194 INFO_bm_xfer_stats(mdev, "receive", &c);
4196 if (mdev->state.conn == C_WF_BITMAP_T) {
4197 enum drbd_state_rv rv;
4199 err = drbd_send_bitmap(mdev);
4202 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4203 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4204 D_ASSERT(rv == SS_SUCCESS);
4205 } else if (mdev->state.conn != C_WF_BITMAP_S) {
4206 /* admin may have requested C_DISCONNECTING,
4207 * other threads may have noticed network errors */
4208 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4209 drbd_conn_str(mdev->state.conn));
4214 drbd_bm_unlock(mdev);
4215 if (!err && mdev->state.conn == C_WF_BITMAP_S)
4216 drbd_start_resync(mdev, C_SYNC_SOURCE);
4220 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4222 conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4225 return ignore_remaining_packet(tconn, pi);
4228 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4230 /* Make sure we've acked all the TCP data associated
4231 * with the data requests being unplugged */
4232 drbd_tcp_quickack(tconn->data.socket);
4237 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4239 struct drbd_conf *mdev;
4240 struct p_block_desc *p = pi->data;
4242 mdev = vnr_to_mdev(tconn, pi->vnr);
4246 switch (mdev->state.conn) {
4247 case C_WF_SYNC_UUID:
4252 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4253 drbd_conn_str(mdev->state.conn));
4256 drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4264 int (*fn)(struct drbd_tconn *, struct packet_info *);
4267 static struct data_cmd drbd_cmd_handler[] = {
4268 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4269 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4270 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4271 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4272 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4273 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4274 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4275 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4276 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4277 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4278 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4279 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4280 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4281 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4282 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4283 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4284 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4285 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4286 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4287 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4288 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4289 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4290 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4291 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
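/*
 * Dispatch summary: drbdd() below indexes this table with the received
 * command number, drops the connection for commands outside the table or
 * without a handler, rejects a payload when expect_payload is 0 but
 * pi.size exceeds pkt_size, receives the fixed-size sub-header, and only
 * then calls fn.
 */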
4294 static void drbdd(struct drbd_tconn *tconn)
4296 struct packet_info pi;
4297 size_t shs; /* sub header size */
4300 while (get_t_state(&tconn->receiver) == RUNNING) {
4301 struct data_cmd *cmd;
4303 drbd_thread_current_set_cpu(&tconn->receiver);
4304 if (drbd_recv_header(tconn, &pi))
4307 cmd = &drbd_cmd_handler[pi.cmd];
4308 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4309 conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4310 cmdname(pi.cmd), pi.cmd);
4314 shs = cmd->pkt_size;
4315 if (pi.size > shs && !cmd->expect_payload) {
4316 conn_err(tconn, "No payload expected %s l:%d\n",
4317 cmdname(pi.cmd), pi.size);
4322 err = drbd_recv_all_warn(tconn, pi.data, shs);
4328 err = cmd->fn(tconn, &pi);
4330 conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4331 cmdname(pi.cmd), err, pi.size);
4338 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4341 void conn_flush_workqueue(struct drbd_tconn *tconn)
4343 struct drbd_wq_barrier barr;
4345 barr.w.cb = w_prev_work_done;
4346 barr.w.tconn = tconn;
4347 init_completion(&barr.done);
4348 drbd_queue_work(&tconn->data.work, &barr.w);
4349 wait_for_completion(&barr.done);
4352 static void conn_disconnect(struct drbd_tconn *tconn)
4354 struct drbd_conf *mdev;
4358 if (tconn->cstate == C_STANDALONE)
4361 /* We are about to start the cleanup after connection loss.
4362 * Make sure drbd_make_request knows about that.
4363 * Usually we should be in some network failure state already,
4364 * but just in case we are not, we fix it up here.
4366 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4368 /* asender does not clean up anything. it must not interfere, either */
4369 drbd_thread_stop(&tconn->asender);
4370 drbd_free_sock(tconn);
4373 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4374 kref_get(&mdev->kref);
4376 drbd_disconnected(mdev);
4377 kref_put(&mdev->kref, &drbd_minor_destroy);
4382 if (!list_empty(&tconn->current_epoch->list))
4383 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4384 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4385 atomic_set(&tconn->current_epoch->epoch_size, 0);
4387 conn_info(tconn, "Connection closed\n");
4389 if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4390 conn_try_outdate_peer_async(tconn);
4392 spin_lock_irq(&tconn->req_lock);
4394 if (oc >= C_UNCONNECTED)
4395 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4397 spin_unlock_irq(&tconn->req_lock);
4399 if (oc == C_DISCONNECTING)
4400 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4403 static int drbd_disconnected(struct drbd_conf *mdev)
4407 /* wait for current activity to cease. */
4408 spin_lock_irq(&mdev->tconn->req_lock);
4409 _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4410 _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4411 _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4412 spin_unlock_irq(&mdev->tconn->req_lock);
4414 /* We do not have data structures that would allow us to
4415 * get the rs_pending_cnt down to 0 again.
4416 * * On C_SYNC_TARGET we do not have any data structures describing
4417 * the pending RSDataRequest's we have sent.
4418 * * On C_SYNC_SOURCE there is no data structure that tracks
4419 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4420 * And no, it is not the sum of the reference counts in the
4421 * resync_LRU. The resync_LRU tracks the whole operation including
4422 * the disk-IO, while the rs_pending_cnt only tracks the blocks on the fly. */
4424 drbd_rs_cancel_all(mdev);
4426 mdev->rs_failed = 0;
4427 atomic_set(&mdev->rs_pending_cnt, 0);
4428 wake_up(&mdev->misc_wait);
4430 del_timer_sync(&mdev->resync_timer);
4431 resync_timer_fn((unsigned long)mdev);
4433 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4434 * w_make_resync_request etc. which may still be on the worker queue
4435 * to be "canceled" */
4436 drbd_flush_workqueue(mdev);
4438 drbd_finish_peer_reqs(mdev);
4440 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4441 might have issued a work again. The one before drbd_finish_peer_reqs() is
4442 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4443 drbd_flush_workqueue(mdev);
4445 kfree(mdev->p_uuid);
4446 mdev->p_uuid = NULL;
4448 if (!drbd_suspended(mdev))
4449 tl_clear(mdev->tconn);
4453 /* serialize with bitmap writeout triggered by the state change, if any. */
4455 wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4457 /* tcp_close and release of sendpage pages can be deferred. I don't
4458 * want to use SO_LINGER, because apparently it can be deferred for
4459 * more than 20 seconds (longest time I checked).
4461 * Actually we don't care for exactly when the network stack does its
4462 * put_page(), but release our reference on these pages right here.
	i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
	if (i)
		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
	i = atomic_read(&mdev->pp_in_use_by_net);
	if (i)
		dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
	i = atomic_read(&mdev->pp_in_use);
	if (i)
		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4474 D_ASSERT(list_empty(&mdev->read_ee));
4475 D_ASSERT(list_empty(&mdev->active_ee));
4476 D_ASSERT(list_empty(&mdev->sync_ee));
	D_ASSERT(list_empty(&mdev->done_ee));

	return 0;
}
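/*
 * Illustrative note on the rs_pending_cnt handling above (a summary, not
 * driver code): the counter has no backing list of the requests it counts,
 * so on disconnect it can only be force-reset, never unwound:
 *
 *	atomic_inc(&mdev->rs_pending_cnt);	// resync request/reply sent
 *	atomic_dec(&mdev->rs_pending_cnt);	// matching answer received
 *	atomic_set(&mdev->rs_pending_cnt, 0);	// disconnect: answers will
 *						// never arrive, nothing to walk
 */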
/*
 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
 * we can agree on is stored in agreed_pro_version.
 *
 * feature flags and the reserved array should be enough room for future
 * enhancements of the handshake protocol, and possible plugins...
 *
 * for now, they are expected to be zero, but ignored.
 */
static int drbd_send_features(struct drbd_tconn *tconn)
{
4493 struct drbd_socket *sock;
4494 struct p_connection_features *p;
4496 sock = &tconn->data;
	p = conn_prepare_command(tconn, sock);
	if (!p)
		return -EIO;
4500 memset(p, 0, sizeof(*p));
4501 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4502 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
	return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
}
/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 */
static int drbd_do_features(struct drbd_tconn *tconn)
{
	/* ASSERT current == tconn->receiver ... */
	struct p_connection_features *p;
	const int expect = sizeof(struct p_connection_features);
	struct packet_info pi;
	int err;
	err = drbd_send_features(tconn);
	if (err)
		return 0;
	err = drbd_recv_header(tconn, &pi);
	if (err)
		return 0;
4529 if (pi.cmd != P_CONNECTION_FEATURES) {
4530 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		return -1;
	}
4535 if (pi.size != expect) {
		conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
			 expect, pi.size);
		return -1;
	}

	p = pi.data;
	err = drbd_recv_all_warn(tconn, p, expect);
	if (err)
		return 0;
4546 p->protocol_min = be32_to_cpu(p->protocol_min);
4547 p->protocol_max = be32_to_cpu(p->protocol_max);
4548 if (p->protocol_max == 0)
4549 p->protocol_max = p->protocol_min;
4551 if (PRO_VERSION_MAX < p->protocol_min ||
	    PRO_VERSION_MIN > p->protocol_max)
		goto incompat;
4555 tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4557 conn_info(tconn, "Handshake successful: "
4558 "Agreed network protocol version %d\n", tconn->agreed_pro_version);
	return 1;

 incompat:
	conn_err(tconn, "incompatible DRBD dialects: "
	    "I support %d-%d, peer supports %d-%d\n",
	    PRO_VERSION_MIN, PRO_VERSION_MAX,
	    p->protocol_min, p->protocol_max);
	return -1;
}
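/*
 * Illustrative sketch of the version negotiation above (the helper
 * negotiate_version is hypothetical, not part of the driver): the peer is
 * accepted iff the two [min, max] ranges overlap, and both sides then agree
 * on the highest commonly supported version; min() is the kernel macro.
 */
#if 0
static int negotiate_version(int my_min, int my_max, int peer_min, int peer_max)
{
	if (my_max < peer_min || my_min > peer_max)
		return -1;		/* ranges do not overlap: incompatible */
	return min(my_max, peer_max);	/* highest version both sides speak */
}
/* e.g. negotiate_version(86, 101, 90, 96) == 96 */
#endif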
4570 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
#else
4578 #define CHALLENGE_LEN 64
/* Return value:
	1 - auth succeeded,
	0 - failed, try again (network error),
	-1 - auth failed, don't try again.
*/
static int drbd_do_auth(struct drbd_tconn *tconn)
{
4588 struct drbd_socket *sock;
4589 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4590 struct scatterlist sg;
4591 char *response = NULL;
4592 char *right_response = NULL;
4593 char *peers_ch = NULL;
4594 unsigned int key_len;
4595 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4596 unsigned int resp_size;
4597 struct hash_desc desc;
4598 struct packet_info pi;
	struct net_conf *nc;
	int err, rv;
4602 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
	rcu_read_lock();
	nc = rcu_dereference(tconn->net_conf);
	key_len = strlen(nc->shared_secret);
	memcpy(secret, nc->shared_secret, key_len);
	rcu_read_unlock();
	desc.tfm = tconn->cram_hmac_tfm;
	desc.flags = 0;
4613 rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
	if (rv) {
		conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}
4620 get_random_bytes(my_challenge, CHALLENGE_LEN);
4622 sock = &tconn->data;
	if (!conn_prepare_command(tconn, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
				my_challenge, CHALLENGE_LEN);
	if (!rv)
		goto fail;
	err = drbd_recv_header(tconn, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}
4638 if (pi.cmd != P_AUTH_CHALLENGE) {
4639 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}
4645 if (pi.size > CHALLENGE_LEN * 2) {
		conn_err(tconn, "AuthChallenge payload too big.\n");
		rv = -1;
		goto fail;
	}
4651 peers_ch = kmalloc(pi.size, GFP_NOIO);
4652 if (peers_ch == NULL) {
		conn_err(tconn, "kmalloc of peers_ch failed\n");
		rv = -1;
		goto fail;
	}
	err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
	if (err) {
		rv = 0;
		goto fail;
	}
4664 resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4665 response = kmalloc(resp_size, GFP_NOIO);
4666 if (response == NULL) {
		conn_err(tconn, "kmalloc of response failed\n");
		rv = -1;
		goto fail;
	}
4672 sg_init_table(&sg, 1);
4673 sg_set_buf(&sg, peers_ch, pi.size);
	rv = crypto_hash_digest(&desc, &sg, sg.length, response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}
	if (!conn_prepare_command(tconn, sock)) {
		rv = 0;
		goto fail;
	}
	rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
				response, resp_size);
	if (!rv)
		goto fail;
	err = drbd_recv_header(tconn, &pi);
	if (err) {
		rv = 0;
		goto fail;
	}
4697 if (pi.cmd != P_AUTH_RESPONSE) {
4698 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
			 cmdname(pi.cmd), pi.cmd);
		rv = 0;
		goto fail;
	}
4704 if (pi.size != resp_size) {
		conn_err(tconn, "AuthResponse payload has wrong size\n");
		rv = 0;
		goto fail;
	}
	err = drbd_recv_all_warn(tconn, response, resp_size);
	if (err) {
		rv = 0;
		goto fail;
	}
4716 right_response = kmalloc(resp_size, GFP_NOIO);
4717 if (right_response == NULL) {
		conn_err(tconn, "kmalloc of right_response failed\n");
		rv = -1;
		goto fail;
	}
4723 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
	rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
	if (rv) {
		conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
		rv = -1;
		goto fail;
	}
	rv = !memcmp(response, right_response, resp_size);

	if (rv)
		conn_info(tconn, "Peer authenticated using a %d-byte HMAC\n",
			  resp_size);
	else
		rv = -1;

 fail:
	kfree(peers_ch);
	kfree(response);
	kfree(right_response);

	return rv;
}
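/*
 * Illustrative sketch of the challenge/response exchange implemented above.
 * Everything here is hypothetical pseudocode: send_packet(), recv_packet(),
 * hmac() and RESP_SIZE merely stand in for the socket helpers and
 * crypto_hash_digest() used by the real code. Each side proves knowledge of
 * the shared secret by returning HMAC(secret, other side's random challenge).
 */
#if 0
static bool cram_handshake(const u8 *secret, unsigned int key_len)
{
	u8 my_challenge[CHALLENGE_LEN], peers_ch[CHALLENGE_LEN];
	u8 response[RESP_SIZE], right_response[RESP_SIZE];

	get_random_bytes(my_challenge, sizeof(my_challenge));
	send_packet(P_AUTH_CHALLENGE, my_challenge, sizeof(my_challenge));
	recv_packet(P_AUTH_CHALLENGE, peers_ch, sizeof(peers_ch));

	/* prove we know the secret: answer *their* challenge */
	hmac(secret, key_len, peers_ch, sizeof(peers_ch), response);
	send_packet(P_AUTH_RESPONSE, response, sizeof(response));

	/* verify they know it too: check the answer to *our* challenge */
	recv_packet(P_AUTH_RESPONSE, response, sizeof(response));
	hmac(secret, key_len, my_challenge, sizeof(my_challenge), right_response);
	return memcmp(response, right_response, sizeof(right_response)) == 0;
}
#endif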
int drbdd_init(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	int h;
4754 conn_info(tconn, "receiver (re)started\n");
	h = conn_connect(tconn);
	if (h == 0) {
		conn_disconnect(tconn);
		schedule_timeout_interruptible(HZ);
	}
	if (h == -1) {
		conn_warn(tconn, "Discarding network configuration.\n");
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
4771 conn_disconnect(tconn);
	conn_info(tconn, "receiver terminated\n");

	return 0;
}
4777 /* ********* acknowledge sender ******** */
static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
4781 struct p_req_state_reply *p = pi->data;
4782 int retcode = be32_to_cpu(p->retcode);
4784 if (retcode >= SS_SUCCESS) {
		set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
	} else {
		set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
		conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
			 drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&tconn->ping_wait);

	return 0;
}
static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
4798 struct drbd_conf *mdev;
4799 struct p_req_state_reply *p = pi->data;
4800 int retcode = be32_to_cpu(p->retcode);
4802 mdev = vnr_to_mdev(tconn, pi->vnr);
4806 if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4807 D_ASSERT(tconn->agreed_pro_version < 100);
		return got_conn_RqSReply(tconn, pi);
	}
4811 if (retcode >= SS_SUCCESS) {
		set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
	} else {
		set_bit(CL_ST_CHG_FAIL, &mdev->flags);
		dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
			drbd_set_st_err_str(retcode), retcode);
	}
	wake_up(&mdev->state_wait);

	return 0;
}
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return drbd_send_ping_ack(tconn);
}
static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	/* restore idle timeout */
	tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
	if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
		wake_up(&tconn->ping_wait);

	return 0;
}
static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
4846 mdev = vnr_to_mdev(tconn, pi->vnr);
4850 D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4852 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4854 if (get_ldev(mdev)) {
4855 drbd_rs_complete_io(mdev, sector);
4856 drbd_set_in_sync(mdev, sector, blksize);
4857 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
		mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
		put_ldev(mdev);
	}
4861 dec_rs_pending(mdev);
	atomic_add(blksize >> 9, &mdev->rs_sect_in);

	return 0;
}
static int
validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
			      struct rb_root *root, const char *func,
			      enum drbd_req_event what, bool missing_ok)
{
4872 struct drbd_request *req;
4873 struct bio_and_error m;
4875 spin_lock_irq(&mdev->tconn->req_lock);
4876 req = find_request(mdev, root, id, sector, missing_ok, func);
4877 if (unlikely(!req)) {
		spin_unlock_irq(&mdev->tconn->req_lock);
		return -EIO;
	}
4881 __req_mod(req, what, &m);
4882 spin_unlock_irq(&mdev->tconn->req_lock);
	if (m.bio)
		complete_master_bio(mdev, &m);

	return 0;
}
static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int blksize = be32_to_cpu(p->blksize);
	enum drbd_req_event what;
4897 mdev = vnr_to_mdev(tconn, pi->vnr);
4901 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4903 if (p->block_id == ID_SYNCER) {
4904 drbd_set_in_sync(mdev, sector, blksize);
		dec_rs_pending(mdev);
		return 0;
	}
	switch (pi->cmd) {
	case P_RS_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER_AND_SIS;
		break;
	case P_WRITE_ACK:
		what = WRITE_ACKED_BY_PEER;
		break;
	case P_RECV_ACK:
		what = RECV_ACKED_BY_PEER;
		break;
	case P_DISCARD_WRITE:
		what = DISCARD_WRITE;
		break;
	case P_RETRY_WRITE:
		what = POSTPONE_WRITE;
		break;
	default:
		BUG();
	}
4928 return validate_req_change_req_state(mdev, p->block_id, sector,
					       &mdev->write_requests, __func__,
					       what, false);
}
static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
	int size = be32_to_cpu(p->blksize);
	int err;
4941 mdev = vnr_to_mdev(tconn, pi->vnr);
4945 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4947 if (p->block_id == ID_SYNCER) {
4948 dec_rs_pending(mdev);
		drbd_rs_failed_io(mdev, sector, size);
		return 0;
	}
4953 err = validate_req_change_req_state(mdev, p->block_id, sector,
					    &mdev->write_requests, __func__,
					    NEG_ACKED, true);
	if (err) {
4957 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4958 The master bio might already be completed, therefore the
4959 request is no longer in the collision hash. */
4960 /* In Protocol B we might already have got a P_RECV_ACK
4961 but then get a P_NEG_ACK afterwards. */
		drbd_set_out_of_sync(mdev, sector, size);
	}

	return 0;
}
static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	sector_t sector = be64_to_cpu(p->sector);
4973 mdev = vnr_to_mdev(tconn, pi->vnr);
4977 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4979 dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
4980 (unsigned long long)sector, be32_to_cpu(p->blksize));
4982 return validate_req_change_req_state(mdev, p->block_id, sector,
					       &mdev->read_requests, __func__,
					       NEG_ACKED, false);
}
static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	sector_t sector;
	int size;
	struct p_block_ack *p = pi->data;
4994 mdev = vnr_to_mdev(tconn, pi->vnr);
4998 sector = be64_to_cpu(p->sector);
4999 size = be32_to_cpu(p->blksize);
5001 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5003 dec_rs_pending(mdev);
5005 if (get_ldev_if_state(mdev, D_FAILED)) {
5006 drbd_rs_complete_io(mdev, sector);
		switch (pi->cmd) {
		case P_NEG_RS_DREPLY:
			drbd_rs_failed_io(mdev, sector, size);
			break;
		case P_RS_CANCEL:
			break;
		default:
			BUG();
		}
		put_ldev(mdev);
	}

	return 0;
}
static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct p_barrier_ack *p = pi->data;
	struct drbd_conf *mdev;
	int vnr;
5027 tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
	rcu_read_lock();
	idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5031 if (mdev->state.conn == C_AHEAD &&
5032 atomic_read(&mdev->ap_in_flight) == 0 &&
5033 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
			mdev->start_resync_timer.expires = jiffies + HZ;
			add_timer(&mdev->start_resync_timer);
		}
	}
	rcu_read_unlock();

	return 0;
}
static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
{
	struct drbd_conf *mdev;
	struct p_block_ack *p = pi->data;
	struct drbd_work *w;
	sector_t sector;
	int size;

	mdev = vnr_to_mdev(tconn, pi->vnr);
5055 sector = be64_to_cpu(p->sector);
5056 size = be32_to_cpu(p->blksize);
5058 update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5060 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5061 drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		ov_out_of_sync_print(mdev);
	if (!get_ldev(mdev))
		return 0;
5068 drbd_rs_complete_io(mdev, sector);
	dec_rs_pending(mdev);

	--mdev->ov_left;
5073 /* let's advance progress step marks only for every other megabyte */
5074 if ((mdev->ov_left & 0x200) == 0x200)
5075 drbd_advance_rs_marks(mdev, mdev->ov_left);
5077 if (mdev->ov_left == 0) {
		w = kmalloc(sizeof(*w), GFP_NOIO);
		if (w) {
			w->cb = w_ov_finished;
			w->mdev = mdev;
			drbd_queue_work_front(&mdev->tconn->data.work, w);
		} else {
			dev_err(DEV, "kmalloc(w) failed.\n");
			ov_out_of_sync_print(mdev);
			drbd_resync_finished(mdev);
		}
	}
	put_ldev(mdev);
	return 0;
}
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
	return 0;
}
static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
{
	struct drbd_conf *mdev;
	int vnr, not_empty = 0;

	do {
		clear_bit(SIGNAL_ASENDER, &tconn->flags);
		flush_signals(current);
		rcu_read_lock();
		idr_for_each_entry(&tconn->volumes, mdev, vnr) {
			kref_get(&mdev->kref);
			rcu_read_unlock();
5111 if (drbd_finish_peer_reqs(mdev)) {
				kref_put(&mdev->kref, &drbd_minor_destroy);
				return 1;
			}
			kref_put(&mdev->kref, &drbd_minor_destroy);
			rcu_read_lock();
		}
		rcu_read_unlock();
5118 set_bit(SIGNAL_ASENDER, &tconn->flags);
5120 spin_lock_irq(&tconn->req_lock);
5121 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
			not_empty = !list_empty(&mdev->done_ee);
			if (not_empty)
				break;
		}
5126 spin_unlock_irq(&tconn->req_lock);
	} while (not_empty);

	return 0;
}
5133 struct asender_cmd {
	size_t pkt_size;
	int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
};
5138 static struct asender_cmd asender_tbl[] = {
5139 [P_PING] = { 0, got_Ping },
5140 [P_PING_ACK] = { 0, got_PingAck },
5141 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5142 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5143 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5144 [P_DISCARD_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
5145 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5146 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5147 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5148 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5149 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5150 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5151 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5152 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5153 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5154 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
	[P_RETRY_WRITE]	    = { sizeof(struct p_block_ack), got_BlockAck },
};
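/*
 * Illustrative sketch (the helper lookup_asender_cmd is hypothetical, not
 * part of the driver): meta-socket packets are dispatched through the table
 * above, indexed by command code. Commands without an entry are
 * zero-initialized, so a NULL ->fn marks a packet the asender does not expect.
 */
#if 0
static struct asender_cmd *lookup_asender_cmd(enum drbd_packet cmd)
{
	/* bounds-check before indexing into the table */
	if (cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[cmd].fn)
		return NULL;
	return &asender_tbl[cmd];
}
#endif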
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_tconn *tconn = thi->tconn;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = tconn->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(tconn);
5167 int expect = header_size;
5168 bool ping_timeout_active = false;
5169 struct net_conf *nc;
5170 int ping_timeo, tcp_cork, ping_int;
5172 current->policy = SCHED_RR; /* Make this a realtime task! */
5173 current->rt_priority = 2; /* more important than all other tasks */
5175 while (get_t_state(thi) == RUNNING) {
5176 drbd_thread_current_set_cpu(thi);
		rcu_read_lock();
		nc = rcu_dereference(tconn->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork   = nc->tcp_cork;
		ping_int   = nc->ping_int;
		rcu_read_unlock();
5185 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5186 if (drbd_send_ping(tconn)) {
				conn_err(tconn, "drbd_send_ping has failed\n");
				goto reconnect;
			}
5190 tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}
		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(tconn->meta.socket);
5198 if (tconn_finish_peer_reqs(tconn)) {
			conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
			goto reconnect;
		}
5202 /* but unconditionally uncork unless disabled */
		if (tcp_cork)
			drbd_tcp_uncork(tconn->meta.socket);
		/* short circuit, recv_msg would return EINTR anyway. */
		if (signal_pending(current))
			continue;
5210 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5211 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5213 flush_signals(current);
		/* Note:
		 * -EINTR	 (on meta) we got a signal
		 * -EAGAIN	 (on meta) rcvtimeo expired
		 * -ECONNRESET	 other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0	 other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv  < expected: "woken" by signal during receive
		 * rv == 0	 : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf	 += rv;
5228 } else if (rv == 0) {
			conn_err(tconn, "meta connection shut down by peer.\n");
			goto reconnect;
5231 } else if (rv == -EAGAIN) {
5232 /* If the data socket received something meanwhile,
5233 * that is good enough: peer is still alive. */
5234 if (time_after(tconn->last_received,
				       jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
				continue;
5237 if (ping_timeout_active) {
				conn_err(tconn, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &tconn->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			conn_err(tconn, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}
5250 if (received == expect && cmd == NULL) {
			if (decode_header(tconn, tconn->meta.rbuf, &pi))
				goto reconnect;
			/* bounds-check pi.cmd before computing a pointer into the table */
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
				conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
5259 expect = header_size + cmd->pkt_size;
5260 if (pi.size != expect - header_size) {
				conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
					 pi.cmd, pi.size);
				goto reconnect;
			}
		}
5266 if (received == expect) {
			err = cmd->fn(tconn, &pi);
			if (err) {
				conn_err(tconn, "%pf failed\n", cmd->fn);
				goto reconnect;
			}
5275 tconn->last_received = jiffies;
5277 if (cmd == &asender_tbl[P_PING_ACK]) {
5278 /* restore idle timeout */
5279 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}
5283 buf = tconn->meta.rbuf;
			received = 0;
			expect	 = header_size;
			cmd	 = NULL;
		}
	}
	if (0) {
reconnect:
		conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
	}
	if (0) {
disconnect:
		conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
	}
5298 clear_bit(SIGNAL_ASENDER, &tconn->flags);
	conn_info(tconn, "asender terminated\n");

	return 0;
}
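/*
 * Illustrative sketch of the receive-timeout scheme used above (the helper
 * set_meta_rcvtimeo is hypothetical, not part of the driver): while a ping
 * is outstanding, the meta socket waits at most the ping timeout (configured
 * in tenths of a second); once the PingAck arrives, it falls back to the
 * idle ping interval (in seconds).
 */
#if 0
static void set_meta_rcvtimeo(struct drbd_tconn *tconn, bool ping_outstanding,
			      int ping_timeo, int ping_int)
{
	struct sock *sk = tconn->meta.socket->sk;

	if (ping_outstanding)
		sk->sk_rcvtimeo = ping_timeo * HZ / 10;	/* ping_timeo in 0.1s units */
	else
		sk->sk_rcvtimeo = ping_int * HZ;	/* ping_int in seconds */
}
#endif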