drbd: receive_protocol(): We cannot change our own data-integrity-alg setting here
firefly-linux-kernel-4.4.55.git: drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
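/* Possible outcomes of drbd_may_finish_epoch():
 * FE_STILL_LIVE: the epoch is still referenced and stays on the list,
 * FE_DESTROYED:  the epoch was removed from the list and freed,
 * FE_RECYCLED:   the current epoch was reset and reused in place. */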
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
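/* GFP_TRY: allow highmem pages, but do not warn on failure.  None of the
 * reclaim flags (__GFP_WAIT/__GFP_IO/__GFP_FS) are set, so these allocations
 * never sleep and never trigger write-out; the callers handle failure instead
 * (see the "criss-cross" deadlock note in __drbd_alloc_pages()). */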
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
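/* Grab @number pages at once: first try the preallocated drbd_pp_pool,
 * then fall back to alloc_page(GFP_TRY).  On partial failure the pages
 * allocated so far are put (back) into the pool and NULL is returned;
 * drbd_alloc_pages() will retry later. */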
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that is not
208            finished, we can stop examining the list. */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock).
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
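/* Allocate a peer request object from drbd_ee_mempool plus a page chain large
 * enough for @data_size bytes.  Returns NULL on allocation failure or when
 * fault injection (DRBD_FAULT_AL_EE) triggers. */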
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
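/* Reclaim finished net_ee entries, then run the completion callbacks
 * (e_end_block() and friends) for everything on done_ee.  Returns the first
 * error any callback reported, 0 otherwise. */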
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* See also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
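/* Receive exactly @size bytes from the data socket via sock_recvmsg(MSG_WAITALL).
 * Anything short of that (EOF, error, or a signal interrupting the read) makes
 * us give up and force the connection into C_BROKEN_PIPE. */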
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
611
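/* Active connect: create a TCP socket, bind it to the configured local
 * address (with port 0, so the kernel picks a free source port) and try to
 * connect to the peer.  Harmless errors (timeout, peer not yet listening)
 * leave the connection state alone so conn_connect() can simply retry. */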
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657        /* explicitly bind to the configured IP as source IP
658         *  for the outgoing connections.
659         *  This is needed for multihomed hosts and to be
660         *  able to use lo: interfaces for drbd.
661         * Make sure to use 0 as port number, so linux selects
662         *  a free one dynamically.
663         */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
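/* Passive side of connection setup: bind a listening socket to the configured
 * local address and accept a single connection.  The accept timeout is
 * connect_int plus or minus a random jitter (timeo/7), presumably so that two
 * nodes trying to connect to each other do not keep retrying in lock step. */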
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
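/* The handshake needs two TCP connections per peer: the first established
 * socket becomes the data socket, the second one the meta-data (asender)
 * socket.  Each loop iteration tries an outgoing connect and, if that does
 * not complete the pair, waits for an incoming one; crossed attempts are
 * sorted out via the P_INITIAL_DATA/P_INITIAL_META packets. */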
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849
850         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
851                 return -2;
852
853         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
854
855         /* Assume that the peer only understands protocol 80 until we know better.  */
856         tconn->agreed_pro_version = 80;
857
858         do {
859                 struct socket *s;
860
861                 for (try = 0;;) {
862                         /* 3 tries, this should take less than a second! */
863                         s = drbd_try_connect(tconn);
864                         if (s || ++try >= 3)
865                                 break;
866                         /* give the other side time to call bind() & listen() */
867                         schedule_timeout_interruptible(HZ / 10);
868                 }
869
870                 if (s) {
871                         if (!tconn->data.socket) {
872                                 tconn->data.socket = s;
873                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
874                         } else if (!tconn->meta.socket) {
875                                 tconn->meta.socket = s;
876                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
877                         } else {
878                                 conn_err(tconn, "Logic error in conn_connect()\n");
879                                 goto out_release_sockets;
880                         }
881                 }
882
883                 if (tconn->data.socket && tconn->meta.socket) {
884                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
885                         ok = drbd_socket_okay(&tconn->data.socket);
886                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
887                         if (ok)
888                                 break;
889                 }
890
891 retry:
892                 s = drbd_wait_for_connect(tconn);
893                 if (s) {
894                         try = receive_first_packet(tconn, s);
895                         drbd_socket_okay(&tconn->data.socket);
896                         drbd_socket_okay(&tconn->meta.socket);
897                         switch (try) {
898                         case P_INITIAL_DATA:
899                                 if (tconn->data.socket) {
900                                         conn_warn(tconn, "initial packet S crossed\n");
901                                         sock_release(tconn->data.socket);
902                                 }
903                                 tconn->data.socket = s;
904                                 break;
905                         case P_INITIAL_META:
906                                 if (tconn->meta.socket) {
907                                         conn_warn(tconn, "initial packet M crossed\n");
908                                         sock_release(tconn->meta.socket);
909                                 }
910                                 tconn->meta.socket = s;
911                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
912                                 break;
913                         default:
914                                 conn_warn(tconn, "Error receiving initial packet\n");
915                                 sock_release(s);
916                                 if (random32() & 1)
917                                         goto retry;
918                         }
919                 }
920
921                 if (tconn->cstate <= C_DISCONNECTING)
922                         goto out_release_sockets;
923                 if (signal_pending(current)) {
924                         flush_signals(current);
925                         smp_rmb();
926                         if (get_t_state(&tconn->receiver) == EXITING)
927                                 goto out_release_sockets;
928                 }
929
930                 if (tconn->data.socket && tconn->meta.socket) {
931                         ok = drbd_socket_okay(&tconn->data.socket);
932                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
933                         if (ok)
934                                 break;
935                 }
936         } while (1);
937
938         sock  = tconn->data.socket;
939         msock = tconn->meta.socket;
940
941         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
942         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943
944         sock->sk->sk_allocation = GFP_NOIO;
945         msock->sk->sk_allocation = GFP_NOIO;
946
947         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
948         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
949
950         /* NOT YET ...
951          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
952          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
953          * first set it to the P_CONNECTION_FEATURES timeout,
954          * which we set to 4x the configured ping_timeout. */
955         rcu_read_lock();
956         nc = rcu_dereference(tconn->net_conf);
957
958         sock->sk->sk_sndtimeo =
959         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
960
961         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
962         timeout = nc->timeout * HZ / 10;
963         rcu_read_unlock();
964
965         msock->sk->sk_sndtimeo = timeout;
966
967         /* we don't want delays.
968          * we use TCP_CORK where appropriate, though */
969         drbd_tcp_nodelay(sock);
970         drbd_tcp_nodelay(msock);
971
972         tconn->last_received = jiffies;
973
974         h = drbd_do_features(tconn);
975         if (h <= 0)
976                 return h;
977
978         if (tconn->cram_hmac_tfm) {
979                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
980                 switch (drbd_do_auth(tconn)) {
981                 case -1:
982                         conn_err(tconn, "Authentication of peer failed\n");
983                         return -1;
984                 case 0:
985                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
986                         return 0;
987                 }
988         }
989
990         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
991                 return 0;
992
993         sock->sk->sk_sndtimeo = timeout;
994         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
995
996         drbd_thread_start(&tconn->asender);
997
998         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
999                 return -1;
1000
1001         rcu_read_lock();
1002         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1003                 kref_get(&mdev->kref);
1004                 rcu_read_unlock();
1005                 drbd_connected(mdev);
1006                 kref_put(&mdev->kref, &drbd_minor_destroy);
1007                 rcu_read_lock();
1008         }
1009         rcu_read_unlock();
1010
1011         return h;
1012
1013 out_release_sockets:
1014         if (tconn->data.socket) {
1015                 sock_release(tconn->data.socket);
1016                 tconn->data.socket = NULL;
1017         }
1018         if (tconn->meta.socket) {
1019                 sock_release(tconn->meta.socket);
1020                 tconn->meta.socket = NULL;
1021         }
1022         return -1;
1023 }
1024
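/* Decode one packet header into @pi.  The three on-the-wire header formats
 * (struct p_header100, p_header95, p_header80) are told apart by the header
 * size negotiated for this connection and by their magic value; a volume
 * number (vnr) is only present in the protocol 100 header. */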
1025 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1026 {
1027         unsigned int header_size = drbd_header_size(tconn);
1028
1029         if (header_size == sizeof(struct p_header100) &&
1030             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1031                 struct p_header100 *h = header;
1032                 if (h->pad != 0) {
1033                         conn_err(tconn, "Header padding is not zero\n");
1034                         return -EINVAL;
1035                 }
1036                 pi->vnr = be16_to_cpu(h->volume);
1037                 pi->cmd = be16_to_cpu(h->command);
1038                 pi->size = be32_to_cpu(h->length);
1039         } else if (header_size == sizeof(struct p_header95) &&
1040                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1041                 struct p_header95 *h = header;
1042                 pi->cmd = be16_to_cpu(h->command);
1043                 pi->size = be32_to_cpu(h->length);
1044                 pi->vnr = 0;
1045         } else if (header_size == sizeof(struct p_header80) &&
1046                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1047                 struct p_header80 *h = header;
1048                 pi->cmd = be16_to_cpu(h->command);
1049                 pi->size = be16_to_cpu(h->length);
1050                 pi->vnr = 0;
1051         } else {
1052                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1053                          be32_to_cpu(*(__be32 *)header),
1054                          tconn->agreed_pro_version);
1055                 return -EINVAL;
1056         }
1057         pi->data = header + header_size;
1058         return 0;
1059 }
1060
1061 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1062 {
1063         void *buffer = tconn->data.rbuf;
1064         int err;
1065
1066         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1067         if (err)
1068                 return err;
1069
1070         err = decode_header(tconn, buffer, pi);
1071         tconn->last_received = jiffies;
1072
1073         return err;
1074 }
1075
1076 static void drbd_flush(struct drbd_conf *mdev)
1077 {
1078         int rv;
1079
1080         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1081                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1082                                         NULL);
1083                 if (rv) {
1084                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1085                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1086                          * don't try again for ANY return value != 0
1087                          * if (rv == -EOPNOTSUPP) */
1088                         drbd_bump_write_ordering(mdev, WO_drain_io);
1089                 }
1090                 put_ldev(mdev);
1091         }
1092 }
1093
1094 /**
1095  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1096  * @mdev:       DRBD device.
1097  * @epoch:      Epoch object.
1098  * @ev:         Epoch event.
1099  */
1100 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1101                                                struct drbd_epoch *epoch,
1102                                                enum epoch_event ev)
1103 {
1104         int epoch_size;
1105         struct drbd_epoch *next_epoch;
1106         enum finish_epoch rv = FE_STILL_LIVE;
1107
1108         spin_lock(&mdev->epoch_lock);
1109         do {
1110                 next_epoch = NULL;
1111
1112                 epoch_size = atomic_read(&epoch->epoch_size);
1113
1114                 switch (ev & ~EV_CLEANUP) {
1115                 case EV_PUT:
1116                         atomic_dec(&epoch->active);
1117                         break;
1118                 case EV_GOT_BARRIER_NR:
1119                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1120                         break;
1121                 case EV_BECAME_LAST:
1122                         /* nothing to do */
1123                         break;
1124                 }
1125
1126                 if (epoch_size != 0 &&
1127                     atomic_read(&epoch->active) == 0 &&
1128                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1129                         if (!(ev & EV_CLEANUP)) {
1130                                 spin_unlock(&mdev->epoch_lock);
1131                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1132                                 spin_lock(&mdev->epoch_lock);
1133                         }
1134                         dec_unacked(mdev);
1135
1136                         if (mdev->current_epoch != epoch) {
1137                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1138                                 list_del(&epoch->list);
1139                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1140                                 mdev->epochs--;
1141                                 kfree(epoch);
1142
1143                                 if (rv == FE_STILL_LIVE)
1144                                         rv = FE_DESTROYED;
1145                         } else {
1146                                 epoch->flags = 0;
1147                                 atomic_set(&epoch->epoch_size, 0);
1148                                 /* atomic_set(&epoch->active, 0); is already zero */
1149                                 if (rv == FE_STILL_LIVE)
1150                                         rv = FE_RECYCLED;
1151                                 wake_up(&mdev->ee_wait);
1152                         }
1153                 }
1154
1155                 if (!next_epoch)
1156                         break;
1157
1158                 epoch = next_epoch;
1159         } while (1);
1160
1161         spin_unlock(&mdev->epoch_lock);
1162
1163         return rv;
1164 }
1165
1166 /**
1167  * drbd_bump_write_ordering() - Fall back to another write ordering method
1168  * @mdev:       DRBD device.
1169  * @wo:         Write ordering method to try.
1170  */
1171 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1172 {
1173         struct disk_conf *dc;
1174         enum write_ordering_e pwo;
1175         static char *write_ordering_str[] = {
1176                 [WO_none] = "none",
1177                 [WO_drain_io] = "drain",
1178                 [WO_bdev_flush] = "flush",
1179         };
1180
1181         pwo = mdev->write_ordering;
1182         wo = min(pwo, wo);
1183         rcu_read_lock();
1184         dc = rcu_dereference(mdev->ldev->disk_conf);
1185
1186         if (wo == WO_bdev_flush && !dc->disk_flushes)
1187                 wo = WO_drain_io;
1188         if (wo == WO_drain_io && !dc->disk_drain)
1189                 wo = WO_none;
1190         rcu_read_unlock();
1191         mdev->write_ordering = wo;
1192         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1193                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1194 }
1195
1196 /**
1197  * drbd_submit_peer_request() - submit the bio(s) for a peer request
1198  * @mdev:       DRBD device.
1199  * @peer_req:   peer request
1200  * @rw:         flag field, see bio->bi_rw
1201  *
1202  * May spread the pages to multiple bios,
1203  * depending on bio_add_page restrictions.
1204  *
1205  * Returns 0 if all bios have been submitted,
1206  * -ENOMEM if we could not allocate enough bios,
1207  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1208  *  single page to an empty bio (which should never happen and likely indicates
1209  *  that the lower level IO stack is in some way broken). This has been observed
1210  *  on certain Xen deployments.
1211  */
1212 /* TODO allocate from our own bio_set. */
1213 int drbd_submit_peer_request(struct drbd_conf *mdev,
1214                              struct drbd_peer_request *peer_req,
1215                              const unsigned rw, const int fault_type)
1216 {
1217         struct bio *bios = NULL;
1218         struct bio *bio;
1219         struct page *page = peer_req->pages;
1220         sector_t sector = peer_req->i.sector;
1221         unsigned ds = peer_req->i.size;
1222         unsigned n_bios = 0;
1223         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1224         int err = -ENOMEM;
1225
1226         /* In most cases, we will only need one bio.  But in case the lower
1227          * level restrictions happen to be different at this offset on this
1228          * side than those of the sending peer, we may need to submit the
1229          * request in more than one bio.
1230          *
1231          * Plain bio_alloc is good enough here; this is not a DRBD-internally
1232          * generated bio, but a bio allocated on behalf of the peer.
1233          */
1234 next_bio:
1235         bio = bio_alloc(GFP_NOIO, nr_pages);
1236         if (!bio) {
1237                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1238                 goto fail;
1239         }
1240         /* > peer_req->i.sector, unless this is the first bio */
1241         bio->bi_sector = sector;
1242         bio->bi_bdev = mdev->ldev->backing_bdev;
1243         bio->bi_rw = rw;
1244         bio->bi_private = peer_req;
1245         bio->bi_end_io = drbd_peer_request_endio;
1246
1247         bio->bi_next = bios;
1248         bios = bio;
1249         ++n_bios;
1250
1251         page_chain_for_each(page) {
1252                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1253                 if (!bio_add_page(bio, page, len, 0)) {
1254                         /* A single page must always be possible!
1255                          * But in case it fails anyways,
1256                          * we deal with it, and complain (below). */
1257                         if (bio->bi_vcnt == 0) {
1258                                 dev_err(DEV,
1259                                         "bio_add_page failed for len=%u, "
1260                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1261                                         len, (unsigned long long)bio->bi_sector);
1262                                 err = -ENOSPC;
1263                                 goto fail;
1264                         }
1265                         goto next_bio;
1266                 }
1267                 ds -= len;
1268                 sector += len >> 9;
1269                 --nr_pages;
1270         }
1271         D_ASSERT(page == NULL);
1272         D_ASSERT(ds == 0);
1273
1274         atomic_set(&peer_req->pending_bios, n_bios);
1275         do {
1276                 bio = bios;
1277                 bios = bios->bi_next;
1278                 bio->bi_next = NULL;
1279
1280                 drbd_generic_make_request(mdev, fault_type, bio);
1281         } while (bios);
1282         return 0;
1283
1284 fail:
1285         while (bios) {
1286                 bio = bios;
1287                 bios = bios->bi_next;
1288                 bio_put(bio);
1289         }
1290         return err;
1291 }
1292
1293 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1294                                              struct drbd_peer_request *peer_req)
1295 {
1296         struct drbd_interval *i = &peer_req->i;
1297
1298         drbd_remove_interval(&mdev->write_requests, i);
1299         drbd_clear_interval(i);
1300
1301         /* Wake up any processes waiting for this peer request to complete.  */
1302         if (i->waiting)
1303                 wake_up(&mdev->misc_wait);
1304 }
1305
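/* P_BARRIER from the peer closes the current write epoch.  Depending on the
 * local write ordering method we either just start a new epoch (WO_none), or
 * first wait for all pending writes and flush the backing device
 * (WO_bdev_flush/WO_drain_io) before the P_BARRIER_ACK can be sent. */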
1306 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1307 {
1308         struct drbd_conf *mdev;
1309         int rv;
1310         struct p_barrier *p = pi->data;
1311         struct drbd_epoch *epoch;
1312
1313         mdev = vnr_to_mdev(tconn, pi->vnr);
1314         if (!mdev)
1315                 return -EIO;
1316
1317         inc_unacked(mdev);
1318
1319         mdev->current_epoch->barrier_nr = p->barrier;
1320         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1321
1322         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1323          * the activity log, which means it would not be resynced in case the
1324          * R_PRIMARY crashes now.
1325          * Therefore we must send the barrier_ack after the barrier request was
1326          * completed. */
1327         switch (mdev->write_ordering) {
1328         case WO_none:
1329                 if (rv == FE_RECYCLED)
1330                         return 0;
1331
1332                 /* receiver context, in the writeout path of the other node.
1333                  * avoid potential distributed deadlock */
1334                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1335                 if (epoch)
1336                         break;
1337                 else
1338                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1339                         /* Fall through */
1340
1341         case WO_bdev_flush:
1342         case WO_drain_io:
1343                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1344                 drbd_flush(mdev);
1345
1346                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1347                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1348                         if (epoch)
1349                                 break;
1350                 }
1351
1352                 epoch = mdev->current_epoch;
1353                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1354
1355                 D_ASSERT(atomic_read(&epoch->active) == 0);
1356                 D_ASSERT(epoch->flags == 0);
1357
1358                 return 0;
1359         default:
1360                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1361                 return -EIO;
1362         }
1363
1364         epoch->flags = 0;
1365         atomic_set(&epoch->epoch_size, 0);
1366         atomic_set(&epoch->active, 0);
1367
1368         spin_lock(&mdev->epoch_lock);
1369         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1370                 list_add(&epoch->list, &mdev->current_epoch->list);
1371                 mdev->current_epoch = epoch;
1372                 mdev->epochs++;
1373         } else {
1374                 /* The current_epoch got recycled while we allocated this one... */
1375                 kfree(epoch);
1376         }
1377         spin_unlock(&mdev->epoch_lock);
1378
1379         return 0;
1380 }
1381
1382 /* used from receive_RSDataReply (recv_resync_read)
1383  * and from receive_Data */
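/* Receive one data block from the peer into a freshly allocated peer request.
 * If a peer data-integrity algorithm is configured, the digest is received
 * first and verified against the payload; a digest mismatch or a request
 * beyond the end of the local disk makes us return NULL. */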
1384 static struct drbd_peer_request *
1385 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1386               int data_size) __must_hold(local)
1387 {
1388         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1389         struct drbd_peer_request *peer_req;
1390         struct page *page;
1391         int dgs, ds, err;
1392         void *dig_in = mdev->tconn->int_dig_in;
1393         void *dig_vv = mdev->tconn->int_dig_vv;
1394         unsigned long *data;
1395
1396         dgs = 0;
1397         if (mdev->tconn->peer_integrity_tfm) {
1398                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1399                 /*
1400                  * FIXME: Receive the incoming digest into the receive buffer
1401                  *        here, together with its struct p_data?
1402                  */
1403                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1404                 if (err)
1405                         return NULL;
1406                 data_size -= dgs;
1407         }
1408
1409         if (!expect(data_size != 0))
1410                 return NULL;
1411         if (!expect(IS_ALIGNED(data_size, 512)))
1412                 return NULL;
1413         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1414                 return NULL;
1415
1416         /* even though we trust our peer,
1417          * we sometimes have to double check. */
1418         if (sector + (data_size>>9) > capacity) {
1419                 dev_err(DEV, "request from peer beyond end of local disk: "
1420                         "capacity: %llus < sector: %llus + size: %u\n",
1421                         (unsigned long long)capacity,
1422                         (unsigned long long)sector, data_size);
1423                 return NULL;
1424         }
1425
1426         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1427          * "criss-cross" setup, that might cause write-out on some other DRBD,
1428          * which in turn might block on the other node at this very place.  */
1429         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1430         if (!peer_req)
1431                 return NULL;
1432
1433         ds = data_size;
1434         page = peer_req->pages;
1435         page_chain_for_each(page) {
1436                 unsigned len = min_t(int, ds, PAGE_SIZE);
1437                 data = kmap(page);
1438                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1439                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1440                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1441                         data[0] = data[0] ^ (unsigned long)-1;
1442                 }
1443                 kunmap(page);
1444                 if (err) {
1445                         drbd_free_peer_req(mdev, peer_req);
1446                         return NULL;
1447                 }
1448                 ds -= len;
1449         }
1450
1451         if (dgs) {
1452                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1453                 if (memcmp(dig_in, dig_vv, dgs)) {
1454                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1455                                 (unsigned long long)sector, data_size);
1456                         drbd_free_peer_req(mdev, peer_req);
1457                         return NULL;
1458                 }
1459         }
1460         mdev->recv_cnt += data_size>>9;
1461         return peer_req;
1462 }
1463
1464 /* drbd_drain_block() just takes a data block
1465  * out of the socket input buffer, and discards it.
1466  */
1467 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1468 {
1469         struct page *page;
1470         int err = 0;
1471         void *data;
1472
1473         if (!data_size)
1474                 return 0;
1475
1476         page = drbd_alloc_pages(mdev, 1, 1);
1477
1478         data = kmap(page);
1479         while (data_size) {
1480                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1481
1482                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1483                 if (err)
1484                         break;
1485                 data_size -= len;
1486         }
1487         kunmap(page);
1488         drbd_free_pages(mdev, page, 0);
1489         return err;
1490 }
1491
1492 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1493                            sector_t sector, int data_size)
1494 {
1495         struct bio_vec *bvec;
1496         struct bio *bio;
1497         int dgs, err, i, expect;
1498         void *dig_in = mdev->tconn->int_dig_in;
1499         void *dig_vv = mdev->tconn->int_dig_vv;
1500
1501         dgs = 0;
1502         if (mdev->tconn->peer_integrity_tfm) {
1503                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1504                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1505                 if (err)
1506                         return err;
1507                 data_size -= dgs;
1508         }
1509
1510         /* optimistically update recv_cnt.  if receiving fails below,
1511          * we disconnect anyways, and counters will be reset. */
1512         mdev->recv_cnt += data_size>>9;
1513
1514         bio = req->master_bio;
1515         D_ASSERT(sector == bio->bi_sector);
1516
1517         bio_for_each_segment(bvec, bio, i) {
1518                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1519                 expect = min_t(int, data_size, bvec->bv_len);
1520                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1521                 kunmap(bvec->bv_page);
1522                 if (err)
1523                         return err;
1524                 data_size -= expect;
1525         }
1526
1527         if (dgs) {
1528                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1529                 if (memcmp(dig_in, dig_vv, dgs)) {
1530                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1531                         return -EINVAL;
1532                 }
1533         }
1534
1535         D_ASSERT(data_size == 0);
1536         return 0;
1537 }
1538
1539 /*
1540  * e_end_resync_block() is called in asender context via
1541  * drbd_finish_peer_reqs().
1542  */
1543 static int e_end_resync_block(struct drbd_work *w, int unused)
1544 {
1545         struct drbd_peer_request *peer_req =
1546                 container_of(w, struct drbd_peer_request, w);
1547         struct drbd_conf *mdev = w->mdev;
1548         sector_t sector = peer_req->i.sector;
1549         int err;
1550
1551         D_ASSERT(drbd_interval_empty(&peer_req->i));
1552
1553         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1554                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1555                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1556         } else {
1557                 /* Record failure to sync */
1558                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1559
1560                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1561         }
1562         dec_unacked(mdev);
1563
1564         return err;
1565 }
1566
1567 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1568 {
1569         struct drbd_peer_request *peer_req;
1570
1571         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1572         if (!peer_req)
1573                 goto fail;
1574
1575         dec_rs_pending(mdev);
1576
1577         inc_unacked(mdev);
1578         /* corresponding dec_unacked() in e_end_resync_block()
1579          * respective _drbd_clear_done_ee */
1580
1581         peer_req->w.cb = e_end_resync_block;
1582
1583         spin_lock_irq(&mdev->tconn->req_lock);
1584         list_add(&peer_req->w.list, &mdev->sync_ee);
1585         spin_unlock_irq(&mdev->tconn->req_lock);
1586
1587         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1588         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1589                 return 0;
1590
1591         /* don't care for the reason here */
1592         dev_err(DEV, "submit failed, triggering re-connect\n");
1593         spin_lock_irq(&mdev->tconn->req_lock);
1594         list_del(&peer_req->w.list);
1595         spin_unlock_irq(&mdev->tconn->req_lock);
1596
1597         drbd_free_peer_req(mdev, peer_req);
1598 fail:
1599         put_ldev(mdev);
1600         return -EIO;
1601 }
1602
1603 static struct drbd_request *
1604 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1605              sector_t sector, bool missing_ok, const char *func)
1606 {
1607         struct drbd_request *req;
1608
1609         /* Request object according to our peer */
1610         req = (struct drbd_request *)(unsigned long)id;
1611         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1612                 return req;
1613         if (!missing_ok) {
1614                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1615                         (unsigned long)id, (unsigned long long)sector);
1616         }
1617         return NULL;
1618 }
1619
1620 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1621 {
1622         struct drbd_conf *mdev;
1623         struct drbd_request *req;
1624         sector_t sector;
1625         int err;
1626         struct p_data *p = pi->data;
1627
1628         mdev = vnr_to_mdev(tconn, pi->vnr);
1629         if (!mdev)
1630                 return -EIO;
1631
1632         sector = be64_to_cpu(p->sector);
1633
1634         spin_lock_irq(&mdev->tconn->req_lock);
1635         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1636         spin_unlock_irq(&mdev->tconn->req_lock);
1637         if (unlikely(!req))
1638                 return -EIO;
1639
1640         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1641          * special casing it there for the various failure cases.
1642          * still no race with drbd_fail_pending_reads */
1643         err = recv_dless_read(mdev, req, sector, pi->size);
1644         if (!err)
1645                 req_mod(req, DATA_RECEIVED);
1646         /* else: nothing. handled from drbd_disconnect...
1647          * I don't think we may complete this just yet
1648          * in case we are "on-disconnect: freeze" */
1649
1650         return err;
1651 }
1652
1653 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1654 {
1655         struct drbd_conf *mdev;
1656         sector_t sector;
1657         int err;
1658         struct p_data *p = pi->data;
1659
1660         mdev = vnr_to_mdev(tconn, pi->vnr);
1661         if (!mdev)
1662                 return -EIO;
1663
1664         sector = be64_to_cpu(p->sector);
1665         D_ASSERT(p->block_id == ID_SYNCER);
1666
1667         if (get_ldev(mdev)) {
1668                 /* data is submitted to disk within recv_resync_read.
1669                  * corresponding put_ldev done below on error,
1670                  * or in drbd_peer_request_endio. */
1671                 err = recv_resync_read(mdev, sector, pi->size);
1672         } else {
1673                 if (__ratelimit(&drbd_ratelimit_state))
1674                         dev_err(DEV, "Can not write resync data to local disk.\n");
1675
1676                 err = drbd_drain_block(mdev, pi->size);
1677
1678                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1679         }
1680
1681         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1682
1683         return err;
1684 }
1685
1686 static int w_restart_write(struct drbd_work *w, int cancel)
1687 {
1688         struct drbd_request *req = container_of(w, struct drbd_request, w);
1689         struct drbd_conf *mdev = w->mdev;
1690         struct bio *bio;
1691         unsigned long start_time;
1692         unsigned long flags;
1693
1694         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1695         if (!expect(req->rq_state & RQ_POSTPONED)) {
1696                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1697                 return -EIO;
1698         }
1699         bio = req->master_bio;
1700         start_time = req->start_time;
1701         /* Postponed requests will not have their master_bio completed!  */
1702         __req_mod(req, DISCARD_WRITE, NULL);
1703         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1704
1705         while (__drbd_make_request(mdev, bio, start_time))
1706                 /* retry */ ;
1707         return 0;
1708 }
1709
1710 static void restart_conflicting_writes(struct drbd_conf *mdev,
1711                                        sector_t sector, int size)
1712 {
1713         struct drbd_interval *i;
1714         struct drbd_request *req;
1715
1716         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1717                 if (!i->local)
1718                         continue;
1719                 req = container_of(i, struct drbd_request, i);
1720                 if (req->rq_state & RQ_LOCAL_PENDING ||
1721                     !(req->rq_state & RQ_POSTPONED))
1722                         continue;
1723                 if (expect(list_empty(&req->w.list))) {
1724                         req->w.mdev = mdev;
1725                         req->w.cb = w_restart_write;
1726                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1727                 }
1728         }
1729 }
1730
1731 /*
1732  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1733  */
1734 static int e_end_block(struct drbd_work *w, int cancel)
1735 {
1736         struct drbd_peer_request *peer_req =
1737                 container_of(w, struct drbd_peer_request, w);
1738         struct drbd_conf *mdev = w->mdev;
1739         sector_t sector = peer_req->i.sector;
1740         int err = 0, pcmd;
1741
1742         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1743                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1744                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1745                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1746                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1747                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1748                         err = drbd_send_ack(mdev, pcmd, peer_req);
1749                         if (pcmd == P_RS_WRITE_ACK)
1750                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1751                 } else {
1752                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1753                         /* we expect it to be marked out of sync anyways...
1754                          * maybe assert this?  */
1755                 }
1756                 dec_unacked(mdev);
1757         }
1758         /* we delete from the conflict detection hash _after_ we sent out the
1759          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1760         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1761                 spin_lock_irq(&mdev->tconn->req_lock);
1762                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1763                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1764                 if (peer_req->flags & EE_RESTART_REQUESTS)
1765                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1766                 spin_unlock_irq(&mdev->tconn->req_lock);
1767         } else
1768                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1769
1770         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1771
1772         return err;
1773 }
1774
1775 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1776 {
1777         struct drbd_conf *mdev = w->mdev;
1778         struct drbd_peer_request *peer_req =
1779                 container_of(w, struct drbd_peer_request, w);
1780         int err;
1781
1782         err = drbd_send_ack(mdev, ack, peer_req);
1783         dec_unacked(mdev);
1784
1785         return err;
1786 }
1787
1788 static int e_send_discard_write(struct drbd_work *w, int unused)
1789 {
1790         return e_send_ack(w, P_DISCARD_WRITE);
1791 }
1792
1793 static int e_send_retry_write(struct drbd_work *w, int unused)
1794 {
1795         struct drbd_tconn *tconn = w->mdev->tconn;
1796
1797         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1798                              P_RETRY_WRITE : P_DISCARD_WRITE);
1799 }
1800
1801 static bool seq_greater(u32 a, u32 b)
1802 {
1803         /*
1804          * We assume 32-bit wrap-around here.
1805          * For 24-bit wrap-around, we would have to shift:
1806          *  a <<= 8; b <<= 8;
1807          */
1808         return (s32)a - (s32)b > 0;
1809 }
1810
1811 static u32 seq_max(u32 a, u32 b)
1812 {
1813         return seq_greater(a, b) ? a : b;
1814 }
1815
1816 static bool need_peer_seq(struct drbd_conf *mdev)
1817 {
1818         struct drbd_tconn *tconn = mdev->tconn;
1819         int tp;
1820
1821         /*
1822          * We only need to keep track of the last packet_seq number of our peer
1823          * if we are in dual-primary mode and we have the discard flag set; see
1824          * handle_write_conflicts().
1825          */
1826
1827         rcu_read_lock();
1828         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1829         rcu_read_unlock();
1830
1831         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1832 }
1833
1834 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1835 {
1836         unsigned int newest_peer_seq;
1837
1838         if (need_peer_seq(mdev)) {
1839                 spin_lock(&mdev->peer_seq_lock);
1840                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1841                 mdev->peer_seq = newest_peer_seq;
1842                 spin_unlock(&mdev->peer_seq_lock);
1843                 /* wake up only if we actually changed mdev->peer_seq */
1844                 if (peer_seq == newest_peer_seq)
1845                         wake_up(&mdev->seq_wait);
1846         }
1847 }
1848
1849 /* Called from receive_Data.
1850  * Synchronize packets on sock with packets on msock.
1851  *
1852  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1853  * packet traveling on msock, they are still processed in the order they have
1854  * been sent.
1855  *
1856  * Note: we don't care for Ack packets overtaking P_DATA packets.
1857  *
1858  * In case packet_seq is larger than mdev->peer_seq number, there are
1859  * outstanding packets on the msock. We wait for them to arrive.
1860  * In case we are the logically next packet, we update mdev->peer_seq
1861  * ourselves. Correctly handles 32bit wrap around.
1862  *
1863  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1864  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1865  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1866  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1867  *
1868  * returns 0 if we may process the packet,
1869  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1870 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1871 {
1872         DEFINE_WAIT(wait);
1873         long timeout;
1874         int ret;
1875
1876         if (!need_peer_seq(mdev))
1877                 return 0;
1878
1879         spin_lock(&mdev->peer_seq_lock);
1880         for (;;) {
1881                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1882                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1883                         ret = 0;
1884                         break;
1885                 }
1886                 if (signal_pending(current)) {
1887                         ret = -ERESTARTSYS;
1888                         break;
1889                 }
1890                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1891                 spin_unlock(&mdev->peer_seq_lock);
1892                 rcu_read_lock();
1893                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1894                 rcu_read_unlock();
1895                 timeout = schedule_timeout(timeout);
1896                 spin_lock(&mdev->peer_seq_lock);
1897                 if (!timeout) {
1898                         ret = -ETIMEDOUT;
1899                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1900                         break;
1901                 }
1902         }
1903         spin_unlock(&mdev->peer_seq_lock);
1904         finish_wait(&mdev->seq_wait, &wait);
1905         return ret;
1906 }
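/* Illustrative walk-through (hypothetical sequence numbers): assume
 * mdev->peer_seq is 7 and a P_DATA packet with peer_seq == 9 arrives.
 * seq_greater(9 - 1, 7) is true, so a packet with sequence number 8 is still
 * outstanding on the msock and we sleep on seq_wait.  Once mdev->peer_seq has
 * advanced to 8, the condition no longer holds, we store seq_max(8, 9) == 9
 * as the new peer_seq and process the packet. */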
1907
1908 /* see also bio_flags_to_wire()
1909  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1910  * flags and back. We may replicate to other kernel versions. */
1911 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1912 {
1913         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1914                 (dpf & DP_FUA ? REQ_FUA : 0) |
1915                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1916                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1917 }
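/* Example (illustrative): a peer write announced with dp_flags ==
 * (DP_RW_SYNC | DP_FUA) maps to (REQ_SYNC | REQ_FUA) on the local bio;
 * flags that are not set in dp_flags contribute 0. */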
1918
1919 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1920                                     unsigned int size)
1921 {
1922         struct drbd_interval *i;
1923
1924     repeat:
1925         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1926                 struct drbd_request *req;
1927                 struct bio_and_error m;
1928
1929                 if (!i->local)
1930                         continue;
1931                 req = container_of(i, struct drbd_request, i);
1932                 if (!(req->rq_state & RQ_POSTPONED))
1933                         continue;
1934                 req->rq_state &= ~RQ_POSTPONED;
1935                 __req_mod(req, NEG_ACKED, &m);
1936                 spin_unlock_irq(&mdev->tconn->req_lock);
1937                 if (m.bio)
1938                         complete_master_bio(mdev, &m);
1939                 spin_lock_irq(&mdev->tconn->req_lock);
1940                 goto repeat;
1941         }
1942 }
1943
1944 static int handle_write_conflicts(struct drbd_conf *mdev,
1945                                   struct drbd_peer_request *peer_req)
1946 {
1947         struct drbd_tconn *tconn = mdev->tconn;
1948         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1949         sector_t sector = peer_req->i.sector;
1950         const unsigned int size = peer_req->i.size;
1951         struct drbd_interval *i;
1952         bool equal;
1953         int err;
1954
1955         /*
1956          * Inserting the peer request into the write_requests tree will prevent
1957          * new conflicting local requests from being added.
1958          */
1959         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1960
1961     repeat:
1962         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1963                 if (i == &peer_req->i)
1964                         continue;
1965
1966                 if (!i->local) {
1967                         /*
1968                          * Our peer has sent a conflicting remote request; this
1969                          * should not happen in a two-node setup.  Wait for the
1970                          * earlier peer request to complete.
1971                          */
1972                         err = drbd_wait_misc(mdev, i);
1973                         if (err)
1974                                 goto out;
1975                         goto repeat;
1976                 }
1977
1978                 equal = i->sector == sector && i->size == size;
1979                 if (resolve_conflicts) {
1980                         /*
1981                          * If the peer request is fully contained within the
1982                          * overlapping request, it can be discarded; otherwise,
1983                          * it will be retried once all overlapping requests
1984                          * have completed.
1985                          */
1986                         bool discard = i->sector <= sector && i->sector +
1987                                        (i->size >> 9) >= sector + (size >> 9);
1988
1989                         if (!equal)
1990                                 dev_alert(DEV, "Concurrent writes detected: "
1991                                                "local=%llus +%u, remote=%llus +%u, "
1992                                                "assuming %s came first\n",
1993                                           (unsigned long long)i->sector, i->size,
1994                                           (unsigned long long)sector, size,
1995                                           discard ? "local" : "remote");
1996
1997                         inc_unacked(mdev);
1998                         peer_req->w.cb = discard ? e_send_discard_write :
1999                                                    e_send_retry_write;
2000                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2001                         wake_asender(mdev->tconn);
2002
2003                         err = -ENOENT;
2004                         goto out;
2005                 } else {
2006                         struct drbd_request *req =
2007                                 container_of(i, struct drbd_request, i);
2008
2009                         if (!equal)
2010                                 dev_alert(DEV, "Concurrent writes detected: "
2011                                                "local=%llus +%u, remote=%llus +%u\n",
2012                                           (unsigned long long)i->sector, i->size,
2013                                           (unsigned long long)sector, size);
2014
2015                         if (req->rq_state & RQ_LOCAL_PENDING ||
2016                             !(req->rq_state & RQ_POSTPONED)) {
2017                                 /*
2018                                  * Wait for the node with the discard flag to
2019                                  * decide if this request will be discarded or
2020                                  * retried.  Requests that are discarded will
2021                                  * disappear from the write_requests tree.
2022                                  *
2023                                  * In addition, wait for the conflicting
2024                                  * request to finish locally before submitting
2025                                  * the conflicting peer request.
2026                                  */
2027                                 err = drbd_wait_misc(mdev, &req->i);
2028                                 if (err) {
2029                                         _conn_request_state(mdev->tconn,
2030                                                             NS(conn, C_TIMEOUT),
2031                                                             CS_HARD);
2032                                         fail_postponed_requests(mdev, sector, size);
2033                                         goto out;
2034                                 }
2035                                 goto repeat;
2036                         }
2037                         /*
2038                          * Remember to restart the conflicting requests after
2039                          * the new peer request has completed.
2040                          */
2041                         peer_req->flags |= EE_RESTART_REQUESTS;
2042                 }
2043         }
2044         err = 0;
2045
2046     out:
2047         if (err)
2048                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2049         return err;
2050 }
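/* Illustrative conflict (hypothetical sector numbers): a postponed local
 * request covers sectors 0..31 and the incoming peer request covers sectors
 * 8..15.  The peer request is fully contained, so on the node with the
 * discard flag "discard" evaluates to true and the peer request is answered
 * with P_DISCARD_WRITE (e_send_discard_write); had it only partially
 * overlapped, it would be asked to retry (e_send_retry_write) instead. */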
2051
2052 /* mirrored write */
2053 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2054 {
2055         struct drbd_conf *mdev;
2056         sector_t sector;
2057         struct drbd_peer_request *peer_req;
2058         struct p_data *p = pi->data;
2059         u32 peer_seq = be32_to_cpu(p->seq_num);
2060         int rw = WRITE;
2061         u32 dp_flags;
2062         int err, tp;
2063
2064         mdev = vnr_to_mdev(tconn, pi->vnr);
2065         if (!mdev)
2066                 return -EIO;
2067
2068         if (!get_ldev(mdev)) {
2069                 int err2;
2070
2071                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2072                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2073                 atomic_inc(&mdev->current_epoch->epoch_size);
2074                 err2 = drbd_drain_block(mdev, pi->size);
2075                 if (!err)
2076                         err = err2;
2077                 return err;
2078         }
2079
2080         /*
2081          * Corresponding put_ldev done either below (on various errors), or in
2082          * drbd_peer_request_endio, if we successfully submit the data at the
2083          * end of this function.
2084          */
2085
2086         sector = be64_to_cpu(p->sector);
2087         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2088         if (!peer_req) {
2089                 put_ldev(mdev);
2090                 return -EIO;
2091         }
2092
2093         peer_req->w.cb = e_end_block;
2094
2095         dp_flags = be32_to_cpu(p->dp_flags);
2096         rw |= wire_flags_to_bio(mdev, dp_flags);
2097
2098         if (dp_flags & DP_MAY_SET_IN_SYNC)
2099                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2100
2101         spin_lock(&mdev->epoch_lock);
2102         peer_req->epoch = mdev->current_epoch;
2103         atomic_inc(&peer_req->epoch->epoch_size);
2104         atomic_inc(&peer_req->epoch->active);
2105         spin_unlock(&mdev->epoch_lock);
2106
2107         rcu_read_lock();
2108         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2109         rcu_read_unlock();
2110         if (tp) {
2111                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2112                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2113                 if (err)
2114                         goto out_interrupted;
2115                 spin_lock_irq(&mdev->tconn->req_lock);
2116                 err = handle_write_conflicts(mdev, peer_req);
2117                 if (err) {
2118                         spin_unlock_irq(&mdev->tconn->req_lock);
2119                         if (err == -ENOENT) {
2120                                 put_ldev(mdev);
2121                                 return 0;
2122                         }
2123                         goto out_interrupted;
2124                 }
2125         } else
2126                 spin_lock_irq(&mdev->tconn->req_lock);
2127         list_add(&peer_req->w.list, &mdev->active_ee);
2128         spin_unlock_irq(&mdev->tconn->req_lock);
2129
2130         if (mdev->tconn->agreed_pro_version < 100) {
2131                 rcu_read_lock();
2132                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2133                 case DRBD_PROT_C:
2134                         dp_flags |= DP_SEND_WRITE_ACK;
2135                         break;
2136                 case DRBD_PROT_B:
2137                         dp_flags |= DP_SEND_RECEIVE_ACK;
2138                         break;
2139                 }
2140                 rcu_read_unlock();
2141         }
2142
2143         if (dp_flags & DP_SEND_WRITE_ACK) {
2144                 peer_req->flags |= EE_SEND_WRITE_ACK;
2145                 inc_unacked(mdev);
2146                 /* corresponding dec_unacked() in e_end_block()
2147                  * respective _drbd_clear_done_ee */
2148         }
2149
2150         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2151                 /* I really don't like it that the receiver thread
2152                  * sends on the msock, but anyways */
2153                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2154         }
2155
2156         if (mdev->state.pdsk < D_INCONSISTENT) {
2157                 /* In case we have the only disk of the cluster, mark this range out of sync. */
2158                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2159                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2160                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2161                 drbd_al_begin_io(mdev, &peer_req->i);
2162         }
2163
2164         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2165         if (!err)
2166                 return 0;
2167
2168         /* don't care for the reason here */
2169         dev_err(DEV, "submit failed, triggering re-connect\n");
2170         spin_lock_irq(&mdev->tconn->req_lock);
2171         list_del(&peer_req->w.list);
2172         drbd_remove_epoch_entry_interval(mdev, peer_req);
2173         spin_unlock_irq(&mdev->tconn->req_lock);
2174         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2175                 drbd_al_complete_io(mdev, &peer_req->i);
2176
2177 out_interrupted:
2178         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2179         put_ldev(mdev);
2180         drbd_free_peer_req(mdev, peer_req);
2181         return err;
2182 }
2183
2184 /* We may throttle resync, if the lower device seems to be busy,
2185  * and current sync rate is above c_min_rate.
2186  *
2187  * To decide whether or not the lower device is busy, we use a scheme similar
2188  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2189  * amount (more than 64 sectors) of activity that we cannot account for with
2190  * our own resync activity, it obviously is "busy".
2191  *
2192  * The sync rate used here is based only on the most recent two step marks,
2193  * giving a short-time average so we can react faster.
2194  */
2195 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2196 {
2197         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2198         unsigned long db, dt, dbdt;
2199         struct lc_element *tmp;
2200         int curr_events;
2201         int throttle = 0;
2202         unsigned int c_min_rate;
2203
2204         rcu_read_lock();
2205         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2206         rcu_read_unlock();
2207
2208         /* feature disabled? */
2209         if (c_min_rate == 0)
2210                 return 0;
2211
2212         spin_lock_irq(&mdev->al_lock);
2213         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2214         if (tmp) {
2215                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2216                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2217                         spin_unlock_irq(&mdev->al_lock);
2218                         return 0;
2219                 }
2220                 /* Do not slow down if app IO is already waiting for this extent */
2221         }
2222         spin_unlock_irq(&mdev->al_lock);
2223
2224         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2225                       (int)part_stat_read(&disk->part0, sectors[1]) -
2226                         atomic_read(&mdev->rs_sect_ev);
2227
2228         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2229                 unsigned long rs_left;
2230                 int i;
2231
2232                 mdev->rs_last_events = curr_events;
2233
2234                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2235                  * approx. */
2236                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2237
2238                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2239                         rs_left = mdev->ov_left;
2240                 else
2241                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2242
2243                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2244                 if (!dt)
2245                         dt++;
2246                 db = mdev->rs_mark_left[i] - rs_left;
2247                 dbdt = Bit2KB(db/dt);
2248
2249                 if (dbdt > c_min_rate)
2250                         throttle = 1;
2251         }
2252         return throttle;
2253 }
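/* Rough worked example (illustrative numbers; assumes the usual 4 KiB of disk
 * per bitmap bit): if the two most recent sync marks are dt = 3 seconds apart
 * and rs_mark_left dropped by db = 1536 bits in that time, then
 * dbdt = Bit2KB(1536 / 3) = 512 * 4 = 2048 KB/s.  With c_min_rate configured
 * to, say, 1000 KB/s, dbdt > c_min_rate and resync requests are throttled,
 * provided the unaccounted-for disk activity also exceeded 64 sectors, which
 * is the check guarding this calculation. */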
2254
2255
2256 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2257 {
2258         struct drbd_conf *mdev;
2259         sector_t sector;
2260         sector_t capacity;
2261         struct drbd_peer_request *peer_req;
2262         struct digest_info *di = NULL;
2263         int size, verb;
2264         unsigned int fault_type;
2265         struct p_block_req *p = pi->data;
2266
2267         mdev = vnr_to_mdev(tconn, pi->vnr);
2268         if (!mdev)
2269                 return -EIO;
2270         capacity = drbd_get_capacity(mdev->this_bdev);
2271
2272         sector = be64_to_cpu(p->sector);
2273         size   = be32_to_cpu(p->blksize);
2274
2275         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2276                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2277                                 (unsigned long long)sector, size);
2278                 return -EINVAL;
2279         }
2280         if (sector + (size>>9) > capacity) {
2281                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2282                                 (unsigned long long)sector, size);
2283                 return -EINVAL;
2284         }
2285
2286         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2287                 verb = 1;
2288                 switch (pi->cmd) {
2289                 case P_DATA_REQUEST:
2290                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2291                         break;
2292                 case P_RS_DATA_REQUEST:
2293                 case P_CSUM_RS_REQUEST:
2294                 case P_OV_REQUEST:
2295                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2296                         break;
2297                 case P_OV_REPLY:
2298                         verb = 0;
2299                         dec_rs_pending(mdev);
2300                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2301                         break;
2302                 default:
2303                         BUG();
2304                 }
2305                 if (verb && __ratelimit(&drbd_ratelimit_state))
2306                         dev_err(DEV, "Can not satisfy peer's read request, "
2307                             "no local data.\n");
2308
2309                 /* drain possible payload */
2310                 return drbd_drain_block(mdev, pi->size);
2311         }
2312
2313         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2314          * "criss-cross" setup, that might cause write-out on some other DRBD,
2315          * which in turn might block on the other node at this very place.  */
2316         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2317         if (!peer_req) {
2318                 put_ldev(mdev);
2319                 return -ENOMEM;
2320         }
2321
2322         switch (pi->cmd) {
2323         case P_DATA_REQUEST:
2324                 peer_req->w.cb = w_e_end_data_req;
2325                 fault_type = DRBD_FAULT_DT_RD;
2326                 /* application IO, don't drbd_rs_begin_io */
2327                 goto submit;
2328
2329         case P_RS_DATA_REQUEST:
2330                 peer_req->w.cb = w_e_end_rsdata_req;
2331                 fault_type = DRBD_FAULT_RS_RD;
2332                 /* used in the sector offset progress display */
2333                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2334                 break;
2335
2336         case P_OV_REPLY:
2337         case P_CSUM_RS_REQUEST:
2338                 fault_type = DRBD_FAULT_RS_RD;
2339                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2340                 if (!di)
2341                         goto out_free_e;
2342
2343                 di->digest_size = pi->size;
2344                 di->digest = (((char *)di)+sizeof(struct digest_info));
2345
2346                 peer_req->digest = di;
2347                 peer_req->flags |= EE_HAS_DIGEST;
2348
2349                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2350                         goto out_free_e;
2351
2352                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2353                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2354                         peer_req->w.cb = w_e_end_csum_rs_req;
2355                         /* used in the sector offset progress display */
2356                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2357                 } else if (pi->cmd == P_OV_REPLY) {
2358                         /* track progress, we may need to throttle */
2359                         atomic_add(size >> 9, &mdev->rs_sect_in);
2360                         peer_req->w.cb = w_e_end_ov_reply;
2361                         dec_rs_pending(mdev);
2362                         /* drbd_rs_begin_io done when we sent this request,
2363                          * but accounting still needs to be done. */
2364                         goto submit_for_resync;
2365                 }
2366                 break;
2367
2368         case P_OV_REQUEST:
2369                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2370                     mdev->tconn->agreed_pro_version >= 90) {
2371                         unsigned long now = jiffies;
2372                         int i;
2373                         mdev->ov_start_sector = sector;
2374                         mdev->ov_position = sector;
2375                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2376                         mdev->rs_total = mdev->ov_left;
2377                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2378                                 mdev->rs_mark_left[i] = mdev->ov_left;
2379                                 mdev->rs_mark_time[i] = now;
2380                         }
2381                         dev_info(DEV, "Online Verify start sector: %llu\n",
2382                                         (unsigned long long)sector);
2383                 }
2384                 peer_req->w.cb = w_e_end_ov_req;
2385                 fault_type = DRBD_FAULT_RS_RD;
2386                 break;
2387
2388         default:
2389                 BUG();
2390         }
2391
2392         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2393          * wrt the receiver, but it is not as straightforward as it may seem.
2394          * Various places in the resync start and stop logic assume resync
2395          * requests are processed in order, requeuing this on the worker thread
2396          * introduces a bunch of new code for synchronization between threads.
2397          *
2398          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2399          * "forever", throttling after drbd_rs_begin_io will lock that extent
2400          * for application writes for the same time.  For now, just throttle
2401          * here, where the rest of the code expects the receiver to sleep for
2402          * a while, anyways.
2403          */
2404
2405         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2406          * this defers syncer requests for some time, before letting at least
2407          * one request through.  The resync controller on the receiving side
2408          * will adapt to the incoming rate accordingly.
2409          *
2410          * We cannot throttle here if remote is Primary/SyncTarget:
2411          * we would also throttle its application reads.
2412          * In that case, throttling is done on the SyncTarget only.
2413          */
2414         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2415                 schedule_timeout_uninterruptible(HZ/10);
2416         if (drbd_rs_begin_io(mdev, sector))
2417                 goto out_free_e;
2418
2419 submit_for_resync:
2420         atomic_add(size >> 9, &mdev->rs_sect_ev);
2421
2422 submit:
2423         inc_unacked(mdev);
2424         spin_lock_irq(&mdev->tconn->req_lock);
2425         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2426         spin_unlock_irq(&mdev->tconn->req_lock);
2427
2428         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2429                 return 0;
2430
2431         /* don't care for the reason here */
2432         dev_err(DEV, "submit failed, triggering re-connect\n");
2433         spin_lock_irq(&mdev->tconn->req_lock);
2434         list_del(&peer_req->w.list);
2435         spin_unlock_irq(&mdev->tconn->req_lock);
2436         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2437
2438 out_free_e:
2439         put_ldev(mdev);
2440         drbd_free_peer_req(mdev, peer_req);
2441         return -EIO;
2442 }
2443
2444 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2445 {
2446         int self, peer, rv = -100;
2447         unsigned long ch_self, ch_peer;
2448         enum drbd_after_sb_p after_sb_0p;
2449
2450         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2451         peer = mdev->p_uuid[UI_BITMAP] & 1;
2452
2453         ch_peer = mdev->p_uuid[UI_SIZE];
2454         ch_self = mdev->comm_bm_set;
2455
2456         rcu_read_lock();
2457         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2458         rcu_read_unlock();
2459         switch (after_sb_0p) {
2460         case ASB_CONSENSUS:
2461         case ASB_DISCARD_SECONDARY:
2462         case ASB_CALL_HELPER:
2463         case ASB_VIOLENTLY:
2464                 dev_err(DEV, "Configuration error.\n");
2465                 break;
2466         case ASB_DISCONNECT:
2467                 break;
2468         case ASB_DISCARD_YOUNGER_PRI:
2469                 if (self == 0 && peer == 1) {
2470                         rv = -1;
2471                         break;
2472                 }
2473                 if (self == 1 && peer == 0) {
2474                         rv =  1;
2475                         break;
2476                 }
2477                 /* Else fall through to one of the other strategies... */
2478         case ASB_DISCARD_OLDER_PRI:
2479                 if (self == 0 && peer == 1) {
2480                         rv = 1;
2481                         break;
2482                 }
2483                 if (self == 1 && peer == 0) {
2484                         rv = -1;
2485                         break;
2486                 }
2487                 /* Else fall through to one of the other strategies... */
2488                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2489                      "Using discard-least-changes instead\n");
2490         case ASB_DISCARD_ZERO_CHG:
2491                 if (ch_peer == 0 && ch_self == 0) {
2492                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2493                                 ? -1 : 1;
2494                         break;
2495                 } else {
2496                         if (ch_peer == 0) { rv =  1; break; }
2497                         if (ch_self == 0) { rv = -1; break; }
2498                 }
2499                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2500                         break;
2501         case ASB_DISCARD_LEAST_CHG:
2502                 if      (ch_self < ch_peer)
2503                         rv = -1;
2504                 else if (ch_self > ch_peer)
2505                         rv =  1;
2506                 else /* ( ch_self == ch_peer ) */
2507                      /* Well, then use something else. */
2508                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2509                                 ? -1 : 1;
2510                 break;
2511         case ASB_DISCARD_LOCAL:
2512                 rv = -1;
2513                 break;
2514         case ASB_DISCARD_REMOTE:
2515                 rv =  1;
2516         }
2517
2518         return rv;
2519 }
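/* Illustrative outcome (hypothetical change counts): with after-sb-0pri set to
 * discard-least-changes, ch_self = 100 locally changed blocks versus
 * ch_peer = 5000 on the peer gives rv = -1, i.e. the local changes are
 * discarded and this node becomes the sync target; equal counts fall back to
 * the DISCARD_CONCURRENT tie-breaker. */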
2520
2521 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2522 {
2523         int hg, rv = -100;
2524         enum drbd_after_sb_p after_sb_1p;
2525
2526         rcu_read_lock();
2527         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2528         rcu_read_unlock();
2529         switch (after_sb_1p) {
2530         case ASB_DISCARD_YOUNGER_PRI:
2531         case ASB_DISCARD_OLDER_PRI:
2532         case ASB_DISCARD_LEAST_CHG:
2533         case ASB_DISCARD_LOCAL:
2534         case ASB_DISCARD_REMOTE:
2535         case ASB_DISCARD_ZERO_CHG:
2536                 dev_err(DEV, "Configuration error.\n");
2537                 break;
2538         case ASB_DISCONNECT:
2539                 break;
2540         case ASB_CONSENSUS:
2541                 hg = drbd_asb_recover_0p(mdev);
2542                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2543                         rv = hg;
2544                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2545                         rv = hg;
2546                 break;
2547         case ASB_VIOLENTLY:
2548                 rv = drbd_asb_recover_0p(mdev);
2549                 break;
2550         case ASB_DISCARD_SECONDARY:
2551                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2552         case ASB_CALL_HELPER:
2553                 hg = drbd_asb_recover_0p(mdev);
2554                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2555                         enum drbd_state_rv rv2;
2556
2557                         drbd_set_role(mdev, R_SECONDARY, 0);
2558                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2559                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2560                           * we do not need to wait for the after state change work either. */
2561                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2562                         if (rv2 != SS_SUCCESS) {
2563                                 drbd_khelper(mdev, "pri-lost-after-sb");
2564                         } else {
2565                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2566                                 rv = hg;
2567                         }
2568                 } else
2569                         rv = hg;
2570         }
2571
2572         return rv;
2573 }
2574
2575 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2576 {
2577         int hg, rv = -100;
2578         enum drbd_after_sb_p after_sb_2p;
2579
2580         rcu_read_lock();
2581         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2582         rcu_read_unlock();
2583         switch (after_sb_2p) {
2584         case ASB_DISCARD_YOUNGER_PRI:
2585         case ASB_DISCARD_OLDER_PRI:
2586         case ASB_DISCARD_LEAST_CHG:
2587         case ASB_DISCARD_LOCAL:
2588         case ASB_DISCARD_REMOTE:
2589         case ASB_CONSENSUS:
2590         case ASB_DISCARD_SECONDARY:
2591         case ASB_DISCARD_ZERO_CHG:
2592                 dev_err(DEV, "Configuration error.\n");
2593                 break;
2594         case ASB_VIOLENTLY:
2595                 rv = drbd_asb_recover_0p(mdev);
2596                 break;
2597         case ASB_DISCONNECT:
2598                 break;
2599         case ASB_CALL_HELPER:
2600                 hg = drbd_asb_recover_0p(mdev);
2601                 if (hg == -1) {
2602                         enum drbd_state_rv rv2;
2603
2604                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2605                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2606                           * we do not need to wait for the after state change work either. */
2607                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2608                         if (rv2 != SS_SUCCESS) {
2609                                 drbd_khelper(mdev, "pri-lost-after-sb");
2610                         } else {
2611                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2612                                 rv = hg;
2613                         }
2614                 } else
2615                         rv = hg;
2616         }
2617
2618         return rv;
2619 }
2620
2621 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2622                            u64 bits, u64 flags)
2623 {
2624         if (!uuid) {
2625                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2626                 return;
2627         }
2628         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2629              text,
2630              (unsigned long long)uuid[UI_CURRENT],
2631              (unsigned long long)uuid[UI_BITMAP],
2632              (unsigned long long)uuid[UI_HISTORY_START],
2633              (unsigned long long)uuid[UI_HISTORY_END],
2634              (unsigned long long)bits,
2635              (unsigned long long)flags);
2636 }
2637
2638 /*
2639   100   after split brain try auto recover
2640     2   C_SYNC_SOURCE set BitMap
2641     1   C_SYNC_SOURCE use BitMap
2642     0   no Sync
2643    -1   C_SYNC_TARGET use BitMap
2644    -2   C_SYNC_TARGET set BitMap
2645  -100   after split brain, disconnect
2646 -1000   unrelated data
2647 -1091   requires proto 91
2648 -1096   requires proto 96
2649  */
2650 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2651 {
2652         u64 self, peer;
2653         int i, j;
2654
2655         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2656         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2657
2658         *rule_nr = 10;
2659         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2660                 return 0;
2661
2662         *rule_nr = 20;
2663         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2664              peer != UUID_JUST_CREATED)
2665                 return -2;
2666
2667         *rule_nr = 30;
2668         if (self != UUID_JUST_CREATED &&
2669             (peer == UUID_JUST_CREATED || peer == (u64)0))
2670                 return 2;
2671
2672         if (self == peer) {
2673                 int rct, dc; /* roles at crash time */
2674
2675                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2676
2677                         if (mdev->tconn->agreed_pro_version < 91)
2678                                 return -1091;
2679
2680                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2681                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2682                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2683                                 drbd_uuid_set_bm(mdev, 0UL);
2684
2685                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2686                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2687                                 *rule_nr = 34;
2688                         } else {
2689                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2690                                 *rule_nr = 36;
2691                         }
2692
2693                         return 1;
2694                 }
2695
2696                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2697
2698                         if (mdev->tconn->agreed_pro_version < 91)
2699                                 return -1091;
2700
2701                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2702                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2703                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2704
2705                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2706                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2707                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2708
2709                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2710                                 *rule_nr = 35;
2711                         } else {
2712                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2713                                 *rule_nr = 37;
2714                         }
2715
2716                         return -1;
2717                 }
2718
2719                 /* Common power [off|failure] */
2720                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2721                         (mdev->p_uuid[UI_FLAGS] & 2);
2722                 /* lowest bit is set when we were primary,
2723                  * next bit (weight 2) is set when peer was primary */
2724                 *rule_nr = 40;
2725
2726                 switch (rct) {
2727                 case 0: /* !self_pri && !peer_pri */ return 0;
2728                 case 1: /*  self_pri && !peer_pri */ return 1;
2729                 case 2: /* !self_pri &&  peer_pri */ return -1;
2730                 case 3: /*  self_pri &&  peer_pri */
2731                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2732                         return dc ? -1 : 1;
2733                 }
2734         }
2735
2736         *rule_nr = 50;
2737         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2738         if (self == peer)
2739                 return -1;
2740
2741         *rule_nr = 51;
2742         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2743         if (self == peer) {
2744                 if (mdev->tconn->agreed_pro_version < 96 ?
2745                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2746                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2747                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2748                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2749                            the peer made to its UUIDs when it last started a resync as sync source. */
2750
2751                         if (mdev->tconn->agreed_pro_version < 91)
2752                                 return -1091;
2753
2754                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2755                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2756
2757                         dev_info(DEV, "Did not got last syncUUID packet, corrected:\n");
2758                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2759
2760                         return -1;
2761                 }
2762         }
2763
2764         *rule_nr = 60;
2765         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2766         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2767                 peer = mdev->p_uuid[i] & ~((u64)1);
2768                 if (self == peer)
2769                         return -2;
2770         }
2771
2772         *rule_nr = 70;
2773         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2774         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2775         if (self == peer)
2776                 return 1;
2777
2778         *rule_nr = 71;
2779         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2780         if (self == peer) {
2781                 if (mdev->tconn->agreed_pro_version < 96 ?
2782                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2783                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2784                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2785                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2786                            we made to our UUIDs when we last started a resync as sync source. */
2787
2788                         if (mdev->tconn->agreed_pro_version < 91)
2789                                 return -1091;
2790
2791                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2792                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2793
2794                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2795                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2796                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2797
2798                         return 1;
2799                 }
2800         }
2801
2802
2803         *rule_nr = 80;
2804         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2805         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2806                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2807                 if (self == peer)
2808                         return 2;
2809         }
2810
2811         *rule_nr = 90;
2812         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2813         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2814         if (self == peer && self != ((u64)0))
2815                 return 100;
2816
2817         *rule_nr = 100;
2818         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2819                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2820                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2821                         peer = mdev->p_uuid[j] & ~((u64)1);
2822                         if (self == peer)
2823                                 return -100;
2824                 }
2825         }
2826
2827         return -1000;
2828 }
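
/*
 * Illustrative sketch, not part of the original driver: a hypothetical helper
 * that spells out roughly how drbd_sync_handshake() below interprets the raw
 * value returned by drbd_uuid_compare() ("hg"), before any policy
 * adjustments.  The sign selects the sync direction, the magnitude the kind
 * of resync.
 */
static inline const char *uuid_compare_result_str(int hg)
{
        if (hg < -1000)
                return "requires a newer protocol version (at least -hg - 1000) on both sides";
        if (hg == -1000)
                return "unrelated data";
        if (hg == 100 || hg == -100)
                return "split brain";
        if (hg == 0)
                return "in sync, no resync needed";
        if (hg > 0)
                return abs(hg) >= 2 ? "full resync, this node becomes source"
                                    : "bitmap-based resync, this node becomes source";
        return abs(hg) >= 2 ? "full resync, this node becomes target"
                            : "bitmap-based resync, this node becomes target";
}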
2829
2830 /* drbd_sync_handshake() returns the new conn state on success, or
2831    C_MASK on failure.
2832  */
2833 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2834                                            enum drbd_disk_state peer_disk) __must_hold(local)
2835 {
2836         enum drbd_conns rv = C_MASK;
2837         enum drbd_disk_state mydisk;
2838         struct net_conf *nc;
2839         int hg, rule_nr, rr_conflict, tentative;
2840
2841         mydisk = mdev->state.disk;
2842         if (mydisk == D_NEGOTIATING)
2843                 mydisk = mdev->new_state_tmp.disk;
2844
2845         dev_info(DEV, "drbd_sync_handshake:\n");
2846         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2847         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2848                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2849
2850         hg = drbd_uuid_compare(mdev, &rule_nr);
2851
2852         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2853
2854         if (hg == -1000) {
2855                 dev_alert(DEV, "Unrelated data, aborting!\n");
2856                 return C_MASK;
2857         }
2858         if (hg < -1000) {
2859                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2860                 return C_MASK;
2861         }
2862
2863         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2864             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2865                 int f = (hg == -100) || abs(hg) == 2;
2866                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2867                 if (f)
2868                         hg = hg*2;
2869                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2870                      hg > 0 ? "source" : "target");
2871         }
2872
2873         if (abs(hg) == 100)
2874                 drbd_khelper(mdev, "initial-split-brain");
2875
2876         rcu_read_lock();
2877         nc = rcu_dereference(mdev->tconn->net_conf);
2878
2879         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2880                 int pcount = (mdev->state.role == R_PRIMARY)
2881                            + (peer_role == R_PRIMARY);
2882                 int forced = (hg == -100);
2883
2884                 switch (pcount) {
2885                 case 0:
2886                         hg = drbd_asb_recover_0p(mdev);
2887                         break;
2888                 case 1:
2889                         hg = drbd_asb_recover_1p(mdev);
2890                         break;
2891                 case 2:
2892                         hg = drbd_asb_recover_2p(mdev);
2893                         break;
2894                 }
2895                 if (abs(hg) < 100) {
2896                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2897                              "automatically solved. Sync from %s node\n",
2898                              pcount, (hg < 0) ? "peer" : "this");
2899                         if (forced) {
2900                                 dev_warn(DEV, "Doing a full sync, since"
2901                                      " UUIDs were ambiguous.\n");
2902                                 hg = hg*2;
2903                         }
2904                 }
2905         }
2906
2907         if (hg == -100) {
2908                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2909                         hg = -1;
2910                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2911                         hg = 1;
2912
2913                 if (abs(hg) < 100)
2914                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2915                              "Sync from %s node\n",
2916                              (hg < 0) ? "peer" : "this");
2917         }
2918         rr_conflict = nc->rr_conflict;
2919         tentative = nc->tentative;
2920         rcu_read_unlock();
2921
2922         if (hg == -100) {
2923                 /* FIXME this log message is not correct if we end up here
2924                  * after an attempted attach on a diskless node.
2925                  * We just refuse to attach -- well, we drop the "connection"
2926                  * to that disk, in a way... */
2927                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2928                 drbd_khelper(mdev, "split-brain");
2929                 return C_MASK;
2930         }
2931
2932         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2933                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2934                 return C_MASK;
2935         }
2936
2937         if (hg < 0 && /* by intention we do not use mydisk here. */
2938             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2939                 switch (rr_conflict) {
2940                 case ASB_CALL_HELPER:
2941                         drbd_khelper(mdev, "pri-lost");
2942                         /* fall through */
2943                 case ASB_DISCONNECT:
2944                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2945                         return C_MASK;
2946                 case ASB_VIOLENTLY:
2947                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2948                              " assumption\n");
2949                 }
2950         }
2951
2952         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2953                 if (hg == 0)
2954                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2955                 else
2956                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2957                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2958                                  abs(hg) >= 2 ? "full" : "bit-map based");
2959                 return C_MASK;
2960         }
2961
2962         if (abs(hg) >= 2) {
2963                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2964                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2965                                         BM_LOCKED_SET_ALLOWED))
2966                         return C_MASK;
2967         }
2968
2969         if (hg > 0) { /* become sync source. */
2970                 rv = C_WF_BITMAP_S;
2971         } else if (hg < 0) { /* become sync target */
2972                 rv = C_WF_BITMAP_T;
2973         } else {
2974                 rv = C_CONNECTED;
2975                 if (drbd_bm_total_weight(mdev)) {
2976                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2977                              drbd_bm_total_weight(mdev));
2978                 }
2979         }
2980
2981         return rv;
2982 }
2983
2984 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
2985 {
2986         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2987         if (peer == ASB_DISCARD_REMOTE)
2988                 return ASB_DISCARD_LOCAL;
2989
2990         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2991         if (peer == ASB_DISCARD_LOCAL)
2992                 return ASB_DISCARD_REMOTE;
2993
2994         /* everything else is valid if they are equal on both sides. */
2995         return peer;
2996 }
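
/*
 * Worked example (sketch): the asymmetric after-split-brain policies are only
 * compatible when both sides name the same victim.  If we are configured with
 * after-sb-0pri "discard-local" and the peer with "discard-remote", the peer
 * sends ASB_DISCARD_REMOTE, convert_after_sb() maps it to ASB_DISCARD_LOCAL,
 * and the comparison against our own setting in receive_protocol() below
 * succeeds -- both agree to discard our data.  If both sides were set to
 * "discard-remote", the converted value would not match and the connection
 * is refused.
 */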
2997
2998 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2999 {
3000         struct p_protocol *p = pi->data;
3001         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3002         int p_proto, p_discard_my_data, p_two_primaries, cf;
3003         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3004         char integrity_alg[SHARED_SECRET_MAX] = "";
3005         struct crypto_hash *peer_integrity_tfm = NULL;
3006         void *int_dig_in = NULL, *int_dig_vv = NULL;
3007
3008         p_proto         = be32_to_cpu(p->protocol);
3009         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3010         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3011         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3012         p_two_primaries = be32_to_cpu(p->two_primaries);
3013         cf              = be32_to_cpu(p->conn_flags);
3014         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3015
3016         if (tconn->agreed_pro_version >= 87) {
3017                 int err;
3018
3019                 if (pi->size > sizeof(integrity_alg))
3020                         return -EIO;
3021                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3022                 if (err)
3023                         return err;
3024                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3025         }
3026
3027         if (pi->cmd == P_PROTOCOL_UPDATE) {
3028                 if (integrity_alg[0]) {
3029                         int hash_size;
3030
3031                         /*
3032                          * We can only change the peer data integrity algorithm
3033                          * here.  Changing our own data integrity algorithm
3034                          * requires that we send a P_PROTOCOL_UPDATE packet at
3035                          * the same time; otherwise, the peer has no way to
3036                          * tell between which packets the algorithm should
3037                          * change.
3038                          */
3039
3040                         peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3041                         if (!peer_integrity_tfm) {
3042                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3043                                          integrity_alg);
3044                                 goto disconnect;
3045                         }
3046
3047                         hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3048                         int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3049                         int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3050                         if (!(int_dig_in && int_dig_vv)) {
3051                                 conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3052                                 goto disconnect;
3053                         }
3054                 }
3055
3056                 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3057                 if (!new_net_conf) {
3058                         conn_err(tconn, "Allocation of new net_conf failed\n");
3059                         goto disconnect;
3060                 }
3061
3062                 mutex_lock(&tconn->data.mutex);
3063                 mutex_lock(&tconn->conf_update);
3064                 old_net_conf = tconn->net_conf;
3065                 *new_net_conf = *old_net_conf;
3066
3067                 new_net_conf->wire_protocol = p_proto;
3068                 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3069                 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3070                 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3071                 new_net_conf->two_primaries = p_two_primaries;
3072                 strcpy(new_net_conf->integrity_alg, integrity_alg);
3073                 new_net_conf->integrity_alg_len = strlen(integrity_alg) + 1;
3074
3075                 rcu_assign_pointer(tconn->net_conf, new_net_conf);
3076                 mutex_unlock(&tconn->conf_update);
3077                 mutex_unlock(&tconn->data.mutex);
3078
3079                 crypto_free_hash(tconn->peer_integrity_tfm);
3080                 kfree(tconn->int_dig_in);
3081                 kfree(tconn->int_dig_vv);
3082                 tconn->peer_integrity_tfm = peer_integrity_tfm;
3083                 tconn->int_dig_in = int_dig_in;
3084                 tconn->int_dig_vv = int_dig_vv;
3085
3086                 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3087                         conn_info(tconn, "peer data-integrity-alg: %s\n",
3088                                   integrity_alg[0] ? integrity_alg : "(none)");
3089
3090                 synchronize_rcu();
3091                 kfree(old_net_conf);
3092         } else {
3093                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3094
3095                 if (cf & CF_DRY_RUN)
3096                         set_bit(CONN_DRY_RUN, &tconn->flags);
3097
3098                 rcu_read_lock();
3099                 nc = rcu_dereference(tconn->net_conf);
3100
3101                 if (p_proto != nc->wire_protocol) {
3102                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3103                         goto disconnect_rcu_unlock;
3104                 }
3105
3106                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3107                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3108                         goto disconnect_rcu_unlock;
3109                 }
3110
3111                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3112                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3113                         goto disconnect_rcu_unlock;
3114                 }
3115
3116                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3117                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3118                         goto disconnect_rcu_unlock;
3119                 }
3120
3121                 if (p_discard_my_data && nc->discard_my_data) {
3122                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3123                         goto disconnect_rcu_unlock;
3124                 }
3125
3126                 if (p_two_primaries != nc->two_primaries) {
3127                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3128                         goto disconnect_rcu_unlock;
3129                 }
3130
3131                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3132                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3133                         goto disconnect_rcu_unlock;
3134                 }
3135
3136                 rcu_read_unlock();
3137         }
3138         return 0;
3139
3140 disconnect_rcu_unlock:
3141         rcu_read_unlock();
3142 disconnect:
3143         crypto_free_hash(peer_integrity_tfm);
3144         kfree(int_dig_in);
3145         kfree(int_dig_vv);
3146         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3147         return -EIO;
3148 }
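
/*
 * Sketch of the copy-update-publish idiom used for net_conf above (and again
 * for disk_conf in receive_SyncParam() below); the field and pointer names
 * here are generic placeholders, not drbd identifiers:
 */
#if 0	/* illustration only, never compiled */
        new_conf = kmalloc(sizeof(*new_conf), GFP_KERNEL);
        if (!new_conf)
                return -ENOMEM;
        *new_conf = *old_conf;                  /* start from the current settings */
        new_conf->some_field = new_value;       /* apply the change */
        rcu_assign_pointer(owner->conf, new_conf);  /* publish to readers */
        synchronize_rcu();                      /* wait for pre-existing readers */
        kfree(old_conf);                        /* now nobody can still see it */
#endif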
3149
3150 /* helper function
3151  * input: alg name, feature name
3152  * return: NULL (alg name was "")
3153  *         ERR_PTR(error) if something goes wrong
3154  *         or the crypto hash ptr, if it worked out ok. */
3155 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3156                 const char *alg, const char *name)
3157 {
3158         struct crypto_hash *tfm;
3159
3160         if (!alg[0])
3161                 return NULL;
3162
3163         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3164         if (IS_ERR(tfm)) {
3165                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3166                         alg, name, PTR_ERR(tfm));
3167                 return tfm;
3168         }
3169         return tfm;
3170 }
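
/*
 * Caller pattern for the helper above (sketch; receive_SyncParam() below is
 * the real user).  The three return cases each need different handling:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, alg_name, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;	// allocation failed, already logged
 *	if (tfm)
 *		install it		// non-empty name, allocation succeeded
 *	// tfm == NULL simply means no algorithm was configured
 */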
3171
3172 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3173 {
3174         void *buffer = tconn->data.rbuf;
3175         int size = pi->size;
3176
3177         while (size) {
3178                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3179                 s = drbd_recv(tconn, buffer, s);
3180                 if (s <= 0) {
3181                         if (s < 0)
3182                                 return s;
3183                         break;
3184                 }
3185                 size -= s;
3186         }
3187         if (size)
3188                 return -EIO;
3189         return 0;
3190 }
3191
3192 /*
3193  * config_unknown_volume  -  device configuration command for unknown volume
3194  *
3195  * When a device is added to an existing connection, the node on which the
3196  * device is added first will send configuration commands to its peer but the
3197  * peer will not know about the device yet.  It will warn and ignore these
3198  * commands.  Once the device is added on the second node, the second node will
3199  * send the same device configuration commands, but in the other direction.
3200  *
3201  * (We can also end up here if drbd is misconfigured.)
3202  */
3203 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3204 {
3205         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3206                   cmdname(pi->cmd), pi->vnr);
3207         return ignore_remaining_packet(tconn, pi);
3208 }
3209
3210 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3211 {
3212         struct drbd_conf *mdev;
3213         struct p_rs_param_95 *p;
3214         unsigned int header_size, data_size, exp_max_sz;
3215         struct crypto_hash *verify_tfm = NULL;
3216         struct crypto_hash *csums_tfm = NULL;
3217         struct net_conf *old_net_conf, *new_net_conf = NULL;
3218         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3219         const int apv = tconn->agreed_pro_version;
3220         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3221         int fifo_size = 0;
3222         int err;
3223
3224         mdev = vnr_to_mdev(tconn, pi->vnr);
3225         if (!mdev)
3226                 return config_unknown_volume(tconn, pi);
3227
3228         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3229                     : apv == 88 ? sizeof(struct p_rs_param)
3230                                         + SHARED_SECRET_MAX
3231                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3232                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3233
3234         if (pi->size > exp_max_sz) {
3235                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3236                     pi->size, exp_max_sz);
3237                 return -EIO;
3238         }
3239
3240         if (apv <= 88) {
3241                 header_size = sizeof(struct p_rs_param);
3242                 data_size = pi->size - header_size;
3243         } else if (apv <= 94) {
3244                 header_size = sizeof(struct p_rs_param_89);
3245                 data_size = pi->size - header_size;
3246                 D_ASSERT(data_size == 0);
3247         } else {
3248                 header_size = sizeof(struct p_rs_param_95);
3249                 data_size = pi->size - header_size;
3250                 D_ASSERT(data_size == 0);
3251         }
3252
3253         /* initialize verify_alg and csums_alg */
3254         p = pi->data;
3255         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3256
3257         err = drbd_recv_all(mdev->tconn, p, header_size);
3258         if (err)
3259                 return err;
3260
3261         mutex_lock(&mdev->tconn->conf_update);
3262         old_net_conf = mdev->tconn->net_conf;
3263         if (get_ldev(mdev)) {
3264                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3265                 if (!new_disk_conf) {
3266                         put_ldev(mdev);
3267                         mutex_unlock(&mdev->tconn->conf_update);
3268                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3269                         return -ENOMEM;
3270                 }
3271
3272                 old_disk_conf = mdev->ldev->disk_conf;
3273                 *new_disk_conf = *old_disk_conf;
3274
3275                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3276         }
3277
3278         if (apv >= 88) {
3279                 if (apv == 88) {
3280                         if (data_size > SHARED_SECRET_MAX) {
3281                                 dev_err(DEV, "verify-alg too long, "
3282                                     "peer wants %u, accepting only %u bytes\n",
3283                                                 data_size, SHARED_SECRET_MAX);
3284                                 err = -EIO;
3285                                 goto reconnect;
3286                         }
3287
3288                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3289                         if (err)
3290                                 goto reconnect;
3291                         /* we expect NUL terminated string */
3292                         /* but just in case someone tries to be evil */
3293                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3294                         p->verify_alg[data_size-1] = 0;
3295
3296                 } else /* apv >= 89 */ {
3297                         /* we still expect NUL terminated strings */
3298                         /* but just in case someone tries to be evil */
3299                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3300                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3301                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3302                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3303                 }
3304
3305                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3306                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3307                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3308                                     old_net_conf->verify_alg, p->verify_alg);
3309                                 goto disconnect;
3310                         }
3311                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3312                                         p->verify_alg, "verify-alg");
3313                         if (IS_ERR(verify_tfm)) {
3314                                 verify_tfm = NULL;
3315                                 goto disconnect;
3316                         }
3317                 }
3318
3319                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3320                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3321                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3322                                     old_net_conf->csums_alg, p->csums_alg);
3323                                 goto disconnect;
3324                         }
3325                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3326                                         p->csums_alg, "csums-alg");
3327                         if (IS_ERR(csums_tfm)) {
3328                                 csums_tfm = NULL;
3329                                 goto disconnect;
3330                         }
3331                 }
3332
3333                 if (apv > 94 && new_disk_conf) {
3334                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3335                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3336                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3337                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3338
3339                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3340                         if (fifo_size != mdev->rs_plan_s->size) {
3341                                 new_plan = fifo_alloc(fifo_size);
3342                                 if (!new_plan) {
3343                                         dev_err(DEV, "kmalloc of fifo_buffer failed");
3344                                         put_ldev(mdev);
3345                                         goto disconnect;
3346                                 }
3347                         }
3348                 }
3349
3350                 if (verify_tfm || csums_tfm) {
3351                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3352                         if (!new_net_conf) {
3353                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3354                                 goto disconnect;
3355                         }
3356
3357                         *new_net_conf = *old_net_conf;
3358
3359                         if (verify_tfm) {
3360                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3361                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3362                                 crypto_free_hash(mdev->tconn->verify_tfm);
3363                                 mdev->tconn->verify_tfm = verify_tfm;
3364                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3365                         }
3366                         if (csums_tfm) {
3367                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3368                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3369                                 crypto_free_hash(mdev->tconn->csums_tfm);
3370                                 mdev->tconn->csums_tfm = csums_tfm;
3371                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3372                         }
3373                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3374                 }
3375         }
3376
3377         if (new_disk_conf) {
3378                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3379                 put_ldev(mdev);
3380         }
3381
3382         if (new_plan) {
3383                 old_plan = mdev->rs_plan_s;
3384                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3385         }
3386
3387         mutex_unlock(&mdev->tconn->conf_update);
3388         synchronize_rcu();
3389         if (new_net_conf)
3390                 kfree(old_net_conf);
3391         kfree(old_disk_conf);
3392         kfree(old_plan);
3393
3394         return 0;
3395
3396 reconnect:
3397         if (new_disk_conf) {
3398                 put_ldev(mdev);
3399                 kfree(new_disk_conf);
3400         }
3401         mutex_unlock(&mdev->tconn->conf_update);
3402         return -EIO;
3403
3404 disconnect:
3405         kfree(new_plan);
3406         if (new_disk_conf) {
3407                 put_ldev(mdev);
3408                 kfree(new_disk_conf);
3409         }
3410         mutex_unlock(&mdev->tconn->conf_update);
3411         /* just for completeness: actually not needed,
3412          * as this is not reached if csums_tfm was ok. */
3413         crypto_free_hash(csums_tfm);
3414         /* but free the verify_tfm again, if csums_tfm did not work out */
3415         crypto_free_hash(verify_tfm);
3416         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3417         return -EIO;
3418 }
3419
3420 /* warn if the arguments differ by more than 12.5% */
3421 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3422         const char *s, sector_t a, sector_t b)
3423 {
3424         sector_t d;
3425         if (a == 0 || b == 0)
3426                 return;
3427         d = (a > b) ? (a - b) : (b - a);
3428         if (d > (a>>3) || d > (b>>3))
3429                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3430                      (unsigned long long)a, (unsigned long long)b);
3431 }
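
/*
 * Worked example (sketch) for the 12.5% threshold above: with a = 1000 and
 * b = 860 sectors, d = 140 while a>>3 = 125 and b>>3 = 107, so the warning
 * fires; with b = 900, d = 100 stays below both thresholds and nothing is
 * printed.
 */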
3432
3433 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3434 {
3435         struct drbd_conf *mdev;
3436         struct p_sizes *p = pi->data;
3437         enum determine_dev_size dd = unchanged;
3438         sector_t p_size, p_usize, my_usize;
3439         int ldsc = 0; /* local disk size changed */
3440         enum dds_flags ddsf;
3441
3442         mdev = vnr_to_mdev(tconn, pi->vnr);
3443         if (!mdev)
3444                 return config_unknown_volume(tconn, pi);
3445
3446         p_size = be64_to_cpu(p->d_size);
3447         p_usize = be64_to_cpu(p->u_size);
3448
3449         /* just store the peer's disk size for now.
3450          * we still need to figure out whether we accept that. */
3451         mdev->p_size = p_size;
3452
3453         if (get_ldev(mdev)) {
3454                 rcu_read_lock();
3455                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3456                 rcu_read_unlock();
3457
3458                 warn_if_differ_considerably(mdev, "lower level device sizes",
3459                            p_size, drbd_get_max_capacity(mdev->ldev));
3460                 warn_if_differ_considerably(mdev, "user requested size",
3461                                             p_usize, my_usize);
3462
3463                 /* if this is the first connect, or an otherwise expected
3464                  * param exchange, choose the minimum */
3465                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3466                         p_usize = min_not_zero(my_usize, p_usize);
3467
3468                 /* Never shrink a device with usable data during connect.
3469                    But allow online shrinking if we are connected. */
3470                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3471                     drbd_get_capacity(mdev->this_bdev) &&
3472                     mdev->state.disk >= D_OUTDATED &&
3473                     mdev->state.conn < C_CONNECTED) {
3474                         dev_err(DEV, "The peer's disk size is too small!\n");
3475                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3476                         put_ldev(mdev);
3477                         return -EIO;
3478                 }
3479
3480                 if (my_usize != p_usize) {
3481                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3482
3483                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3484                         if (!new_disk_conf) {
3485                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3486                                 put_ldev(mdev);
3487                                 return -ENOMEM;
3488                         }
3489
3490                         mutex_lock(&mdev->tconn->conf_update);
3491                         old_disk_conf = mdev->ldev->disk_conf;
3492                         *new_disk_conf = *old_disk_conf;
3493                         new_disk_conf->disk_size = p_usize;
3494
3495                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3496                         mutex_unlock(&mdev->tconn->conf_update);
3497                         synchronize_rcu();
3498                         kfree(old_disk_conf);
3499
3500                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3501                                  (unsigned long)p_usize);
3502                 }
3503
3504                 put_ldev(mdev);
3505         }
3506
3507         ddsf = be16_to_cpu(p->dds_flags);
3508         if (get_ldev(mdev)) {
3509                 dd = drbd_determine_dev_size(mdev, ddsf);
3510                 put_ldev(mdev);
3511                 if (dd == dev_size_error)
3512                         return -EIO;
3513                 drbd_md_sync(mdev);
3514         } else {
3515                 /* I am diskless, need to accept the peer's size. */
3516                 drbd_set_my_capacity(mdev, p_size);
3517         }
3518
3519         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3520         drbd_reconsider_max_bio_size(mdev);
3521
3522         if (get_ldev(mdev)) {
3523                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3524                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3525                         ldsc = 1;
3526                 }
3527
3528                 put_ldev(mdev);
3529         }
3530
3531         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3532                 if (be64_to_cpu(p->c_size) !=
3533                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3534                         /* we have different sizes, probably peer
3535                          * needs to know my new size... */
3536                         drbd_send_sizes(mdev, 0, ddsf);
3537                 }
3538                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3539                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3540                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3541                             mdev->state.disk >= D_INCONSISTENT) {
3542                                 if (ddsf & DDSF_NO_RESYNC)
3543                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3544                                 else
3545                                         resync_after_online_grow(mdev);
3546                         } else
3547                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3548                 }
3549         }
3550
3551         return 0;
3552 }
3553
3554 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3555 {
3556         struct drbd_conf *mdev;
3557         struct p_uuids *p = pi->data;
3558         u64 *p_uuid;
3559         int i, updated_uuids = 0;
3560
3561         mdev = vnr_to_mdev(tconn, pi->vnr);
3562         if (!mdev)
3563                 return config_unknown_volume(tconn, pi);
3564
3565         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return 0; /* skip this packet rather than dereference NULL below */
        }
3566
3567         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3568                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3569
3570         kfree(mdev->p_uuid);
3571         mdev->p_uuid = p_uuid;
3572
3573         if (mdev->state.conn < C_CONNECTED &&
3574             mdev->state.disk < D_INCONSISTENT &&
3575             mdev->state.role == R_PRIMARY &&
3576             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3577                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3578                     (unsigned long long)mdev->ed_uuid);
3579                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3580                 return -EIO;
3581         }
3582
3583         if (get_ldev(mdev)) {
3584                 int skip_initial_sync =
3585                         mdev->state.conn == C_CONNECTED &&
3586                         mdev->tconn->agreed_pro_version >= 90 &&
3587                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3588                         (p_uuid[UI_FLAGS] & 8);
3589                 if (skip_initial_sync) {
3590                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3591                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3592                                         "clear_n_write from receive_uuids",
3593                                         BM_LOCKED_TEST_ALLOWED);
3594                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3595                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3596                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3597                                         CS_VERBOSE, NULL);
3598                         drbd_md_sync(mdev);
3599                         updated_uuids = 1;
3600                 }
3601                 put_ldev(mdev);
3602         } else if (mdev->state.disk < D_INCONSISTENT &&
3603                    mdev->state.role == R_PRIMARY) {
3604                 /* I am a diskless primary, the peer just created a new current UUID
3605                    for me. */
3606                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3607         }
3608
3609         /* Before we test for the disk state, we should wait until a possibly
3610            ongoing cluster-wide state change has finished. That is important if
3611            we are primary and are detaching from our disk. We need to see the
3612            new disk state... */
3613         mutex_lock(mdev->state_mutex);
3614         mutex_unlock(mdev->state_mutex);
3615         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3616                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3617
3618         if (updated_uuids)
3619                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3620
3621         return 0;
3622 }
3623
3624 /**
3625  * convert_state() - Converts the peer's view of the cluster state to our point of view
3626  * @ps:         The state as seen by the peer.
3627  */
3628 static union drbd_state convert_state(union drbd_state ps)
3629 {
3630         union drbd_state ms;
3631
3632         static enum drbd_conns c_tab[] = {
3633                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3634                 [C_CONNECTED] = C_CONNECTED,
3635
3636                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3637                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3638                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3639                 [C_VERIFY_S]       = C_VERIFY_T,
3640                 [C_MASK]   = C_MASK,
3641         };
3642
3643         ms.i = ps.i;
3644
3645         ms.conn = c_tab[ps.conn];
3646         ms.peer = ps.role;
3647         ms.role = ps.peer;
3648         ms.pdsk = ps.disk;
3649         ms.disk = ps.pdsk;
3650         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3651
3652         return ms;
3653 }
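
/*
 * Example (sketch): a peer that wants to start a resync as sync source
 * requests { conn:StartingSyncS, role:Primary, peer:Secondary,
 * disk:UpToDate, pdsk:Inconsistent }.  convert_state() turns that into our
 * point of view: { conn:StartingSyncT, role:Secondary, peer:Primary,
 * disk:Inconsistent, pdsk:UpToDate } -- i.e. we are asked to become the
 * sync target.  This is how the state-change requests handled by
 * receive_req_state() and receive_req_conn_state() below are translated.
 */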
3654
3655 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3656 {
3657         struct drbd_conf *mdev;
3658         struct p_req_state *p = pi->data;
3659         union drbd_state mask, val;
3660         enum drbd_state_rv rv;
3661
3662         mdev = vnr_to_mdev(tconn, pi->vnr);
3663         if (!mdev)
3664                 return -EIO;
3665
3666         mask.i = be32_to_cpu(p->mask);
3667         val.i = be32_to_cpu(p->val);
3668
3669         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3670             mutex_is_locked(mdev->state_mutex)) {
3671                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3672                 return 0;
3673         }
3674
3675         mask = convert_state(mask);
3676         val = convert_state(val);
3677
3678         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3679         drbd_send_sr_reply(mdev, rv);
3680
3681         drbd_md_sync(mdev);
3682
3683         return 0;
3684 }
3685
3686 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3687 {
3688         struct p_req_state *p = pi->data;
3689         union drbd_state mask, val;
3690         enum drbd_state_rv rv;
3691
3692         mask.i = be32_to_cpu(p->mask);
3693         val.i = be32_to_cpu(p->val);
3694
3695         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3696             mutex_is_locked(&tconn->cstate_mutex)) {
3697                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3698                 return 0;
3699         }
3700
3701         mask = convert_state(mask);
3702         val = convert_state(val);
3703
3704         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3705         conn_send_sr_reply(tconn, rv);
3706
3707         return 0;
3708 }
3709
3710 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3711 {
3712         struct drbd_conf *mdev;
3713         struct p_state *p = pi->data;
3714         union drbd_state os, ns, peer_state;
3715         enum drbd_disk_state real_peer_disk;
3716         enum chg_state_flags cs_flags;
3717         int rv;
3718
3719         mdev = vnr_to_mdev(tconn, pi->vnr);
3720         if (!mdev)
3721                 return config_unknown_volume(tconn, pi);
3722
3723         peer_state.i = be32_to_cpu(p->state);
3724
3725         real_peer_disk = peer_state.disk;
3726         if (peer_state.disk == D_NEGOTIATING) {
3727                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3728                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3729         }
3730
3731         spin_lock_irq(&mdev->tconn->req_lock);
3732  retry:
3733         os = ns = drbd_read_state(mdev);
3734         spin_unlock_irq(&mdev->tconn->req_lock);
3735
3736         /* peer says his disk is uptodate, while we think it is inconsistent,
3737          * and this happens while we think we have a sync going on. */
3738         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3739             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3740                 /* If we are (becoming) SyncSource, but peer is still in sync
3741                  * preparation, ignore its uptodate-ness to avoid flapping, it
3742                  * will change to inconsistent once the peer reaches active
3743                  * syncing states.
3744                  * It may have changed syncer-paused flags, however, so we
3745                  * cannot ignore this completely. */
3746                 if (peer_state.conn > C_CONNECTED &&
3747                     peer_state.conn < C_SYNC_SOURCE)
3748                         real_peer_disk = D_INCONSISTENT;
3749
3750                 /* if peer_state changes to connected at the same time,
3751                  * it explicitly notifies us that it finished resync.
3752                  * Maybe we should finish it up, too? */
3753                 else if (os.conn >= C_SYNC_SOURCE &&
3754                          peer_state.conn == C_CONNECTED) {
3755                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3756                                 drbd_resync_finished(mdev);
3757                         return 0;
3758                 }
3759         }
3760
3761         /* peer says his disk is inconsistent, while we think it is uptodate,
3762          * and this happens while the peer still thinks we have a sync going on,
3763          * but we think we are already done with the sync.
3764          * We ignore this to avoid flapping pdsk.
3765          * This should not happen, if the peer is a recent version of drbd. */
3766         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3767             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3768                 real_peer_disk = D_UP_TO_DATE;
3769
3770         if (ns.conn == C_WF_REPORT_PARAMS)
3771                 ns.conn = C_CONNECTED;
3772
3773         if (peer_state.conn == C_AHEAD)
3774                 ns.conn = C_BEHIND;
3775
3776         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3777             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3778                 int cr; /* consider resync */
3779
3780                 /* if we established a new connection */
3781                 cr  = (os.conn < C_CONNECTED);
3782                 /* if we had an established connection
3783                  * and one of the nodes newly attaches a disk */
3784                 cr |= (os.conn == C_CONNECTED &&
3785                        (peer_state.disk == D_NEGOTIATING ||
3786                         os.disk == D_NEGOTIATING));
3787                 /* if we have both been inconsistent, and the peer has been
3788                  * forced to be UpToDate with --overwrite-data */
3789                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3790                 /* if we had been plain connected, and the admin requested to
3791                  * start a sync by "invalidate" or "invalidate-remote" */
3792                 cr |= (os.conn == C_CONNECTED &&
3793                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3794                                  peer_state.conn <= C_WF_BITMAP_T));
3795
3796                 if (cr)
3797                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3798
3799                 put_ldev(mdev);
3800                 if (ns.conn == C_MASK) {
3801                         ns.conn = C_CONNECTED;
3802                         if (mdev->state.disk == D_NEGOTIATING) {
3803                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3804                         } else if (peer_state.disk == D_NEGOTIATING) {
3805                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3806                                 peer_state.disk = D_DISKLESS;
3807                                 real_peer_disk = D_DISKLESS;
3808                         } else {
3809                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3810                                         return -EIO;
3811                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3812                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3813                                 return -EIO;
3814                         }
3815                 }
3816         }
3817
3818         spin_lock_irq(&mdev->tconn->req_lock);
3819         if (os.i != drbd_read_state(mdev).i)
3820                 goto retry;
3821         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3822         ns.peer = peer_state.role;
3823         ns.pdsk = real_peer_disk;
3824         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3825         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3826                 ns.disk = mdev->new_state_tmp.disk;
3827         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3828         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3829             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3830                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3831                    for temporary network outages! */
3832                 spin_unlock_irq(&mdev->tconn->req_lock);
3833                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a merely Consistent peer\n");
3834                 tl_clear(mdev->tconn);
3835                 drbd_uuid_new_current(mdev);
3836                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3837                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3838                 return -EIO;
3839         }
3840         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3841         ns = drbd_read_state(mdev);
3842         spin_unlock_irq(&mdev->tconn->req_lock);
3843
3844         if (rv < SS_SUCCESS) {
3845                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3846                 return -EIO;
3847         }
3848
3849         if (os.conn > C_WF_REPORT_PARAMS) {
3850                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3851                     peer_state.disk != D_NEGOTIATING ) {
3852                         /* we want resync, peer has not yet decided to sync... */
3853                         /* Nowadays only used when forcing a node into primary role and
3854                            setting its disk to UpToDate with that */
3855                         drbd_send_uuids(mdev);
3856                         drbd_send_state(mdev);
3857                 }
3858         }
3859
3860         mutex_lock(&mdev->tconn->conf_update);
3861         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3862         mutex_unlock(&mdev->tconn->conf_update);
3863
3864         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3865
3866         return 0;
3867 }
3868
3869 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3870 {
3871         struct drbd_conf *mdev;
3872         struct p_rs_uuid *p = pi->data;
3873
3874         mdev = vnr_to_mdev(tconn, pi->vnr);
3875         if (!mdev)
3876                 return -EIO;
3877
3878         wait_event(mdev->misc_wait,
3879                    mdev->state.conn == C_WF_SYNC_UUID ||
3880                    mdev->state.conn == C_BEHIND ||
3881                    mdev->state.conn < C_CONNECTED ||
3882                    mdev->state.disk < D_NEGOTIATING);
3883
3884         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3885
3886         /* Here the _drbd_uuid_ functions are right, current should
3887            _not_ be rotated into the history */
3888         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3889                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3890                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3891
3892                 drbd_print_uuids(mdev, "updated sync uuid");
3893                 drbd_start_resync(mdev, C_SYNC_TARGET);
3894
3895                 put_ldev(mdev);
3896         } else
3897                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3898
3899         return 0;
3900 }
3901
3902 /**
3903  * receive_bitmap_plain
3904  *
3905  * Return 0 when done, 1 when another iteration is needed, and a negative error
3906  * code upon failure.
3907  */
3908 static int
3909 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3910                      unsigned long *p, struct bm_xfer_ctx *c)
3911 {
3912         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3913                                  drbd_header_size(mdev->tconn);
3914         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3915                                        c->bm_words - c->word_offset);
3916         unsigned int want = num_words * sizeof(*p);
3917         int err;
3918
3919         if (want != size) {
3920                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3921                 return -EIO;
3922         }
3923         if (want == 0)
3924                 return 0;
3925         err = drbd_recv_all(mdev->tconn, p, want);
3926         if (err)
3927                 return err;
3928
3929         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3930
3931         c->word_offset += num_words;
3932         c->bit_offset = c->word_offset * BITS_PER_LONG;
3933         if (c->bit_offset > c->bm_bits)
3934                 c->bit_offset = c->bm_bits;
3935
3936         return 1;
3937 }
3938
3939 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3940 {
3941         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3942 }
3943
3944 static int dcbp_get_start(struct p_compressed_bm *p)
3945 {
3946         return (p->encoding & 0x80) != 0;
3947 }
3948
3949 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3950 {
3951         return (p->encoding >> 4) & 0x7;
3952 }
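
/*
 * Layout of the "encoding" byte, as implied by the three accessors above:
 * bits 0-3 hold the drbd_bitmap_code, bits 4-6 the number of padding bits at
 * the end of the VLI stream, and bit 7 whether the first run describes set
 * bits.  For example (sketch), encoding == 0xa3 decodes as code 3, two pad
 * bits, and a stream starting with a run of set bits.
 */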
3953
3954 /**
3955  * recv_bm_rle_bits
3956  *
3957  * Return 0 when done, 1 when another iteration is needed, and a negative error
3958  * code upon failure.
3959  */
3960 static int
3961 recv_bm_rle_bits(struct drbd_conf *mdev,
3962                 struct p_compressed_bm *p,
3963                  struct bm_xfer_ctx *c,
3964                  unsigned int len)
3965 {
3966         struct bitstream bs;
3967         u64 look_ahead;
3968         u64 rl;
3969         u64 tmp;
3970         unsigned long s = c->bit_offset;
3971         unsigned long e;
3972         int toggle = dcbp_get_start(p);
3973         int have;
3974         int bits;
3975
3976         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3977
3978         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3979         if (bits < 0)
3980                 return -EIO;
3981
3982         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3983                 bits = vli_decode_bits(&rl, look_ahead);
3984                 if (bits <= 0)
3985                         return -EIO;
3986
3987                 if (toggle) {
3988                         e = s + rl -1;
3989                         if (e >= c->bm_bits) {
3990                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3991                                 return -EIO;
3992                         }
3993                         _drbd_bm_set_bits(mdev, s, e);
3994                 }
3995
3996                 if (have < bits) {
3997                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3998                                 have, bits, look_ahead,
3999                                 (unsigned int)(bs.cur.b - p->code),
4000                                 (unsigned int)bs.buf_len);
4001                         return -EIO;
4002                 }
4003                 look_ahead >>= bits;
4004                 have -= bits;
4005
4006                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4007                 if (bits < 0)
4008                         return -EIO;
4009                 look_ahead |= tmp << have;
4010                 have += bits;
4011         }
4012
4013         c->bit_offset = s;
4014         bm_xfer_ctx_bit_to_word_offset(c);
4015
4016         return (s != c->bm_bits);
4017 }
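
/*
 * Worked example (sketch) of the run-length decoding above: with
 * c->bit_offset == 0, dcbp_get_start() == 0 and decoded run lengths
 * 5, 3, 7, the first five bits stay clear, bits 5..7 are set via
 * _drbd_bm_set_bits(), bits 8..14 stay clear again, and c->bit_offset
 * ends up at 15.
 */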
4018
4019 /**
4020  * decode_bitmap_c
4021  *
4022  * Return 0 when done, 1 when another iteration is needed, and a negative error
4023  * code upon failure.
4024  */
4025 static int
4026 decode_bitmap_c(struct drbd_conf *mdev,
4027                 struct p_compressed_bm *p,
4028                 struct bm_xfer_ctx *c,
4029                 unsigned int len)
4030 {
4031         if (dcbp_get_code(p) == RLE_VLI_Bits)
4032                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4033
4034         /* other variants had been implemented for evaluation,
4035          * but have been dropped as this one turned out to be "best"
4036          * during all our tests. */
4037
4038         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4039         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4040         return -EIO;
4041 }
4042
4043 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4044                 const char *direction, struct bm_xfer_ctx *c)
4045 {
4046         /* what would it take to transfer it "plaintext" */
4047         unsigned int header_size = drbd_header_size(mdev->tconn);
4048         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4049         unsigned int plain =
4050                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4051                 c->bm_words * sizeof(unsigned long);
4052         unsigned int total = c->bytes[0] + c->bytes[1];
4053         unsigned int r;
4054
4055         /* total can not be zero. but just in case: */
4056         if (total == 0)
4057                 return;
4058
4059         /* don't report if not compressed */
4060         if (total >= plain)
4061                 return;
4062
4063         /* total < plain. check for overflow, still */
4064         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4065                                     : (1000 * total / plain);
4066
4067         if (r > 1000)
4068                 r = 1000;
4069
4070         r = 1000 - r;
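        /* Worked example with illustrative numbers: plain = 1000000 bytes and
         * total = 250000 bytes give 1000 * total / plain = 250, hence r = 750
         * here, which is printed below as "compression: 75.0%". */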
4071         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4072              "total %u; compression: %u.%u%%\n",
4073                         direction,
4074                         c->bytes[1], c->packets[1],
4075                         c->bytes[0], c->packets[0],
4076                         total, r/10, r % 10);
4077 }
4078
4079 /* Since we are processing the bitfield from lower addresses to higher,
4080    it does not matter whether we process it in 32 bit chunks or 64 bit
4081    chunks, as long as it is little endian. (Understand it as a byte stream,
4082    beginning with the lowest byte...) If we used big endian
4083    we would need to process it from the highest address to the lowest,
4084    in order to be agnostic to the 32 vs 64 bit issue.
4085
4086    returns 0 on failure, 1 if we successfully received it. */
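/* For illustration: the 64-bit value 0x0807060504030201 stored little endian
 * is the byte stream 01 02 03 04 05 06 07 08.  Reading that stream as two
 * little endian 32-bit words yields 0x04030201 followed by 0x08070605, the
 * very same bit sequence, which is why the chunk size does not matter. */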
4087 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4088 {
4089         struct drbd_conf *mdev;
4090         struct bm_xfer_ctx c;
4091         int err;
4092
4093         mdev = vnr_to_mdev(tconn, pi->vnr);
4094         if (!mdev)
4095                 return -EIO;
4096
4097         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4098         /* you are supposed to send additional out-of-sync information
4099          * if you actually set bits during this phase */
4100
4101         c = (struct bm_xfer_ctx) {
4102                 .bm_bits = drbd_bm_bits(mdev),
4103                 .bm_words = drbd_bm_words(mdev),
4104         };
4105
4106         for(;;) {
4107                 if (pi->cmd == P_BITMAP)
4108                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4109                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4110                         /* MAYBE: sanity check that we speak proto >= 90,
4111                          * and the feature is enabled! */
4112                         struct p_compressed_bm *p = pi->data;
4113
4114                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4115                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4116                                 err = -EIO;
4117                                 goto out;
4118                         }
4119                         if (pi->size <= sizeof(*p)) {
4120                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4121                                 err = -EIO;
4122                                 goto out;
4123                         }
4124                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4125                         if (err)
4126                                goto out;
4127                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4128                 } else {
4129                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4130                         err = -EIO;
4131                         goto out;
4132                 }
4133
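                /* slot 1 accounts plain P_BITMAP packets, slot 0 the
                 * compressed ones; INFO_bm_xfer_stats() reports them as
                 * "plain" and "RLE" respectively */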
4134                 c.packets[pi->cmd == P_BITMAP]++;
4135                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4136
4137                 if (err <= 0) {
4138                         if (err < 0)
4139                                 goto out;
4140                         break;
4141                 }
4142                 err = drbd_recv_header(mdev->tconn, pi);
4143                 if (err)
4144                         goto out;
4145         }
4146
4147         INFO_bm_xfer_stats(mdev, "receive", &c);
4148
4149         if (mdev->state.conn == C_WF_BITMAP_T) {
4150                 enum drbd_state_rv rv;
4151
4152                 err = drbd_send_bitmap(mdev);
4153                 if (err)
4154                         goto out;
4155                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4156                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4157                 D_ASSERT(rv == SS_SUCCESS);
4158         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4159                 /* admin may have requested C_DISCONNECTING,
4160                  * other threads may have noticed network errors */
4161                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4162                     drbd_conn_str(mdev->state.conn));
4163         }
4164         err = 0;
4165
4166  out:
4167         drbd_bm_unlock(mdev);
4168         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4169                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4170         return err;
4171 }
4172
4173 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4174 {
4175         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4176                  pi->cmd, pi->size);
4177
4178         return ignore_remaining_packet(tconn, pi);
4179 }
4180
4181 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4182 {
4183         /* Make sure we've acked all the TCP data associated
4184          * with the data requests being unplugged */
4185         drbd_tcp_quickack(tconn->data.socket);
4186
4187         return 0;
4188 }
4189
4190 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4191 {
4192         struct drbd_conf *mdev;
4193         struct p_block_desc *p = pi->data;
4194
4195         mdev = vnr_to_mdev(tconn, pi->vnr);
4196         if (!mdev)
4197                 return -EIO;
4198
4199         switch (mdev->state.conn) {
4200         case C_WF_SYNC_UUID:
4201         case C_WF_BITMAP_T:
4202         case C_BEHIND:
4203                 break;
4204         default:
4205                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4206                                 drbd_conn_str(mdev->state.conn));
4207         }
4208
4209         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4210
4211         return 0;
4212 }
4213
4214 struct data_cmd {
4215         int expect_payload;
4216         size_t pkt_size;
4217         int (*fn)(struct drbd_tconn *, struct packet_info *);
4218 };
4219
4220 static struct data_cmd drbd_cmd_handler[] = {
4221         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4222         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4223         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4224         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4225         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4226         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4227         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4228         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4229         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4230         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4231         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4232         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4233         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4234         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4235         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4236         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4237         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4238         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4239         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4240         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4241         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4242         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4243         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4244         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4245 };
4246
4247 static void drbdd(struct drbd_tconn *tconn)
4248 {
4249         struct packet_info pi;
4250         size_t shs; /* sub header size */
4251         int err;
4252
4253         while (get_t_state(&tconn->receiver) == RUNNING) {
4254                 struct data_cmd *cmd;
4255
4256                 drbd_thread_current_set_cpu(&tconn->receiver);
4257                 if (drbd_recv_header(tconn, &pi))
4258                         goto err_out;
4259
4260                 cmd = &drbd_cmd_handler[pi.cmd];
4261                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4262                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4263                                  cmdname(pi.cmd), pi.cmd);
4264                         goto err_out;
4265                 }
4266
4267                 shs = cmd->pkt_size;
4268                 if (pi.size > shs && !cmd->expect_payload) {
4269                         conn_err(tconn, "No payload expected %s l:%d\n",
4270                                  cmdname(pi.cmd), pi.size);
4271                         goto err_out;
4272                 }
4273
4274                 if (shs) {
4275                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4276                         if (err)
4277                                 goto err_out;
4278                         pi.size -= shs;
4279                 }
4280
4281                 err = cmd->fn(tconn, &pi);
4282                 if (err) {
4283                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4284                                  cmdname(pi.cmd), err, pi.size);
4285                         goto err_out;
4286                 }
4287         }
4288         return;
4289
4290     err_out:
4291         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4292 }
4293
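/* Queue a no-op barrier work item and wait for its completion: by the time it
 * has run, everything that was queued on this connection's work queue before
 * the call has been processed as well. */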
4294 void conn_flush_workqueue(struct drbd_tconn *tconn)
4295 {
4296         struct drbd_wq_barrier barr;
4297
4298         barr.w.cb = w_prev_work_done;
4299         barr.w.tconn = tconn;
4300         init_completion(&barr.done);
4301         drbd_queue_work(&tconn->data.work, &barr.w);
4302         wait_for_completion(&barr.done);
4303 }
4304
4305 static void conn_disconnect(struct drbd_tconn *tconn)
4306 {
4307         struct drbd_conf *mdev;
4308         enum drbd_conns oc;
4309         int vnr, rv = SS_UNKNOWN_ERROR;
4310
4311         if (tconn->cstate == C_STANDALONE)
4312                 return;
4313
4314         /* asender does not clean up anything. it must not interfere, either */
4315         drbd_thread_stop(&tconn->asender);
4316         drbd_free_sock(tconn);
4317
4318         rcu_read_lock();
4319         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4320                 kref_get(&mdev->kref);
4321                 rcu_read_unlock();
4322                 drbd_disconnected(mdev);
4323                 kref_put(&mdev->kref, &drbd_minor_destroy);
4324                 rcu_read_lock();
4325         }
4326         rcu_read_unlock();
4327
4328         conn_info(tconn, "Connection closed\n");
4329
4330         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4331                 conn_try_outdate_peer_async(tconn);
4332
4333         spin_lock_irq(&tconn->req_lock);
4334         oc = tconn->cstate;
4335         if (oc >= C_UNCONNECTED)
4336                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4337
4338         spin_unlock_irq(&tconn->req_lock);
4339
4340         if (oc == C_DISCONNECTING)
4341                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4342 }
4343
4344 static int drbd_disconnected(struct drbd_conf *mdev)
4345 {
4346         enum drbd_fencing_p fp;
4347         unsigned int i;
4348
4349         /* wait for current activity to cease. */
4350         spin_lock_irq(&mdev->tconn->req_lock);
4351         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4352         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4353         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4354         spin_unlock_irq(&mdev->tconn->req_lock);
4355
4356         /* We do not have data structures that would allow us to
4357          * get the rs_pending_cnt down to 0 again.
4358          *  * On C_SYNC_TARGET we do not have any data structures describing
4359          *    the pending RSDataRequests we have sent.
4360          *  * On C_SYNC_SOURCE there is no data structure that tracks
4361          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4362          *  And no, it is not the sum of the reference counts in the
4363          *  resync_LRU. The resync_LRU tracks the whole operation including
4364          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4365          *  on the fly. */
4366         drbd_rs_cancel_all(mdev);
4367         mdev->rs_total = 0;
4368         mdev->rs_failed = 0;
4369         atomic_set(&mdev->rs_pending_cnt, 0);
4370         wake_up(&mdev->misc_wait);
4371
4372         del_timer_sync(&mdev->resync_timer);
4373         resync_timer_fn((unsigned long)mdev);
4374
4375         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4376          * w_make_resync_request etc. which may still be on the worker queue
4377          * to be "canceled" */
4378         drbd_flush_workqueue(mdev);
4379
4380         drbd_finish_peer_reqs(mdev);
4381
4382         kfree(mdev->p_uuid);
4383         mdev->p_uuid = NULL;
4384
4385         if (!drbd_suspended(mdev))
4386                 tl_clear(mdev->tconn);
4387
4388         drbd_md_sync(mdev);
4389
4390         fp = FP_DONT_CARE;
4391         if (get_ldev(mdev)) {
4392                 rcu_read_lock();
4393                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4394                 rcu_read_unlock();
4395                 put_ldev(mdev);
4396         }
4397
4398         /* serialize with bitmap writeout triggered by the state change,
4399          * if any. */
4400         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4401
4402         /* tcp_close and release of sendpage pages can be deferred.  I don't
4403          * want to use SO_LINGER, because apparently it can be deferred for
4404          * more than 20 seconds (longest time I checked).
4405          *
4406          * Actually we don't care exactly when the network stack does its
4407          * put_page(); we just release our reference on these pages right here.
4408          */
4409         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4410         if (i)
4411                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4412         i = atomic_read(&mdev->pp_in_use_by_net);
4413         if (i)
4414                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4415         i = atomic_read(&mdev->pp_in_use);
4416         if (i)
4417                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4418
4419         D_ASSERT(list_empty(&mdev->read_ee));
4420         D_ASSERT(list_empty(&mdev->active_ee));
4421         D_ASSERT(list_empty(&mdev->sync_ee));
4422         D_ASSERT(list_empty(&mdev->done_ee));
4423
4424         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4425         atomic_set(&mdev->current_epoch->epoch_size, 0);
4426         D_ASSERT(list_empty(&mdev->current_epoch->list));
4427
4428         return 0;
4429 }
4430
4431 /*
4432  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4433  * we can agree on is stored in agreed_pro_version.
4434  *
4435  * feature flags and the reserved array should leave enough room for future
4436  * enhancements of the handshake protocol, and possible plugins...
4437  *
4438  * For now, they are expected to be zero, but are ignored either way.
4439  */
4440 static int drbd_send_features(struct drbd_tconn *tconn)
4441 {
4442         struct drbd_socket *sock;
4443         struct p_connection_features *p;
4444
4445         sock = &tconn->data;
4446         p = conn_prepare_command(tconn, sock);
4447         if (!p)
4448                 return -EIO;
4449         memset(p, 0, sizeof(*p));
4450         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4451         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4452         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4453 }
4454
4455 /*
4456  * return values:
4457  *   1 yes, we have a valid connection
4458  *   0 oops, did not work out, please try again
4459  *  -1 peer talks different language,
4460  *     no point in trying again, please go standalone.
4461  */
4462 static int drbd_do_features(struct drbd_tconn *tconn)
4463 {
4464         /* ASSERT current == tconn->receiver ... */
4465         struct p_connection_features *p;
4466         const int expect = sizeof(struct p_connection_features);
4467         struct packet_info pi;
4468         int err;
4469
4470         err = drbd_send_features(tconn);
4471         if (err)
4472                 return 0;
4473
4474         err = drbd_recv_header(tconn, &pi);
4475         if (err)
4476                 return 0;
4477
4478         if (pi.cmd != P_CONNECTION_FEATURES) {
4479                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4480                          cmdname(pi.cmd), pi.cmd);
4481                 return -1;
4482         }
4483
4484         if (pi.size != expect) {
4485                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4486                      expect, pi.size);
4487                 return -1;
4488         }
4489
4490         p = pi.data;
4491         err = drbd_recv_all_warn(tconn, p, expect);
4492         if (err)
4493                 return 0;
4494
4495         p->protocol_min = be32_to_cpu(p->protocol_min);
4496         p->protocol_max = be32_to_cpu(p->protocol_max);
4497         if (p->protocol_max == 0)
4498                 p->protocol_max = p->protocol_min;
4499
4500         if (PRO_VERSION_MAX < p->protocol_min ||
4501             PRO_VERSION_MIN > p->protocol_max)
4502                 goto incompat;
4503
4504         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
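        /* Hypothetical example: if we supported protocols 86..96 and the peer
         * announced 90..101, the range checks above pass and we agree on 96,
         * the lower of the two maxima. */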
4505
4506         conn_info(tconn, "Handshake successful: "
4507              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4508
4509         return 1;
4510
4511  incompat:
4512         conn_err(tconn, "incompatible DRBD dialects: "
4513             "I support %d-%d, peer supports %d-%d\n",
4514             PRO_VERSION_MIN, PRO_VERSION_MAX,
4515             p->protocol_min, p->protocol_max);
4516         return -1;
4517 }
4518
4519 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4520 static int drbd_do_auth(struct drbd_tconn *tconn)
4521 {
4522         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4523         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4524         return -1;
4525 }
4526 #else
4527 #define CHALLENGE_LEN 64
4528
4529 /* Return value:
4530         1 - auth succeeded,
4531         0 - failed, try again (network error),
4532         -1 - auth failed, don't try again.
4533 */
4534
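/* The exchange below is symmetric, both peers run it concurrently:
 *   1. send our random challenge                    (P_AUTH_CHALLENGE)
 *   2. receive the peer's challenge                 (P_AUTH_CHALLENGE)
 *   3. send HMAC(shared secret, peer's challenge)   (P_AUTH_RESPONSE)
 *   4. receive the peer's response                  (P_AUTH_RESPONSE)
 *   5. compare it against HMAC(shared secret, our own challenge)
 */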
4535 static int drbd_do_auth(struct drbd_tconn *tconn)
4536 {
4537         struct drbd_socket *sock;
4538         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4539         struct scatterlist sg;
4540         char *response = NULL;
4541         char *right_response = NULL;
4542         char *peers_ch = NULL;
4543         unsigned int key_len;
4544         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4545         unsigned int resp_size;
4546         struct hash_desc desc;
4547         struct packet_info pi;
4548         struct net_conf *nc;
4549         int err, rv;
4550
4551         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4552
4553         rcu_read_lock();
4554         nc = rcu_dereference(tconn->net_conf);
4555         key_len = strlen(nc->shared_secret);
4556         memcpy(secret, nc->shared_secret, key_len);
4557         rcu_read_unlock();
4558
4559         desc.tfm = tconn->cram_hmac_tfm;
4560         desc.flags = 0;
4561
4562         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4563         if (rv) {
4564                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4565                 rv = -1;
4566                 goto fail;
4567         }
4568
4569         get_random_bytes(my_challenge, CHALLENGE_LEN);
4570
4571         sock = &tconn->data;
4572         if (!conn_prepare_command(tconn, sock)) {
4573                 rv = 0;
4574                 goto fail;
4575         }
4576         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4577                                 my_challenge, CHALLENGE_LEN);
4578         if (!rv)
4579                 goto fail;
4580
4581         err = drbd_recv_header(tconn, &pi);
4582         if (err) {
4583                 rv = 0;
4584                 goto fail;
4585         }
4586
4587         if (pi.cmd != P_AUTH_CHALLENGE) {
4588                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4589                          cmdname(pi.cmd), pi.cmd);
4590                 rv = 0;
4591                 goto fail;
4592         }
4593
4594         if (pi.size > CHALLENGE_LEN * 2) {
4595                 conn_err(tconn, "AuthChallenge payload too big.\n");
4596                 rv = -1;
4597                 goto fail;
4598         }
4599
4600         peers_ch = kmalloc(pi.size, GFP_NOIO);
4601         if (peers_ch == NULL) {
4602                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4603                 rv = -1;
4604                 goto fail;
4605         }
4606
4607         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4608         if (err) {
4609                 rv = 0;
4610                 goto fail;
4611         }
4612
4613         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4614         response = kmalloc(resp_size, GFP_NOIO);
4615         if (response == NULL) {
4616                 conn_err(tconn, "kmalloc of response failed\n");
4617                 rv = -1;
4618                 goto fail;
4619         }
4620
4621         sg_init_table(&sg, 1);
4622         sg_set_buf(&sg, peers_ch, pi.size);
4623
4624         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4625         if (rv) {
4626                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4627                 rv = -1;
4628                 goto fail;
4629         }
4630
4631         if (!conn_prepare_command(tconn, sock)) {
4632                 rv = 0;
4633                 goto fail;
4634         }
4635         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4636                                 response, resp_size);
4637         if (!rv)
4638                 goto fail;
4639
4640         err = drbd_recv_header(tconn, &pi);
4641         if (err) {
4642                 rv = 0;
4643                 goto fail;
4644         }
4645
4646         if (pi.cmd != P_AUTH_RESPONSE) {
4647                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4648                          cmdname(pi.cmd), pi.cmd);
4649                 rv = 0;
4650                 goto fail;
4651         }
4652
4653         if (pi.size != resp_size) {
4654                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4655                 rv = 0;
4656                 goto fail;
4657         }
4658
4659         err = drbd_recv_all_warn(tconn, response, resp_size);
4660         if (err) {
4661                 rv = 0;
4662                 goto fail;
4663         }
4664
4665         right_response = kmalloc(resp_size, GFP_NOIO);
4666         if (right_response == NULL) {
4667                 conn_err(tconn, "kmalloc of right_response failed\n");
4668                 rv = -1;
4669                 goto fail;
4670         }
4671
4672         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4673
4674         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4675         if (rv) {
4676                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4677                 rv = -1;
4678                 goto fail;
4679         }
4680
4681         rv = !memcmp(response, right_response, resp_size);
4682
4683         if (rv)
4684                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4685                      resp_size);
4686         else
4687                 rv = -1;
4688
4689  fail:
4690         kfree(peers_ch);
4691         kfree(response);
4692         kfree(right_response);
4693
4694         return rv;
4695 }
4696 #endif
4697
4698 int drbdd_init(struct drbd_thread *thi)
4699 {
4700         struct drbd_tconn *tconn = thi->tconn;
4701         int h;
4702
4703         conn_info(tconn, "receiver (re)started\n");
4704
4705         do {
4706                 h = conn_connect(tconn);
4707                 if (h == 0) {
4708                         conn_disconnect(tconn);
4709                         schedule_timeout_interruptible(HZ);
4710                 }
4711                 if (h == -1) {
4712                         conn_warn(tconn, "Discarding network configuration.\n");
4713                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4714                 }
4715         } while (h == 0);
4716
4717         if (h > 0)
4718                 drbdd(tconn);
4719
4720         conn_disconnect(tconn);
4721
4722         conn_info(tconn, "receiver terminated\n");
4723         return 0;
4724 }
4725
4726 /* ********* acknowledge sender ******** */
4727
4728 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4729 {
4730         struct p_req_state_reply *p = pi->data;
4731         int retcode = be32_to_cpu(p->retcode);
4732
4733         if (retcode >= SS_SUCCESS) {
4734                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4735         } else {
4736                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4737                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4738                          drbd_set_st_err_str(retcode), retcode);
4739         }
4740         wake_up(&tconn->ping_wait);
4741
4742         return 0;
4743 }
4744
4745 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4746 {
4747         struct drbd_conf *mdev;
4748         struct p_req_state_reply *p = pi->data;
4749         int retcode = be32_to_cpu(p->retcode);
4750
4751         mdev = vnr_to_mdev(tconn, pi->vnr);
4752         if (!mdev)
4753                 return -EIO;
4754
4755         if (retcode >= SS_SUCCESS) {
4756                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4757         } else {
4758                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4759                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4760                         drbd_set_st_err_str(retcode), retcode);
4761         }
4762         wake_up(&mdev->state_wait);
4763
4764         return 0;
4765 }
4766
4767 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4768 {
4769         return drbd_send_ping_ack(tconn);
4771 }
4772
4773 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4774 {
4775         /* restore idle timeout */
4776         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4777         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4778                 wake_up(&tconn->ping_wait);
4779
4780         return 0;
4781 }
4782
4783 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4784 {
4785         struct drbd_conf *mdev;
4786         struct p_block_ack *p = pi->data;
4787         sector_t sector = be64_to_cpu(p->sector);
4788         int blksize = be32_to_cpu(p->blksize);
4789
4790         mdev = vnr_to_mdev(tconn, pi->vnr);
4791         if (!mdev)
4792                 return -EIO;
4793
4794         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4795
4796         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4797
4798         if (get_ldev(mdev)) {
4799                 drbd_rs_complete_io(mdev, sector);
4800                 drbd_set_in_sync(mdev, sector, blksize);
4801                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4802                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4803                 put_ldev(mdev);
4804         }
4805         dec_rs_pending(mdev);
4806         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4807
4808         return 0;
4809 }
4810
4811 static int
4812 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4813                               struct rb_root *root, const char *func,
4814                               enum drbd_req_event what, bool missing_ok)
4815 {
4816         struct drbd_request *req;
4817         struct bio_and_error m;
4818
4819         spin_lock_irq(&mdev->tconn->req_lock);
4820         req = find_request(mdev, root, id, sector, missing_ok, func);
4821         if (unlikely(!req)) {
4822                 spin_unlock_irq(&mdev->tconn->req_lock);
4823                 return -EIO;
4824         }
4825         __req_mod(req, what, &m);
4826         spin_unlock_irq(&mdev->tconn->req_lock);
4827
4828         if (m.bio)
4829                 complete_master_bio(mdev, &m);
4830         return 0;
4831 }
4832
4833 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4834 {
4835         struct drbd_conf *mdev;
4836         struct p_block_ack *p = pi->data;
4837         sector_t sector = be64_to_cpu(p->sector);
4838         int blksize = be32_to_cpu(p->blksize);
4839         enum drbd_req_event what;
4840
4841         mdev = vnr_to_mdev(tconn, pi->vnr);
4842         if (!mdev)
4843                 return -EIO;
4844
4845         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4846
4847         if (p->block_id == ID_SYNCER) {
4848                 drbd_set_in_sync(mdev, sector, blksize);
4849                 dec_rs_pending(mdev);
4850                 return 0;
4851         }
4852         switch (pi->cmd) {
4853         case P_RS_WRITE_ACK:
4854                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4855                 break;
4856         case P_WRITE_ACK:
4857                 what = WRITE_ACKED_BY_PEER;
4858                 break;
4859         case P_RECV_ACK:
4860                 what = RECV_ACKED_BY_PEER;
4861                 break;
4862         case P_DISCARD_WRITE:
4863                 what = DISCARD_WRITE;
4864                 break;
4865         case P_RETRY_WRITE:
4866                 what = POSTPONE_WRITE;
4867                 break;
4868         default:
4869                 BUG();
4870         }
4871
4872         return validate_req_change_req_state(mdev, p->block_id, sector,
4873                                              &mdev->write_requests, __func__,
4874                                              what, false);
4875 }
4876
4877 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4878 {
4879         struct drbd_conf *mdev;
4880         struct p_block_ack *p = pi->data;
4881         sector_t sector = be64_to_cpu(p->sector);
4882         int size = be32_to_cpu(p->blksize);
4883         int err;
4884
4885         mdev = vnr_to_mdev(tconn, pi->vnr);
4886         if (!mdev)
4887                 return -EIO;
4888
4889         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4890
4891         if (p->block_id == ID_SYNCER) {
4892                 dec_rs_pending(mdev);
4893                 drbd_rs_failed_io(mdev, sector, size);
4894                 return 0;
4895         }
4896
4897         err = validate_req_change_req_state(mdev, p->block_id, sector,
4898                                             &mdev->write_requests, __func__,
4899                                             NEG_ACKED, true);
4900         if (err) {
4901                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4902                    The master bio might already be completed, therefore the
4903                    request is no longer in the collision hash. */
4904                 /* In Protocol B we might already have got a P_RECV_ACK
4905                    but then get a P_NEG_ACK afterwards. */
4906                 drbd_set_out_of_sync(mdev, sector, size);
4907         }
4908         return 0;
4909 }
4910
4911 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4912 {
4913         struct drbd_conf *mdev;
4914         struct p_block_ack *p = pi->data;
4915         sector_t sector = be64_to_cpu(p->sector);
4916
4917         mdev = vnr_to_mdev(tconn, pi->vnr);
4918         if (!mdev)
4919                 return -EIO;
4920
4921         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4922
4923         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4924             (unsigned long long)sector, be32_to_cpu(p->blksize));
4925
4926         return validate_req_change_req_state(mdev, p->block_id, sector,
4927                                              &mdev->read_requests, __func__,
4928                                              NEG_ACKED, false);
4929 }
4930
4931 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4932 {
4933         struct drbd_conf *mdev;
4934         sector_t sector;
4935         int size;
4936         struct p_block_ack *p = pi->data;
4937
4938         mdev = vnr_to_mdev(tconn, pi->vnr);
4939         if (!mdev)
4940                 return -EIO;
4941
4942         sector = be64_to_cpu(p->sector);
4943         size = be32_to_cpu(p->blksize);
4944
4945         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4946
4947         dec_rs_pending(mdev);
4948
4949         if (get_ldev_if_state(mdev, D_FAILED)) {
4950                 drbd_rs_complete_io(mdev, sector);
4951                 switch (pi->cmd) {
4952                 case P_NEG_RS_DREPLY:
4953                         drbd_rs_failed_io(mdev, sector, size);
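                        /* fall through */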
4954                 case P_RS_CANCEL:
4955                         break;
4956                 default:
4957                         BUG();
4958                 }
4959                 put_ldev(mdev);
4960         }
4961
4962         return 0;
4963 }
4964
4965 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4966 {
4967         struct drbd_conf *mdev;
4968         struct p_barrier_ack *p = pi->data;
4969
4970         mdev = vnr_to_mdev(tconn, pi->vnr);
4971         if (!mdev)
4972                 return -EIO;
4973
4974         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4975
4976         if (mdev->state.conn == C_AHEAD &&
4977             atomic_read(&mdev->ap_in_flight) == 0 &&
4978             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4979                 mdev->start_resync_timer.expires = jiffies + HZ;
4980                 add_timer(&mdev->start_resync_timer);
4981         }
4982
4983         return 0;
4984 }
4985
4986 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4987 {
4988         struct drbd_conf *mdev;
4989         struct p_block_ack *p = pi->data;
4990         struct drbd_work *w;
4991         sector_t sector;
4992         int size;
4993
4994         mdev = vnr_to_mdev(tconn, pi->vnr);
4995         if (!mdev)
4996                 return -EIO;
4997
4998         sector = be64_to_cpu(p->sector);
4999         size = be32_to_cpu(p->blksize);
5000
5001         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5002
5003         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5004                 drbd_ov_out_of_sync_found(mdev, sector, size);
5005         else
5006                 ov_out_of_sync_print(mdev);
5007
5008         if (!get_ldev(mdev))
5009                 return 0;
5010
5011         drbd_rs_complete_io(mdev, sector);
5012         dec_rs_pending(mdev);
5013
5014         --mdev->ov_left;
5015
5016         /* let's advance progress step marks only for every other megabyte */
5017         if ((mdev->ov_left & 0x200) == 0x200)
5018                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5019
5020         if (mdev->ov_left == 0) {
5021                 w = kmalloc(sizeof(*w), GFP_NOIO);
5022                 if (w) {
5023                         w->cb = w_ov_finished;
5024                         w->mdev = mdev;
5025                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5026                 } else {
5027                         dev_err(DEV, "kmalloc(w) failed.\n");
5028                         ov_out_of_sync_print(mdev);
5029                         drbd_resync_finished(mdev);
5030                 }
5031         }
5032         put_ldev(mdev);
5033         return 0;
5034 }
5035
5036 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5037 {
5038         return 0;
5039 }
5040
5041 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5042 {
5043         struct drbd_conf *mdev;
5044         int vnr, not_empty = 0;
5045
5046         do {
5047                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5048                 flush_signals(current);
5049
5050                 rcu_read_lock();
5051                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5052                         kref_get(&mdev->kref);
5053                         rcu_read_unlock();
5054                         if (drbd_finish_peer_reqs(mdev)) {
5055                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5056                                 return 1;
5057                         }
5058                         kref_put(&mdev->kref, &drbd_minor_destroy);
5059                         rcu_read_lock();
5060                 }
5061                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5062
5063                 spin_lock_irq(&tconn->req_lock);
5064                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5065                         not_empty = !list_empty(&mdev->done_ee);
5066                         if (not_empty)
5067                                 break;
5068                 }
5069                 spin_unlock_irq(&tconn->req_lock);
5070                 rcu_read_unlock();
5071         } while (not_empty);
5072
5073         return 0;
5074 }
5075
5076 struct asender_cmd {
5077         size_t pkt_size;
5078         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5079 };
5080
5081 static struct asender_cmd asender_tbl[] = {
5082         [P_PING]            = { 0, got_Ping },
5083         [P_PING_ACK]        = { 0, got_PingAck },
5084         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5085         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5086         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5087         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5088         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5089         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5090         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5091         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5092         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5093         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5094         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5095         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5096         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5097         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5098         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5099 };
5100
5101 int drbd_asender(struct drbd_thread *thi)
5102 {
5103         struct drbd_tconn *tconn = thi->tconn;
5104         struct asender_cmd *cmd = NULL;
5105         struct packet_info pi;
5106         int rv;
5107         void *buf    = tconn->meta.rbuf;
5108         int received = 0;
5109         unsigned int header_size = drbd_header_size(tconn);
5110         int expect   = header_size;
5111         bool ping_timeout_active = false;
5112         struct net_conf *nc;
5113         int ping_timeo, tcp_cork, ping_int;
5114
5115         current->policy = SCHED_RR;  /* Make this a realtime task! */
5116         current->rt_priority = 2;    /* more important than all other tasks */
5117
5118         while (get_t_state(thi) == RUNNING) {
5119                 drbd_thread_current_set_cpu(thi);
5120
5121                 rcu_read_lock();
5122                 nc = rcu_dereference(tconn->net_conf);
5123                 ping_timeo = nc->ping_timeo;
5124                 tcp_cork = nc->tcp_cork;
5125                 ping_int = nc->ping_int;
5126                 rcu_read_unlock();
5127
5128                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5129                         if (drbd_send_ping(tconn)) {
5130                                 conn_err(tconn, "drbd_send_ping has failed\n");
5131                                 goto reconnect;
5132                         }
5133                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5134                         ping_timeout_active = true;
5135                 }
5136
5137                 /* TODO: conditionally cork; it may hurt latency if we cork without
5138                    much to send */
5139                 if (tcp_cork)
5140                         drbd_tcp_cork(tconn->meta.socket);
5141                 if (tconn_finish_peer_reqs(tconn)) {
5142                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5143                         goto reconnect;
5144                 }
5145                 /* but unconditionally uncork unless disabled */
5146                 if (tcp_cork)
5147                         drbd_tcp_uncork(tconn->meta.socket);
5148
5149                 /* short circuit, recv_msg would return EINTR anyway. */
5150                 if (signal_pending(current))
5151                         continue;
5152
5153                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5154                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5155
5156                 flush_signals(current);
5157
5158                 /* Note:
5159                  * -EINTR        (on meta) we got a signal
5160                  * -EAGAIN       (on meta) rcvtimeo expired
5161                  * -ECONNRESET   other side closed the connection
5162                  * -ERESTARTSYS  (on data) we got a signal
5163                  * rv <  0       other than above: unexpected error!
5164                  * rv == expected: full header or command
5165                  * rv <  expected: "woken" by signal during receive
5166                  * rv == 0       : "connection shut down by peer"
5167                  */
5168                 if (likely(rv > 0)) {
5169                         received += rv;
5170                         buf      += rv;
5171                 } else if (rv == 0) {
5172                         conn_err(tconn, "meta connection shut down by peer.\n");
5173                         goto reconnect;
5174                 } else if (rv == -EAGAIN) {
5175                         /* If the data socket received something meanwhile,
5176                          * that is good enough: peer is still alive. */
5177                         if (time_after(tconn->last_received,
5178                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5179                                 continue;
5180                         if (ping_timeout_active) {
5181                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5182                                 goto reconnect;
5183                         }
5184                         set_bit(SEND_PING, &tconn->flags);
5185                         continue;
5186                 } else if (rv == -EINTR) {
5187                         continue;
5188                 } else {
5189                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5190                         goto reconnect;
5191                 }
5192
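                /* first collect a complete header; decode_header() below tells
                 * us which command this is, 'expect' then grows by that
                 * command's fixed payload size, and the handler runs once the
                 * full packet has been received */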
5193                 if (received == expect && cmd == NULL) {
5194                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5195                                 goto reconnect;
5196                         cmd = &asender_tbl[pi.cmd];
5197                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5198                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5199                                          cmdname(pi.cmd), pi.cmd);
5200                                 goto disconnect;
5201                         }
5202                         expect = header_size + cmd->pkt_size;
5203                         if (pi.size != expect - header_size) {
5204                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5205                                         pi.cmd, pi.size);
5206                                 goto reconnect;
5207                         }
5208                 }
5209                 if (received == expect) {
5210                         bool err;
5211
5212                         err = cmd->fn(tconn, &pi);
5213                         if (err) {
5214                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5215                                 goto reconnect;
5216                         }
5217
5218                         tconn->last_received = jiffies;
5219
5220                         if (cmd == &asender_tbl[P_PING_ACK]) {
5221                                 /* restore idle timeout */
5222                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5223                                 ping_timeout_active = false;
5224                         }
5225
5226                         buf      = tconn->meta.rbuf;
5227                         received = 0;
5228                         expect   = header_size;
5229                         cmd      = NULL;
5230                 }
5231         }
5232
5233         if (0) {
5234 reconnect:
5235                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5236         }
5237         if (0) {
5238 disconnect:
5239                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5240         }
5241         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5242
5243         conn_info(tconn, "asender terminated\n");
5244
5245         return 0;
5246 }