drbd: Ensure that data_size is not 0 before using data_size-1 as index
[firefly-linux-kernel-4.4.55.git] / drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
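/*
 * For orientation, the chain helpers used below are assumed to look roughly
 * like this (sketch; the authoritative macros live in drbd_int.h and may
 * differ in detail, e.g. by prefetching the next page):
 *
 *   #define page_chain_next(page) \
 *           ((struct page *)page_private(page))
 *   #define page_chain_for_each(page) \
 *           for (; page; page = page_chain_next(page))
 *   #define page_chain_for_each_safe(page, n) \
 *           for (; page && ({ n = page_chain_next(page); 1; }); page = n)
 */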
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first one that has not finished,
208            we can stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
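/*
 * Minimal usage sketch for the pool interface above (illustrative only, not
 * part of the original file; compiled out on purpose): allocate a page chain,
 * touch every page, then hand the chain back.
 */
#if 0
static int drbd_pp_selftest(struct drbd_conf *mdev, unsigned int nr_pages)
{
	struct page *chain, *page;

	/* may block until enough pages are free, or until a signal arrives */
	chain = drbd_alloc_pages(mdev, nr_pages, true);
	if (!chain)
		return -ENOMEM;

	page = chain;
	page_chain_for_each(page)
		clear_highpage(page);	/* e.g. zero every page in the chain */

	/* is_net == 0: account against pp_in_use, not pp_in_use_by_net */
	drbd_free_pages(mdev, chain, 0);
	return 0;
}
#endif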
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
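/*
 * Illustrative sketch of the rule above (not part of the original file):
 *
 *   spin_lock_irq(&mdev->tconn->req_lock);
 *   _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);   <- caller holds req_lock
 *   spin_unlock_irq(&mdev->tconn->req_lock);
 *
 *   drbd_wait_ee_list_empty(mdev, &mdev->active_ee);    <- takes req_lock itself
 */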
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * also we want to log which part of it failed, exactly */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490         __module_get((*newsock)->ops->owner);
491
492 out:
493         return err;
494 }
495
496 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
497 {
498         mm_segment_t oldfs;
499         struct kvec iov = {
500                 .iov_base = buf,
501                 .iov_len = size,
502         };
503         struct msghdr msg = {
504                 .msg_iovlen = 1,
505                 .msg_iov = (struct iovec *)&iov,
506                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
507         };
508         int rv;
509
510         oldfs = get_fs();
511         set_fs(KERNEL_DS);
512         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
513         set_fs(oldfs);
514
515         return rv;
516 }
517
518 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
519 {
520         mm_segment_t oldfs;
521         struct kvec iov = {
522                 .iov_base = buf,
523                 .iov_len = size,
524         };
525         struct msghdr msg = {
526                 .msg_iovlen = 1,
527                 .msg_iov = (struct iovec *)&iov,
528                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
529         };
530         int rv;
531
532         oldfs = get_fs();
533         set_fs(KERNEL_DS);
534
535         for (;;) {
536                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
537                 if (rv == size)
538                         break;
539
540                 /* Note:
541                  * ECONNRESET   other side closed the connection
542                  * ERESTARTSYS  (on  sock) we got a signal
543                  */
544
545                 if (rv < 0) {
546                         if (rv == -ECONNRESET)
547                                 conn_info(tconn, "sock was reset by peer\n");
548                         else if (rv != -ERESTARTSYS)
549                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
550                         break;
551                 } else if (rv == 0) {
552                         conn_info(tconn, "sock was shut down by peer\n");
553                         break;
554                 } else  {
555                         /* signal came in, or peer/link went down,
556                          * after we read a partial message
557                          */
558                         /* D_ASSERT(signal_pending(current)); */
559                         break;
560                 }
561         }
562
563         set_fs(oldfs);
564
565         if (rv != size)
566                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
567
568         return rv;
569 }
570
571 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
572 {
573         int err;
574
575         err = drbd_recv(tconn, buf, size);
576         if (err != size) {
577                 if (err >= 0)
578                         err = -EIO;
579         } else
580                 err = 0;
581         return err;
582 }
583
584 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
585 {
586         int err;
587
588         err = drbd_recv_all(tconn, buf, size);
589         if (err && !signal_pending(current))
590                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
591         return err;
592 }
593
594 /* quoting tcp(7):
595  *   On individual connections, the socket buffer size must be set prior to the
596  *   listen(2) or connect(2) calls in order to have it take effect.
597  * This is our wrapper to do so.
598  */
599 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
600                 unsigned int rcv)
601 {
602         /* open coded SO_SNDBUF, SO_RCVBUF */
603         if (snd) {
604                 sock->sk->sk_sndbuf = snd;
605                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
606         }
607         if (rcv) {
608                 sock->sk->sk_rcvbuf = rcv;
609                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
610         }
611 }
612
613 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
614 {
615         const char *what;
616         struct socket *sock;
617         struct sockaddr_in6 src_in6;
618         struct sockaddr_in6 peer_in6;
619         struct net_conf *nc;
620         int err, peer_addr_len, my_addr_len;
621         int sndbuf_size, rcvbuf_size, connect_int;
622         int disconnect_on_error = 1;
623
624         rcu_read_lock();
625         nc = rcu_dereference(tconn->net_conf);
626         if (!nc) {
627                 rcu_read_unlock();
628                 return NULL;
629         }
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         connect_int = nc->connect_int;
633         rcu_read_unlock();
634
635         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
636         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
637
638         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
639                 src_in6.sin6_port = 0;
640         else
641                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
642
643         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
644         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
645
646         what = "sock_create_kern";
647         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
648                                SOCK_STREAM, IPPROTO_TCP, &sock);
649         if (err < 0) {
650                 sock = NULL;
651                 goto out;
652         }
653
654         sock->sk->sk_rcvtimeo =
655         sock->sk->sk_sndtimeo = connect_int * HZ;
656         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
657
658        /* explicitly bind to the configured IP as source IP
659         *  for the outgoing connections.
660         *  This is needed for multihomed hosts and to be
661         *  able to use lo: interfaces for drbd.
662         * Make sure to use 0 as port number, so linux selects
663         *  a free one dynamically.
664         */
665         what = "bind before connect";
666         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
667         if (err < 0)
668                 goto out;
669
670         /* connect may fail, peer not yet available.
671          * stay C_WF_CONNECTION, don't go Disconnecting! */
672         disconnect_on_error = 0;
673         what = "connect";
674         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
675
676 out:
677         if (err < 0) {
678                 if (sock) {
679                         sock_release(sock);
680                         sock = NULL;
681                 }
682                 switch (-err) {
683                         /* timeout, busy, signal pending */
684                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
685                 case EINTR: case ERESTARTSYS:
686                         /* peer not (yet) available, network problem */
687                 case ECONNREFUSED: case ENETUNREACH:
688                 case EHOSTDOWN:    case EHOSTUNREACH:
689                         disconnect_on_error = 0;
690                         break;
691                 default:
692                         conn_err(tconn, "%s failed, err = %d\n", what, err);
693                 }
694                 if (disconnect_on_error)
695                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
696         }
697
698         return sock;
699 }
700
701 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
702 {
703         int timeo, err, my_addr_len;
704         int sndbuf_size, rcvbuf_size, connect_int;
705         struct socket *s_estab = NULL, *s_listen;
706         struct sockaddr_in6 my_addr;
707         struct net_conf *nc;
708         const char *what;
709
710         rcu_read_lock();
711         nc = rcu_dereference(tconn->net_conf);
712         if (!nc) {
713                 rcu_read_unlock();
714                 return NULL;
715         }
716         sndbuf_size = nc->sndbuf_size;
717         rcvbuf_size = nc->rcvbuf_size;
718         connect_int = nc->connect_int;
719         rcu_read_unlock();
720
721         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
722         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
723
724         what = "sock_create_kern";
725         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
726                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
727         if (err) {
728                 s_listen = NULL;
729                 goto out;
730         }
731
732         timeo = connect_int * HZ;
733         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
734
735         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
736         s_listen->sk->sk_rcvtimeo = timeo;
737         s_listen->sk->sk_sndtimeo = timeo;
738         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
739
740         what = "bind before listen";
741         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
742         if (err < 0)
743                 goto out;
744
745         err = drbd_accept(&what, s_listen, &s_estab);
746
747 out:
748         if (s_listen)
749                 sock_release(s_listen);
750         if (err < 0) {
751                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
752                         conn_err(tconn, "%s failed, err = %d\n", what, err);
753                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
754                 }
755         }
756
757         return s_estab;
758 }
759
760 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
761
762 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
763                              enum drbd_packet cmd)
764 {
765         if (!conn_prepare_command(tconn, sock))
766                 return -EIO;
767         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
768 }
769
770 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
771 {
772         unsigned int header_size = drbd_header_size(tconn);
773         struct packet_info pi;
774         int err;
775
776         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
777         if (err != header_size) {
778                 if (err >= 0)
779                         err = -EIO;
780                 return err;
781         }
782         err = decode_header(tconn, tconn->data.rbuf, &pi);
783         if (err)
784                 return err;
785         return pi.cmd;
786 }
787
788 /**
789  * drbd_socket_okay() - Free the socket if its connection is not okay
790  * @sock:       pointer to the pointer to the socket.
791  */
792 static int drbd_socket_okay(struct socket **sock)
793 {
794         int rr;
795         char tb[4];
796
797         if (!*sock)
798                 return false;
799
800         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
801
802         if (rr > 0 || rr == -EAGAIN) {
803                 return true;
804         } else {
805                 sock_release(*sock);
806                 *sock = NULL;
807                 return false;
808         }
809 }
810 /* Gets called if a connection is established, or if a new minor gets created
811    in a connection */
812 int drbd_connected(struct drbd_conf *mdev)
813 {
814         int err;
815
816         atomic_set(&mdev->packet_seq, 0);
817         mdev->peer_seq = 0;
818
819         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
820                 &mdev->tconn->cstate_mutex :
821                 &mdev->own_state_mutex;
822
823         err = drbd_send_sync_param(mdev);
824         if (!err)
825                 err = drbd_send_sizes(mdev, 0, 0);
826         if (!err)
827                 err = drbd_send_uuids(mdev);
828         if (!err)
829                 err = drbd_send_current_state(mdev);
830         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
831         clear_bit(RESIZE_PENDING, &mdev->flags);
832         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
833         return err;
834 }
835
836 /*
837  * return values:
838  *   1 yes, we have a valid connection
839  *   0 oops, did not work out, please try again
840  *  -1 peer talks different language,
841  *     no point in trying again, please go standalone.
842  *  -2 We do not have a network config...
843  */
844 static int conn_connect(struct drbd_tconn *tconn)
845 {
846         struct drbd_socket sock, msock;
847         struct drbd_conf *mdev;
848         struct net_conf *nc;
849         int vnr, timeout, try, h, ok;
850         bool discard_my_data;
851         enum drbd_state_rv rv;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         mutex_init(&sock.mutex);
857         sock.sbuf = tconn->data.sbuf;
858         sock.rbuf = tconn->data.rbuf;
859         sock.socket = NULL;
860         mutex_init(&msock.mutex);
861         msock.sbuf = tconn->meta.sbuf;
862         msock.rbuf = tconn->meta.rbuf;
863         msock.socket = NULL;
864
865         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
866
867         /* Assume that the peer only understands protocol 80 until we know better.  */
868         tconn->agreed_pro_version = 80;
869
870         do {
871                 struct socket *s;
872
873                 for (try = 0;;) {
874                         /* 3 tries, this should take less than a second! */
875                         s = drbd_try_connect(tconn);
876                         if (s || ++try >= 3)
877                                 break;
878                         /* give the other side time to call bind() & listen() */
879                         schedule_timeout_interruptible(HZ / 10);
880                 }
881
882                 if (s) {
883                         if (!sock.socket) {
884                                 sock.socket = s;
885                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
886                         } else if (!msock.socket) {
887                                 msock.socket = s;
888                                 send_first_packet(tconn, &msock, P_INITIAL_META);
889                         } else {
890                                 conn_err(tconn, "Logic error in conn_connect()\n");
891                                 goto out_release_sockets;
892                         }
893                 }
894
895                 if (sock.socket && msock.socket) {
896                         rcu_read_lock();
897                         nc = rcu_dereference(tconn->net_conf);
898                         timeout = nc->ping_timeo * HZ / 10;
899                         rcu_read_unlock();
900                         schedule_timeout_interruptible(timeout);
901                         ok = drbd_socket_okay(&sock.socket);
902                         ok = drbd_socket_okay(&msock.socket) && ok;
903                         if (ok)
904                                 break;
905                 }
906
907 retry:
908                 s = drbd_wait_for_connect(tconn);
909                 if (s) {
910                         try = receive_first_packet(tconn, s);
911                         drbd_socket_okay(&sock.socket);
912                         drbd_socket_okay(&msock.socket);
913                         switch (try) {
914                         case P_INITIAL_DATA:
915                                 if (sock.socket) {
916                                         conn_warn(tconn, "initial packet S crossed\n");
917                                         sock_release(sock.socket);
918                                 }
919                                 sock.socket = s;
920                                 break;
921                         case P_INITIAL_META:
922                                 if (msock.socket) {
923                                         conn_warn(tconn, "initial packet M crossed\n");
924                                         sock_release(msock.socket);
925                                 }
926                                 msock.socket = s;
927                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
928                                 break;
929                         default:
930                                 conn_warn(tconn, "Error receiving initial packet\n");
931                                 sock_release(s);
932                                 if (random32() & 1)
933                                         goto retry;
934                         }
935                 }
936
937                 if (tconn->cstate <= C_DISCONNECTING)
938                         goto out_release_sockets;
939                 if (signal_pending(current)) {
940                         flush_signals(current);
941                         smp_rmb();
942                         if (get_t_state(&tconn->receiver) == EXITING)
943                                 goto out_release_sockets;
944                 }
945
946                 if (sock.socket && msock.socket) {
947                         ok = drbd_socket_okay(&sock.socket);
948                         ok = drbd_socket_okay(&msock.socket) && ok;
949                         if (ok)
950                                 break;
951                 }
952         } while (1);
953
954         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
955         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
956
957         sock.socket->sk->sk_allocation = GFP_NOIO;
958         msock.socket->sk->sk_allocation = GFP_NOIO;
959
960         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
961         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
962
963         /* NOT YET ...
964          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
965          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
966          * first set it to the P_CONNECTION_FEATURES timeout,
967          * which we set to 4x the configured ping_timeout. */
968         rcu_read_lock();
969         nc = rcu_dereference(tconn->net_conf);
970
971         sock.socket->sk->sk_sndtimeo =
972         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
973
974         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
975         timeout = nc->timeout * HZ / 10;
976         discard_my_data = nc->discard_my_data;
977         rcu_read_unlock();
978
979         msock.socket->sk->sk_sndtimeo = timeout;
980
981         /* we don't want delays.
982          * we use TCP_CORK where appropriate, though */
983         drbd_tcp_nodelay(sock.socket);
984         drbd_tcp_nodelay(msock.socket);
985
986         tconn->data.socket = sock.socket;
987         tconn->meta.socket = msock.socket;
988         tconn->last_received = jiffies;
989
990         h = drbd_do_features(tconn);
991         if (h <= 0)
992                 return h;
993
994         if (tconn->cram_hmac_tfm) {
995                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
996                 switch (drbd_do_auth(tconn)) {
997                 case -1:
998                         conn_err(tconn, "Authentication of peer failed\n");
999                         return -1;
1000                 case 0:
1001                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
1002                         return 0;
1003                 }
1004         }
1005
1006         tconn->data.socket->sk->sk_sndtimeo = timeout;
1007         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1008
1009         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1010                 return -1;
1011
1012         set_bit(STATE_SENT, &tconn->flags);
1013
1014         rcu_read_lock();
1015         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1016                 kref_get(&mdev->kref);
1017                 rcu_read_unlock();
1018
1019                 if (discard_my_data)
1020                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1021                 else
1022                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1023
1024                 drbd_connected(mdev);
1025                 kref_put(&mdev->kref, &drbd_minor_destroy);
1026                 rcu_read_lock();
1027         }
1028         rcu_read_unlock();
1029
1030         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1031         if (rv < SS_SUCCESS) {
1032                 clear_bit(STATE_SENT, &tconn->flags);
1033                 return 0;
1034         }
1035
1036         drbd_thread_start(&tconn->asender);
1037
1038         mutex_lock(&tconn->conf_update);
1039         /* The discard_my_data flag is a single-shot modifier to the next
1040          * connection attempt, the handshake of which is now well underway.
1041          * No need for rcu style copying of the whole struct
1042          * just to clear a single value. */
1043         tconn->net_conf->discard_my_data = 0;
1044         mutex_unlock(&tconn->conf_update);
1045
1046         return h;
1047
1048 out_release_sockets:
1049         if (sock.socket)
1050                 sock_release(sock.socket);
1051         if (msock.socket)
1052                 sock_release(msock.socket);
1053         return -1;
1054 }
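/*
 * Sketch (not part of the original file) of how the receiver thread is
 * expected to act on the return values documented in the comment above
 * conn_connect(); the real loop lives in drbdd_init() and may differ in
 * detail.
 */
#if 0
	do {
		h = conn_connect(tconn);
		if (h == 0) {
			/* 0: did not work out, tear down and retry */
			conn_disconnect(tconn);
			schedule_timeout_interruptible(HZ);
		}
		if (h == -1) {
			/* -1: incompatible peer, give up, go StandAlone */
			conn_warn(tconn, "Discarding network configuration.\n");
			conn_request_state(tconn, NS(conn, C_STANDALONE), CS_HARD);
		}
	} while (h == 0);
#endif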
1055
1056 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1057 {
1058         unsigned int header_size = drbd_header_size(tconn);
1059
1060         if (header_size == sizeof(struct p_header100) &&
1061             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1062                 struct p_header100 *h = header;
1063                 if (h->pad != 0) {
1064                         conn_err(tconn, "Header padding is not zero\n");
1065                         return -EINVAL;
1066                 }
1067                 pi->vnr = be16_to_cpu(h->volume);
1068                 pi->cmd = be16_to_cpu(h->command);
1069                 pi->size = be32_to_cpu(h->length);
1070         } else if (header_size == sizeof(struct p_header95) &&
1071                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1072                 struct p_header95 *h = header;
1073                 pi->cmd = be16_to_cpu(h->command);
1074                 pi->size = be32_to_cpu(h->length);
1075                 pi->vnr = 0;
1076         } else if (header_size == sizeof(struct p_header80) &&
1077                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1078                 struct p_header80 *h = header;
1079                 pi->cmd = be16_to_cpu(h->command);
1080                 pi->size = be16_to_cpu(h->length);
1081                 pi->vnr = 0;
1082         } else {
1083                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1084                          be32_to_cpu(*(__be32 *)header),
1085                          tconn->agreed_pro_version);
1086                 return -EINVAL;
1087         }
1088         pi->data = header + header_size;
1089         return 0;
1090 }
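/*
 * The three on-the-wire header layouts decoded above are assumed to look
 * roughly like this (sketch; the authoritative definitions live in the DRBD
 * headers):
 *
 *   struct p_header80  { __be32 magic; __be16 command; __be16 length; };
 *   struct p_header95  { __be16 magic; __be16 command; __be32 length; };
 *   struct p_header100 { __be32 magic; __be16 volume;  __be16 command;
 *                        __be32 length; __be32 pad; };
 */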
1091
1092 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1093 {
1094         void *buffer = tconn->data.rbuf;
1095         int err;
1096
1097         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1098         if (err)
1099                 return err;
1100
1101         err = decode_header(tconn, buffer, pi);
1102         tconn->last_received = jiffies;
1103
1104         return err;
1105 }
1106
1107 static void drbd_flush(struct drbd_tconn *tconn)
1108 {
1109         int rv;
1110         struct drbd_conf *mdev;
1111         int vnr;
1112
1113         if (tconn->write_ordering >= WO_bdev_flush) {
1114                 rcu_read_lock();
1115                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1116                         if (!get_ldev(mdev))
1117                                 continue;
1118                         kref_get(&mdev->kref);
1119                         rcu_read_unlock();
1120
1121                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1122                                         GFP_NOIO, NULL);
1123                         if (rv) {
1124                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1125                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1126                                  * don't try again for ANY return value != 0
1127                                  * if (rv == -EOPNOTSUPP) */
1128                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1129                         }
1130                         put_ldev(mdev);
1131                         kref_put(&mdev->kref, &drbd_minor_destroy);
1132
1133                         rcu_read_lock();
1134                         if (rv)
1135                                 break;
1136                 }
1137                 rcu_read_unlock();
1138         }
1139 }
1140
1141 /**
1142  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1143  * @mdev:       DRBD device.
1144  * @epoch:      Epoch object.
1145  * @ev:         Epoch event.
1146  */
1147 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1148                                                struct drbd_epoch *epoch,
1149                                                enum epoch_event ev)
1150 {
1151         int epoch_size;
1152         struct drbd_epoch *next_epoch;
1153         enum finish_epoch rv = FE_STILL_LIVE;
1154
1155         spin_lock(&tconn->epoch_lock);
1156         do {
1157                 next_epoch = NULL;
1158
1159                 epoch_size = atomic_read(&epoch->epoch_size);
1160
1161                 switch (ev & ~EV_CLEANUP) {
1162                 case EV_PUT:
1163                         atomic_dec(&epoch->active);
1164                         break;
1165                 case EV_GOT_BARRIER_NR:
1166                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1167                         break;
1168                 case EV_BECAME_LAST:
1169                         /* nothing to do*/
1170                         break;
1171                 }
1172
1173                 if (epoch_size != 0 &&
1174                     atomic_read(&epoch->active) == 0 &&
1175                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1176                         if (!(ev & EV_CLEANUP)) {
1177                                 spin_unlock(&tconn->epoch_lock);
1178                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1179                                 spin_lock(&tconn->epoch_lock);
1180                         }
1181 #if 0
1182                         /* FIXME: dec unacked on connection, once we have
1183                          * something to count pending connection packets in. */
1184                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1185                                 dec_unacked(epoch->tconn);
1186 #endif
1187
1188                         if (tconn->current_epoch != epoch) {
1189                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1190                                 list_del(&epoch->list);
1191                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1192                                 tconn->epochs--;
1193                                 kfree(epoch);
1194
1195                                 if (rv == FE_STILL_LIVE)
1196                                         rv = FE_DESTROYED;
1197                         } else {
1198                                 epoch->flags = 0;
1199                                 atomic_set(&epoch->epoch_size, 0);
1200                                 /* atomic_set(&epoch->active, 0); is already zero */
1201                                 if (rv == FE_STILL_LIVE)
1202                                         rv = FE_RECYCLED;
1203                         }
1204                 }
1205
1206                 if (!next_epoch)
1207                         break;
1208
1209                 epoch = next_epoch;
1210         } while (1);
1211
1212         spin_unlock(&tconn->epoch_lock);
1213
1214         return rv;
1215 }
1216
1217 /**
1218  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1219  * @tconn:      DRBD connection.
1220  * @wo:         Write ordering method to try.
1221  */
1222 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1223 {
1224         struct disk_conf *dc;
1225         struct drbd_conf *mdev;
1226         enum write_ordering_e pwo;
1227         int vnr;
1228         static char *write_ordering_str[] = {
1229                 [WO_none] = "none",
1230                 [WO_drain_io] = "drain",
1231                 [WO_bdev_flush] = "flush",
1232         };
1233
1234         pwo = tconn->write_ordering;
1235         wo = min(pwo, wo);
1236         rcu_read_lock();
1237         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1238                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1239                         continue;
1240                 dc = rcu_dereference(mdev->ldev->disk_conf);
1241
1242                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1243                         wo = WO_drain_io;
1244                 if (wo == WO_drain_io && !dc->disk_drain)
1245                         wo = WO_none;
1246                 put_ldev(mdev);
1247         }
1248         rcu_read_unlock();
1249         tconn->write_ordering = wo;
1250         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1251                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1252 }
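/*
 * Illustration of the fallback chain implemented above (not part of the
 * original file): the connection-wide method only ever gets weaker, as the
 * per-volume disk options permit:
 *
 *   WO_bdev_flush --(disk_flushes disabled on any volume)--> WO_drain_io
 *   WO_drain_io   --(disk_drain   disabled on any volume)--> WO_none
 */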
1253
1254 /**
1255  * drbd_submit_peer_request()
1256  * @mdev:       DRBD device.
1257  * @peer_req:   peer request
1258  * @rw:         flag field, see bio->bi_rw
1259  *
1260  * May spread the pages to multiple bios,
1261  * depending on bio_add_page restrictions.
1262  *
1263  * Returns 0 if all bios have been submitted,
1264  * -ENOMEM if we could not allocate enough bios,
1265  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1266  *  single page to an empty bio (which should never happen and likely indicates
1267  *  that the lower level IO stack is in some way broken). This has been observed
1268  *  on certain Xen deployments.
1269  */
1270 /* TODO allocate from our own bio_set. */
1271 int drbd_submit_peer_request(struct drbd_conf *mdev,
1272                              struct drbd_peer_request *peer_req,
1273                              const unsigned rw, const int fault_type)
1274 {
1275         struct bio *bios = NULL;
1276         struct bio *bio;
1277         struct page *page = peer_req->pages;
1278         sector_t sector = peer_req->i.sector;
1279         unsigned ds = peer_req->i.size;
1280         unsigned n_bios = 0;
1281         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1282         int err = -ENOMEM;
1283
1284         /* In most cases, we will only need one bio.  But in case the lower
1285          * level restrictions happen to be different at this offset on this
1286          * side than those of the sending peer, we may need to submit the
1287          * request in more than one bio.
1288          *
1289          * Plain bio_alloc is good enough here, this is no DRBD internally
1290          * generated bio, but a bio allocated on behalf of the peer.
1291          */
1292 next_bio:
1293         bio = bio_alloc(GFP_NOIO, nr_pages);
1294         if (!bio) {
1295                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1296                 goto fail;
1297         }
1298         /* > peer_req->i.sector, unless this is the first bio */
1299         bio->bi_sector = sector;
1300         bio->bi_bdev = mdev->ldev->backing_bdev;
1301         bio->bi_rw = rw;
1302         bio->bi_private = peer_req;
1303         bio->bi_end_io = drbd_peer_request_endio;
1304
1305         bio->bi_next = bios;
1306         bios = bio;
1307         ++n_bios;
1308
1309         page_chain_for_each(page) {
1310                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1311                 if (!bio_add_page(bio, page, len, 0)) {
1312                         /* A single page must always be possible!
1313                          * But in case it fails anyways,
1314                          * we deal with it, and complain (below). */
1315                         if (bio->bi_vcnt == 0) {
1316                                 dev_err(DEV,
1317                                         "bio_add_page failed for len=%u, "
1318                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1319                                         len, (unsigned long long)bio->bi_sector);
1320                                 err = -ENOSPC;
1321                                 goto fail;
1322                         }
1323                         goto next_bio;
1324                 }
1325                 ds -= len;
1326                 sector += len >> 9;
1327                 --nr_pages;
1328         }
1329         D_ASSERT(page == NULL);
1330         D_ASSERT(ds == 0);
1331
1332         atomic_set(&peer_req->pending_bios, n_bios);
1333         do {
1334                 bio = bios;
1335                 bios = bios->bi_next;
1336                 bio->bi_next = NULL;
1337
1338                 drbd_generic_make_request(mdev, fault_type, bio);
1339         } while (bios);
1340         return 0;
1341
1342 fail:
1343         while (bios) {
1344                 bio = bios;
1345                 bios = bios->bi_next;
1346                 bio_put(bio);
1347         }
1348         return err;
1349 }
1350
1351 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1352                                              struct drbd_peer_request *peer_req)
1353 {
1354         struct drbd_interval *i = &peer_req->i;
1355
1356         drbd_remove_interval(&mdev->write_requests, i);
1357         drbd_clear_interval(i);
1358
1359         /* Wake up any processes waiting for this peer request to complete.  */
1360         if (i->waiting)
1361                 wake_up(&mdev->misc_wait);
1362 }
1363
1364 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1365 {
1366         struct drbd_conf *mdev;
1367         int vnr;
1368
1369         rcu_read_lock();
1370         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1371                 kref_get(&mdev->kref);
1372                 rcu_read_unlock();
1373                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1374                 kref_put(&mdev->kref, &drbd_minor_destroy);
1375                 rcu_read_lock();
1376         }
1377         rcu_read_unlock();
1378 }
1379
1380 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1381 {
1382         int rv;
1383         struct p_barrier *p = pi->data;
1384         struct drbd_epoch *epoch;
1385
1386         /* FIXME these are unacked on connection,
1387          * not a specific (peer)device.
1388          */
1389         tconn->current_epoch->barrier_nr = p->barrier;
1390         tconn->current_epoch->tconn = tconn;
1391         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1392
1393         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1394          * the activity log, which means it would not be resynced in case the
1395          * R_PRIMARY crashes now.
1396          * Therefore we must send the barrier_ack after the barrier request was
1397          * completed. */
1398         switch (tconn->write_ordering) {
1399         case WO_none:
1400                 if (rv == FE_RECYCLED)
1401                         return 0;
1402
1403                 /* receiver context, in the writeout path of the other node.
1404                  * avoid potential distributed deadlock */
1405                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1406                 if (epoch)
1407                         break;
1408                 else
1409                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1410                         /* Fall through */
1411
1412         case WO_bdev_flush:
1413         case WO_drain_io:
1414                 conn_wait_active_ee_empty(tconn);
1415                 drbd_flush(tconn);
1416
1417                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1418                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1419                         if (epoch)
1420                                 break;
1421                 }
1422
1423                 return 0;
1424         default:
1425                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1426                 return -EIO;
1427         }
1428
1429         epoch->flags = 0;
1430         atomic_set(&epoch->epoch_size, 0);
1431         atomic_set(&epoch->active, 0);
1432
1433         spin_lock(&tconn->epoch_lock);
1434         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1435                 list_add(&epoch->list, &tconn->current_epoch->list);
1436                 tconn->current_epoch = epoch;
1437                 tconn->epochs++;
1438         } else {
1439                 /* The current_epoch got recycled while we allocated this one... */
1440                 kfree(epoch);
1441         }
1442         spin_unlock(&tconn->epoch_lock);
1443
1444         return 0;
1445 }
1446
1447 /* used from receive_RSDataReply (recv_resync_read)
1448  * and from receive_Data */
1449 static struct drbd_peer_request *
1450 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1451               int data_size) __must_hold(local)
1452 {
1453         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1454         struct drbd_peer_request *peer_req;
1455         struct page *page;
1456         int dgs, ds, err;
1457         void *dig_in = mdev->tconn->int_dig_in;
1458         void *dig_vv = mdev->tconn->int_dig_vv;
1459         unsigned long *data;
1460
1461         dgs = 0;
1462         if (mdev->tconn->peer_integrity_tfm) {
1463                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1464                 /*
1465                  * FIXME: Receive the incoming digest into the receive buffer
1466                  *        here, together with its struct p_data?
1467                  */
1468                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1469                 if (err)
1470                         return NULL;
1471                 data_size -= dgs;
1472         }
1473
1474         if (!expect(data_size != 0))
1475                 return NULL;
1476         if (!expect(IS_ALIGNED(data_size, 512)))
1477                 return NULL;
1478         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1479                 return NULL;
1480
1481         /* even though we trust our peer,
1482          * we sometimes have to double check. */
1483         if (sector + (data_size>>9) > capacity) {
1484                 dev_err(DEV, "request from peer beyond end of local disk: "
1485                         "capacity: %llus < sector: %llus + size: %u\n",
1486                         (unsigned long long)capacity,
1487                         (unsigned long long)sector, data_size);
1488                 return NULL;
1489         }
1490
1491         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1492          * "criss-cross" setup, that might cause write-out on some other DRBD,
1493          * which in turn might block on the other node at this very place.  */
1494         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1495         if (!peer_req)
1496                 return NULL;
1497
1498         ds = data_size;
1499         page = peer_req->pages;
1500         page_chain_for_each(page) {
1501                 unsigned len = min_t(int, ds, PAGE_SIZE);
1502                 data = kmap(page);
1503                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1504                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1505                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1506                         data[0] = data[0] ^ (unsigned long)-1;
1507                 }
1508                 kunmap(page);
1509                 if (err) {
1510                         drbd_free_peer_req(mdev, peer_req);
1511                         return NULL;
1512                 }
1513                 ds -= len;
1514         }
1515
1516         if (dgs) {
1517                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1518                 if (memcmp(dig_in, dig_vv, dgs)) {
1519                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1520                                 (unsigned long long)sector, data_size);
1521                         drbd_free_peer_req(mdev, peer_req);
1522                         return NULL;
1523                 }
1524         }
1525         mdev->recv_cnt += data_size>>9;
1526         return peer_req;
1527 }
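/*
 * On the wire, the payload handled by read_in_block() is an optional
 * integrity digest of dgs bytes (only present when a peer integrity
 * transform is configured) followed by the actual block data.  The
 * incoming data_size covers both and is reduced by dgs before the
 * alignment and size checks; the digest, if present, is verified against
 * the received data via drbd_csum_ee() before the peer request is returned.
 */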
1528
1529 /* drbd_drain_block() just takes a data block
1530  * out of the socket input buffer, and discards it.
1531  */
1532 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1533 {
1534         struct page *page;
1535         int err = 0;
1536         void *data;
1537
1538         if (!data_size)
1539                 return 0;
1540
1541         page = drbd_alloc_pages(mdev, 1, 1);
1542
1543         data = kmap(page);
1544         while (data_size) {
1545                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1546
1547                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1548                 if (err)
1549                         break;
1550                 data_size -= len;
1551         }
1552         kunmap(page);
1553         drbd_free_pages(mdev, page, 0);
1554         return err;
1555 }
1556
1557 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1558                            sector_t sector, int data_size)
1559 {
1560         struct bio_vec *bvec;
1561         struct bio *bio;
1562         int dgs, err, i, expect;
1563         void *dig_in = mdev->tconn->int_dig_in;
1564         void *dig_vv = mdev->tconn->int_dig_vv;
1565
1566         dgs = 0;
1567         if (mdev->tconn->peer_integrity_tfm) {
1568                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1569                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1570                 if (err)
1571                         return err;
1572                 data_size -= dgs;
1573         }
1574
1575         /* optimistically update recv_cnt.  if receiving fails below,
1576          * we disconnect anyways, and counters will be reset. */
1577         mdev->recv_cnt += data_size>>9;
1578
1579         bio = req->master_bio;
1580         D_ASSERT(sector == bio->bi_sector);
1581
1582         bio_for_each_segment(bvec, bio, i) {
1583                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1584                 expect = min_t(int, data_size, bvec->bv_len);
1585                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1586                 kunmap(bvec->bv_page);
1587                 if (err)
1588                         return err;
1589                 data_size -= expect;
1590         }
1591
1592         if (dgs) {
1593                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1594                 if (memcmp(dig_in, dig_vv, dgs)) {
1595                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1596                         return -EINVAL;
1597                 }
1598         }
1599
1600         D_ASSERT(data_size == 0);
1601         return 0;
1602 }
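/*
 * recv_dless_read() receives the peer's reply to a read request that was
 * shipped over the network: the payload is copied segment by segment
 * directly into the pages of the request's master bio, and an optional
 * integrity digest over the whole bio is verified afterwards.
 */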
1603
1604 /*
1605  * e_end_resync_block() is called in asender context via
1606  * drbd_finish_peer_reqs().
1607  */
1608 static int e_end_resync_block(struct drbd_work *w, int unused)
1609 {
1610         struct drbd_peer_request *peer_req =
1611                 container_of(w, struct drbd_peer_request, w);
1612         struct drbd_conf *mdev = w->mdev;
1613         sector_t sector = peer_req->i.sector;
1614         int err;
1615
1616         D_ASSERT(drbd_interval_empty(&peer_req->i));
1617
1618         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1619                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1620                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1621         } else {
1622                 /* Record failure to sync */
1623                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1624
1625                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1626         }
1627         dec_unacked(mdev);
1628
1629         return err;
1630 }
1631
1632 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1633 {
1634         struct drbd_peer_request *peer_req;
1635
1636         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1637         if (!peer_req)
1638                 goto fail;
1639
1640         dec_rs_pending(mdev);
1641
1642         inc_unacked(mdev);
1643         /* corresponding dec_unacked() in e_end_resync_block()
1644          * respective _drbd_clear_done_ee */
1645
1646         peer_req->w.cb = e_end_resync_block;
1647
1648         spin_lock_irq(&mdev->tconn->req_lock);
1649         list_add(&peer_req->w.list, &mdev->sync_ee);
1650         spin_unlock_irq(&mdev->tconn->req_lock);
1651
1652         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1653         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1654                 return 0;
1655
1656         /* don't care for the reason here */
1657         dev_err(DEV, "submit failed, triggering re-connect\n");
1658         spin_lock_irq(&mdev->tconn->req_lock);
1659         list_del(&peer_req->w.list);
1660         spin_unlock_irq(&mdev->tconn->req_lock);
1661
1662         drbd_free_peer_req(mdev, peer_req);
1663 fail:
1664         put_ldev(mdev);
1665         return -EIO;
1666 }
1667
1668 static struct drbd_request *
1669 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1670              sector_t sector, bool missing_ok, const char *func)
1671 {
1672         struct drbd_request *req;
1673
1674         /* Request object according to our peer */
1675         req = (struct drbd_request *)(unsigned long)id;
1676         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1677                 return req;
1678         if (!missing_ok) {
1679                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1680                         (unsigned long)id, (unsigned long long)sector);
1681         }
1682         return NULL;
1683 }
1684
1685 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1686 {
1687         struct drbd_conf *mdev;
1688         struct drbd_request *req;
1689         sector_t sector;
1690         int err;
1691         struct p_data *p = pi->data;
1692
1693         mdev = vnr_to_mdev(tconn, pi->vnr);
1694         if (!mdev)
1695                 return -EIO;
1696
1697         sector = be64_to_cpu(p->sector);
1698
1699         spin_lock_irq(&mdev->tconn->req_lock);
1700         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1701         spin_unlock_irq(&mdev->tconn->req_lock);
1702         if (unlikely(!req))
1703                 return -EIO;
1704
1705         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1706          * special casing it there for the various failure cases.
1707          * still no race with drbd_fail_pending_reads */
1708         err = recv_dless_read(mdev, req, sector, pi->size);
1709         if (!err)
1710                 req_mod(req, DATA_RECEIVED);
1711         /* else: nothing. handled from drbd_disconnect...
1712          * I don't think we may complete this just yet
1713          * in case we are "on-disconnect: freeze" */
1714
1715         return err;
1716 }
1717
1718 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1719 {
1720         struct drbd_conf *mdev;
1721         sector_t sector;
1722         int err;
1723         struct p_data *p = pi->data;
1724
1725         mdev = vnr_to_mdev(tconn, pi->vnr);
1726         if (!mdev)
1727                 return -EIO;
1728
1729         sector = be64_to_cpu(p->sector);
1730         D_ASSERT(p->block_id == ID_SYNCER);
1731
1732         if (get_ldev(mdev)) {
1733                 /* data is submitted to disk within recv_resync_read.
1734                  * corresponding put_ldev done below on error,
1735                  * or in drbd_peer_request_endio. */
1736                 err = recv_resync_read(mdev, sector, pi->size);
1737         } else {
1738                 if (__ratelimit(&drbd_ratelimit_state))
1739                         dev_err(DEV, "Can not write resync data to local disk.\n");
1740
1741                 err = drbd_drain_block(mdev, pi->size);
1742
1743                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1744         }
1745
1746         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1747
1748         return err;
1749 }
1750
1751 static void restart_conflicting_writes(struct drbd_conf *mdev,
1752                                        sector_t sector, int size)
1753 {
1754         struct drbd_interval *i;
1755         struct drbd_request *req;
1756
1757         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1758                 if (!i->local)
1759                         continue;
1760                 req = container_of(i, struct drbd_request, i);
1761                 if (req->rq_state & RQ_LOCAL_PENDING ||
1762                     !(req->rq_state & RQ_POSTPONED))
1763                         continue;
1764                 /* as it is RQ_POSTPONED, this will cause it to
1765                  * be queued on the retry workqueue. */
1766                 __req_mod(req, DISCARD_WRITE, NULL);
1767         }
1768 }
1769
1770 /*
1771  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1772  */
1773 static int e_end_block(struct drbd_work *w, int cancel)
1774 {
1775         struct drbd_peer_request *peer_req =
1776                 container_of(w, struct drbd_peer_request, w);
1777         struct drbd_conf *mdev = w->mdev;
1778         sector_t sector = peer_req->i.sector;
1779         int err = 0, pcmd;
1780
1781         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1782                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1783                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1784                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1785                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1786                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1787                         err = drbd_send_ack(mdev, pcmd, peer_req);
1788                         if (pcmd == P_RS_WRITE_ACK)
1789                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1790                 } else {
1791                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1792                         /* we expect it to be marked out of sync anyways...
1793                          * maybe assert this?  */
1794                 }
1795                 dec_unacked(mdev);
1796         }
1797         /* we delete from the conflict detection hash _after_ we sent out the
1798          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1799         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1800                 spin_lock_irq(&mdev->tconn->req_lock);
1801                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1802                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1803                 if (peer_req->flags & EE_RESTART_REQUESTS)
1804                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1805                 spin_unlock_irq(&mdev->tconn->req_lock);
1806         } else
1807                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1808
1809         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1810
1811         return err;
1812 }
1813
1814 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1815 {
1816         struct drbd_conf *mdev = w->mdev;
1817         struct drbd_peer_request *peer_req =
1818                 container_of(w, struct drbd_peer_request, w);
1819         int err;
1820
1821         err = drbd_send_ack(mdev, ack, peer_req);
1822         dec_unacked(mdev);
1823
1824         return err;
1825 }
1826
1827 static int e_send_discard_write(struct drbd_work *w, int unused)
1828 {
1829         return e_send_ack(w, P_DISCARD_WRITE);
1830 }
1831
1832 static int e_send_retry_write(struct drbd_work *w, int unused)
1833 {
1834         struct drbd_tconn *tconn = w->mdev->tconn;
1835
1836         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1837                              P_RETRY_WRITE : P_DISCARD_WRITE);
1838 }
1839
1840 static bool seq_greater(u32 a, u32 b)
1841 {
1842         /*
1843          * We assume 32-bit wrap-around here.
1844          * For 24-bit wrap-around, we would have to shift:
1845          *  a <<= 8; b <<= 8;
1846          */
1847         return (s32)a - (s32)b > 0;
1848 }
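/*
 * Example: seq_greater(5, 0xfffffffe) is true, since
 * (s32)5 - (s32)0xfffffffe == 5 - (-2) == 7 > 0.  The comparison therefore
 * remains correct across a wrap of the 32-bit sequence counter, as long as
 * the two values are less than 2^31 apart.
 */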
1849
1850 static u32 seq_max(u32 a, u32 b)
1851 {
1852         return seq_greater(a, b) ? a : b;
1853 }
1854
1855 static bool need_peer_seq(struct drbd_conf *mdev)
1856 {
1857         struct drbd_tconn *tconn = mdev->tconn;
1858         int tp;
1859
1860         /*
1861          * We only need to keep track of the last packet_seq number of our peer
1862          * if we are in dual-primary mode and we have the discard flag set; see
1863          * handle_write_conflicts().
1864          */
1865
1866         rcu_read_lock();
1867         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1868         rcu_read_unlock();
1869
1870         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1871 }
1872
1873 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1874 {
1875         unsigned int newest_peer_seq;
1876
1877         if (need_peer_seq(mdev)) {
1878                 spin_lock(&mdev->peer_seq_lock);
1879                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1880                 mdev->peer_seq = newest_peer_seq;
1881                 spin_unlock(&mdev->peer_seq_lock);
1882                 /* wake up only if we actually changed mdev->peer_seq */
1883                 if (peer_seq == newest_peer_seq)
1884                         wake_up(&mdev->seq_wait);
1885         }
1886 }
1887
1888 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1889 {
1890         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1891 }
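/* Note the mixed units: s1 and s2 are sector numbers, l1 and l2 are byte
 * lengths, hence the >>9.  For example, overlaps(8, 4096, 10, 4096) is
 * true, since the two ranges cover sectors 8..15 and 10..17. */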
1892
1893 /* maybe change sync_ee into interval trees as well? */
1894 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1895 {
1896         struct drbd_peer_request *rs_req;
1897         bool rv = 0;
1898
1899         spin_lock_irq(&mdev->tconn->req_lock);
1900         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1901                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1902                              rs_req->i.sector, rs_req->i.size)) {
1903                         rv = 1;
1904                         break;
1905                 }
1906         }
1907         spin_unlock_irq(&mdev->tconn->req_lock);
1908
1909         if (rv)
1910                 dev_warn(DEV, "WARN: Avoiding concurrent data/resync write to single sector.\n");
1911
1912         return rv;
1913 }
1914
1915 /* Called from receive_Data.
1916  * Synchronize packets on sock with packets on msock.
1917  *
1918  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1919  * packet traveling on msock, they are still processed in the order they have
1920  * been sent.
1921  *
1922  * Note: we don't care for Ack packets overtaking P_DATA packets.
1923  *
1924  * In case packet_seq is larger than mdev->peer_seq number, there are
1925  * outstanding packets on the msock. We wait for them to arrive.
1926  * In case we are the logically next packet, we update mdev->peer_seq
1927  * ourselves. Correctly handles 32bit wrap around.
1928  *
1929  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1930  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1931  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1932  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1933  *
1934  * returns 0 if we may process the packet,
1935  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1936 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1937 {
1938         DEFINE_WAIT(wait);
1939         long timeout;
1940         int ret;
1941
1942         if (!need_peer_seq(mdev))
1943                 return 0;
1944
1945         spin_lock(&mdev->peer_seq_lock);
1946         for (;;) {
1947                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1948                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1949                         ret = 0;
1950                         break;
1951                 }
1952                 if (signal_pending(current)) {
1953                         ret = -ERESTARTSYS;
1954                         break;
1955                 }
1956                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1957                 spin_unlock(&mdev->peer_seq_lock);
1958                 rcu_read_lock();
1959                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1960                 rcu_read_unlock();
1961                 timeout = schedule_timeout(timeout);
1962                 spin_lock(&mdev->peer_seq_lock);
1963                 if (!timeout) {
1964                         ret = -ETIMEDOUT;
1965                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1966                         break;
1967                 }
1968         }
1969         spin_unlock(&mdev->peer_seq_lock);
1970         finish_wait(&mdev->seq_wait, &wait);
1971         return ret;
1972 }
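/*
 * Example: with mdev->peer_seq at 10, a P_DATA packet carrying peer_seq 11
 * is the logically next one and is processed immediately (updating
 * peer_seq itself), while one carrying peer_seq 12 makes us sleep until
 * the outstanding packet with sequence number 11, travelling on the msock,
 * has been processed and bumped mdev->peer_seq, or until the ping timeout
 * expires.
 */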
1973
1974 /* see also bio_flags_to_wire()
1975  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1976  * flags and back. We may replicate to other kernel versions. */
1977 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1978 {
1979         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1980                 (dpf & DP_FUA ? REQ_FUA : 0) |
1981                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1982                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1983 }
1984
1985 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1986                                     unsigned int size)
1987 {
1988         struct drbd_interval *i;
1989
1990     repeat:
1991         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1992                 struct drbd_request *req;
1993                 struct bio_and_error m;
1994
1995                 if (!i->local)
1996                         continue;
1997                 req = container_of(i, struct drbd_request, i);
1998                 if (!(req->rq_state & RQ_POSTPONED))
1999                         continue;
2000                 req->rq_state &= ~RQ_POSTPONED;
2001                 __req_mod(req, NEG_ACKED, &m);
2002                 spin_unlock_irq(&mdev->tconn->req_lock);
2003                 if (m.bio)
2004                         complete_master_bio(mdev, &m);
2005                 spin_lock_irq(&mdev->tconn->req_lock);
2006                 goto repeat;
2007         }
2008 }
2009
2010 static int handle_write_conflicts(struct drbd_conf *mdev,
2011                                   struct drbd_peer_request *peer_req)
2012 {
2013         struct drbd_tconn *tconn = mdev->tconn;
2014         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
2015         sector_t sector = peer_req->i.sector;
2016         const unsigned int size = peer_req->i.size;
2017         struct drbd_interval *i;
2018         bool equal;
2019         int err;
2020
2021         /*
2022          * Inserting the peer request into the write_requests tree will prevent
2023          * new conflicting local requests from being added.
2024          */
2025         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2026
2027     repeat:
2028         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2029                 if (i == &peer_req->i)
2030                         continue;
2031
2032                 if (!i->local) {
2033                         /*
2034                          * Our peer has sent a conflicting remote request; this
2035                          * should not happen in a two-node setup.  Wait for the
2036                          * earlier peer request to complete.
2037                          */
2038                         err = drbd_wait_misc(mdev, i);
2039                         if (err)
2040                                 goto out;
2041                         goto repeat;
2042                 }
2043
2044                 equal = i->sector == sector && i->size == size;
2045                 if (resolve_conflicts) {
2046                         /*
2047                          * If the peer request is fully contained within the
2048                          * overlapping request, it can be discarded; otherwise,
2049                          * it will be retried once all overlapping requests
2050                          * have completed.
2051                          */
2052                         bool discard = i->sector <= sector && i->sector +
2053                                        (i->size >> 9) >= sector + (size >> 9);
2054
2055                         if (!equal)
2056                                 dev_alert(DEV, "Concurrent writes detected: "
2057                                                "local=%llus +%u, remote=%llus +%u, "
2058                                                "assuming %s came first\n",
2059                                           (unsigned long long)i->sector, i->size,
2060                                           (unsigned long long)sector, size,
2061                                           discard ? "local" : "remote");
2062
2063                         inc_unacked(mdev);
2064                         peer_req->w.cb = discard ? e_send_discard_write :
2065                                                    e_send_retry_write;
2066                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2067                         wake_asender(mdev->tconn);
2068
2069                         err = -ENOENT;
2070                         goto out;
2071                 } else {
2072                         struct drbd_request *req =
2073                                 container_of(i, struct drbd_request, i);
2074
2075                         if (!equal)
2076                                 dev_alert(DEV, "Concurrent writes detected: "
2077                                                "local=%llus +%u, remote=%llus +%u\n",
2078                                           (unsigned long long)i->sector, i->size,
2079                                           (unsigned long long)sector, size);
2080
2081                         if (req->rq_state & RQ_LOCAL_PENDING ||
2082                             !(req->rq_state & RQ_POSTPONED)) {
2083                                 /*
2084                                  * Wait for the node with the discard flag to
2085                                  * decide if this request will be discarded or
2086                                  * retried.  Requests that are discarded will
2087                                  * disappear from the write_requests tree.
2088                                  *
2089                                  * In addition, wait for the conflicting
2090                                  * request to finish locally before submitting
2091                                  * the conflicting peer request.
2092                                  */
2093                                 err = drbd_wait_misc(mdev, &req->i);
2094                                 if (err) {
2095                                         _conn_request_state(mdev->tconn,
2096                                                             NS(conn, C_TIMEOUT),
2097                                                             CS_HARD);
2098                                         fail_postponed_requests(mdev, sector, size);
2099                                         goto out;
2100                                 }
2101                                 goto repeat;
2102                         }
2103                         /*
2104                          * Remember to restart the conflicting requests after
2105                          * the new peer request has completed.
2106                          */
2107                         peer_req->flags |= EE_RESTART_REQUESTS;
2108                 }
2109         }
2110         err = 0;
2111
2112     out:
2113         if (err)
2114                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2115         return err;
2116 }
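/*
 * Return values of handle_write_conflicts(): 0 means the peer request may
 * be submitted; -ENOENT means the conflict was resolved by discarding or
 * retrying the write (the corresponding ack is queued for the asender) and
 * the request must not be submitted; any other error means we were
 * interrupted and the caller tears the request down.
 */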
2117
2118 /* mirrored write */
2119 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2120 {
2121         struct drbd_conf *mdev;
2122         sector_t sector;
2123         struct drbd_peer_request *peer_req;
2124         struct p_data *p = pi->data;
2125         u32 peer_seq = be32_to_cpu(p->seq_num);
2126         int rw = WRITE;
2127         u32 dp_flags;
2128         int err, tp;
2129
2130         mdev = vnr_to_mdev(tconn, pi->vnr);
2131         if (!mdev)
2132                 return -EIO;
2133
2134         if (!get_ldev(mdev)) {
2135                 int err2;
2136
2137                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2138                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2139                 atomic_inc(&tconn->current_epoch->epoch_size);
2140                 err2 = drbd_drain_block(mdev, pi->size);
2141                 if (!err)
2142                         err = err2;
2143                 return err;
2144         }
2145
2146         /*
2147          * Corresponding put_ldev done either below (on various errors), or in
2148          * drbd_peer_request_endio, if we successfully submit the data at the
2149          * end of this function.
2150          */
2151
2152         sector = be64_to_cpu(p->sector);
2153         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2154         if (!peer_req) {
2155                 put_ldev(mdev);
2156                 return -EIO;
2157         }
2158
2159         peer_req->w.cb = e_end_block;
2160
2161         dp_flags = be32_to_cpu(p->dp_flags);
2162         rw |= wire_flags_to_bio(mdev, dp_flags);
2163
2164         if (dp_flags & DP_MAY_SET_IN_SYNC)
2165                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2166
2167         spin_lock(&tconn->epoch_lock);
2168         peer_req->epoch = tconn->current_epoch;
2169         atomic_inc(&peer_req->epoch->epoch_size);
2170         atomic_inc(&peer_req->epoch->active);
2171         spin_unlock(&tconn->epoch_lock);
2172
2173         rcu_read_lock();
2174         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2175         rcu_read_unlock();
2176         if (tp) {
2177                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2178                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2179                 if (err)
2180                         goto out_interrupted;
2181                 spin_lock_irq(&mdev->tconn->req_lock);
2182                 err = handle_write_conflicts(mdev, peer_req);
2183                 if (err) {
2184                         spin_unlock_irq(&mdev->tconn->req_lock);
2185                         if (err == -ENOENT) {
2186                                 put_ldev(mdev);
2187                                 return 0;
2188                         }
2189                         goto out_interrupted;
2190                 }
2191         } else
2192                 spin_lock_irq(&mdev->tconn->req_lock);
2193         list_add(&peer_req->w.list, &mdev->active_ee);
2194         spin_unlock_irq(&mdev->tconn->req_lock);
2195
2196         if (mdev->state.conn == C_SYNC_TARGET)
2197                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2198
2199         if (mdev->tconn->agreed_pro_version < 100) {
2200                 rcu_read_lock();
2201                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2202                 case DRBD_PROT_C:
2203                         dp_flags |= DP_SEND_WRITE_ACK;
2204                         break;
2205                 case DRBD_PROT_B:
2206                         dp_flags |= DP_SEND_RECEIVE_ACK;
2207                         break;
2208                 }
2209                 rcu_read_unlock();
2210         }
2211
2212         if (dp_flags & DP_SEND_WRITE_ACK) {
2213                 peer_req->flags |= EE_SEND_WRITE_ACK;
2214                 inc_unacked(mdev);
2215                 /* corresponding dec_unacked() in e_end_block()
2216                  * respective _drbd_clear_done_ee */
2217         }
2218
2219         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2220                 /* I really don't like it that the receiver thread
2221                  * sends on the msock, but anyways */
2222                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2223         }
2224
2225         if (mdev->state.pdsk < D_INCONSISTENT) {
2226                 /* In case we have the only disk of the cluster, mark the block out of sync for the peer. */
2227                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2228                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2229                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2230                 drbd_al_begin_io(mdev, &peer_req->i);
2231         }
2232
2233         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2234         if (!err)
2235                 return 0;
2236
2237         /* don't care for the reason here */
2238         dev_err(DEV, "submit failed, triggering re-connect\n");
2239         spin_lock_irq(&mdev->tconn->req_lock);
2240         list_del(&peer_req->w.list);
2241         drbd_remove_epoch_entry_interval(mdev, peer_req);
2242         spin_unlock_irq(&mdev->tconn->req_lock);
2243         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2244                 drbd_al_complete_io(mdev, &peer_req->i);
2245
2246 out_interrupted:
2247         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2248         put_ldev(mdev);
2249         drbd_free_peer_req(mdev, peer_req);
2250         return err;
2251 }
2252
2253 /* We may throttle resync, if the lower device seems to be busy,
2254  * and current sync rate is above c_min_rate.
2255  *
2256  * To decide whether or not the lower device is busy, we use a scheme similar
2257  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2258  * (more than 64 sectors) of activity we cannot account for with our own resync
2259  * activity, it obviously is "busy".
2260  *
2261  * The sync rate used here considers only the two most recent step marks,
2262  * giving a short-time average so we can react faster.
2263  */
2264 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2265 {
2266         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2267         unsigned long db, dt, dbdt;
2268         struct lc_element *tmp;
2269         int curr_events;
2270         int throttle = 0;
2271         unsigned int c_min_rate;
2272
2273         rcu_read_lock();
2274         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2275         rcu_read_unlock();
2276
2277         /* feature disabled? */
2278         if (c_min_rate == 0)
2279                 return 0;
2280
2281         spin_lock_irq(&mdev->al_lock);
2282         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2283         if (tmp) {
2284                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2285                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2286                         spin_unlock_irq(&mdev->al_lock);
2287                         return 0;
2288                 }
2289                 /* Do not slow down if app IO is already waiting for this extent */
2290         }
2291         spin_unlock_irq(&mdev->al_lock);
2292
2293         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2294                       (int)part_stat_read(&disk->part0, sectors[1]) -
2295                         atomic_read(&mdev->rs_sect_ev);
2296
2297         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2298                 unsigned long rs_left;
2299                 int i;
2300
2301                 mdev->rs_last_events = curr_events;
2302
2303                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2304                  * approx. */
2305                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2306
2307                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2308                         rs_left = mdev->ov_left;
2309                 else
2310                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2311
2312                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2313                 if (!dt)
2314                         dt++;
2315                 db = mdev->rs_mark_left[i] - rs_left;
2316                 dbdt = Bit2KB(db/dt);
2317
2318                 if (dbdt > c_min_rate)
2319                         throttle = 1;
2320         }
2321         return throttle;
2322 }
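/*
 * Worked example for the rate check above, with illustrative numbers: if
 * the second most recent sync mark was taken 6 seconds ago (dt == 6) and
 * 12288 bitmap bits have been cleared since (db == 12288), then, with the
 * usual 4 KiB per bitmap bit, dbdt == Bit2KB(12288 / 6) == 8192 KiB/s.
 * With a c_min_rate of, say, 4000 KiB/s this exceeds the configured
 * minimum and the resync request gets throttled.
 */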
2323
2324
2325 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2326 {
2327         struct drbd_conf *mdev;
2328         sector_t sector;
2329         sector_t capacity;
2330         struct drbd_peer_request *peer_req;
2331         struct digest_info *di = NULL;
2332         int size, verb;
2333         unsigned int fault_type;
2334         struct p_block_req *p = pi->data;
2335
2336         mdev = vnr_to_mdev(tconn, pi->vnr);
2337         if (!mdev)
2338                 return -EIO;
2339         capacity = drbd_get_capacity(mdev->this_bdev);
2340
2341         sector = be64_to_cpu(p->sector);
2342         size   = be32_to_cpu(p->blksize);
2343
2344         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2345                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2346                                 (unsigned long long)sector, size);
2347                 return -EINVAL;
2348         }
2349         if (sector + (size>>9) > capacity) {
2350                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2351                                 (unsigned long long)sector, size);
2352                 return -EINVAL;
2353         }
2354
2355         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2356                 verb = 1;
2357                 switch (pi->cmd) {
2358                 case P_DATA_REQUEST:
2359                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2360                         break;
2361                 case P_RS_DATA_REQUEST:
2362                 case P_CSUM_RS_REQUEST:
2363                 case P_OV_REQUEST:
2364                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2365                         break;
2366                 case P_OV_REPLY:
2367                         verb = 0;
2368                         dec_rs_pending(mdev);
2369                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2370                         break;
2371                 default:
2372                         BUG();
2373                 }
2374                 if (verb && __ratelimit(&drbd_ratelimit_state))
2375                         dev_err(DEV, "Can not satisfy peer's read request, "
2376                             "no local data.\n");
2377
2378                 /* drain the possibly present payload */
2379                 return drbd_drain_block(mdev, pi->size);
2380         }
2381
2382         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2383          * "criss-cross" setup, that might cause write-out on some other DRBD,
2384          * which in turn might block on the other node at this very place.  */
2385         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2386         if (!peer_req) {
2387                 put_ldev(mdev);
2388                 return -ENOMEM;
2389         }
2390
2391         switch (pi->cmd) {
2392         case P_DATA_REQUEST:
2393                 peer_req->w.cb = w_e_end_data_req;
2394                 fault_type = DRBD_FAULT_DT_RD;
2395                 /* application IO, don't drbd_rs_begin_io */
2396                 goto submit;
2397
2398         case P_RS_DATA_REQUEST:
2399                 peer_req->w.cb = w_e_end_rsdata_req;
2400                 fault_type = DRBD_FAULT_RS_RD;
2401                 /* used in the sector offset progress display */
2402                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2403                 break;
2404
2405         case P_OV_REPLY:
2406         case P_CSUM_RS_REQUEST:
2407                 fault_type = DRBD_FAULT_RS_RD;
2408                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2409                 if (!di)
2410                         goto out_free_e;
2411
2412                 di->digest_size = pi->size;
2413                 di->digest = (((char *)di)+sizeof(struct digest_info));
2414
2415                 peer_req->digest = di;
2416                 peer_req->flags |= EE_HAS_DIGEST;
2417
2418                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2419                         goto out_free_e;
2420
2421                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2422                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2423                         peer_req->w.cb = w_e_end_csum_rs_req;
2424                         /* used in the sector offset progress display */
2425                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2426                 } else if (pi->cmd == P_OV_REPLY) {
2427                         /* track progress, we may need to throttle */
2428                         atomic_add(size >> 9, &mdev->rs_sect_in);
2429                         peer_req->w.cb = w_e_end_ov_reply;
2430                         dec_rs_pending(mdev);
2431                         /* drbd_rs_begin_io done when we sent this request,
2432                          * but accounting still needs to be done. */
2433                         goto submit_for_resync;
2434                 }
2435                 break;
2436
2437         case P_OV_REQUEST:
2438                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2439                     mdev->tconn->agreed_pro_version >= 90) {
2440                         unsigned long now = jiffies;
2441                         int i;
2442                         mdev->ov_start_sector = sector;
2443                         mdev->ov_position = sector;
2444                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2445                         mdev->rs_total = mdev->ov_left;
2446                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2447                                 mdev->rs_mark_left[i] = mdev->ov_left;
2448                                 mdev->rs_mark_time[i] = now;
2449                         }
2450                         dev_info(DEV, "Online Verify start sector: %llu\n",
2451                                         (unsigned long long)sector);
2452                 }
2453                 peer_req->w.cb = w_e_end_ov_req;
2454                 fault_type = DRBD_FAULT_RS_RD;
2455                 break;
2456
2457         default:
2458                 BUG();
2459         }
2460
2461         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2462          * wrt the receiver, but it is not as straightforward as it may seem.
2463          * Various places in the resync start and stop logic assume resync
2464          * requests are processed in order, requeuing this on the worker thread
2465          * introduces a bunch of new code for synchronization between threads.
2466          *
2467          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2468          * "forever", throttling after drbd_rs_begin_io will lock that extent
2469          * for application writes for the same time.  For now, just throttle
2470          * here, where the rest of the code expects the receiver to sleep for
2471          * a while, anyways.
2472          */
2473
2474         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2475          * this defers syncer requests for some time, before letting at least
2476          * one request through.  The resync controller on the receiving side
2477          * will adapt to the incoming rate accordingly.
2478          *
2479          * We cannot throttle here if remote is Primary/SyncTarget:
2480          * we would also throttle its application reads.
2481          * In that case, throttling is done on the SyncTarget only.
2482          */
2483         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2484                 schedule_timeout_uninterruptible(HZ/10);
2485         if (drbd_rs_begin_io(mdev, sector))
2486                 goto out_free_e;
2487
2488 submit_for_resync:
2489         atomic_add(size >> 9, &mdev->rs_sect_ev);
2490
2491 submit:
2492         inc_unacked(mdev);
2493         spin_lock_irq(&mdev->tconn->req_lock);
2494         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2495         spin_unlock_irq(&mdev->tconn->req_lock);
2496
2497         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2498                 return 0;
2499
2500         /* don't care for the reason here */
2501         dev_err(DEV, "submit failed, triggering re-connect\n");
2502         spin_lock_irq(&mdev->tconn->req_lock);
2503         list_del(&peer_req->w.list);
2504         spin_unlock_irq(&mdev->tconn->req_lock);
2505         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2506
2507 out_free_e:
2508         put_ldev(mdev);
2509         drbd_free_peer_req(mdev, peer_req);
2510         return -EIO;
2511 }
2512
2513 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2514 {
2515         int self, peer, rv = -100;
2516         unsigned long ch_self, ch_peer;
2517         enum drbd_after_sb_p after_sb_0p;
2518
2519         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2520         peer = mdev->p_uuid[UI_BITMAP] & 1;
2521
2522         ch_peer = mdev->p_uuid[UI_SIZE];
2523         ch_self = mdev->comm_bm_set;
2524
2525         rcu_read_lock();
2526         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2527         rcu_read_unlock();
2528         switch (after_sb_0p) {
2529         case ASB_CONSENSUS:
2530         case ASB_DISCARD_SECONDARY:
2531         case ASB_CALL_HELPER:
2532         case ASB_VIOLENTLY:
2533                 dev_err(DEV, "Configuration error.\n");
2534                 break;
2535         case ASB_DISCONNECT:
2536                 break;
2537         case ASB_DISCARD_YOUNGER_PRI:
2538                 if (self == 0 && peer == 1) {
2539                         rv = -1;
2540                         break;
2541                 }
2542                 if (self == 1 && peer == 0) {
2543                         rv =  1;
2544                         break;
2545                 }
2546                 /* Else fall through to one of the other strategies... */
2547         case ASB_DISCARD_OLDER_PRI:
2548                 if (self == 0 && peer == 1) {
2549                         rv = 1;
2550                         break;
2551                 }
2552                 if (self == 1 && peer == 0) {
2553                         rv = -1;
2554                         break;
2555                 }
2556                 /* Else fall through to one of the other strategies... */
2557                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2558                      "Using discard-least-changes instead\n");
2559         case ASB_DISCARD_ZERO_CHG:
2560                 if (ch_peer == 0 && ch_self == 0) {
2561                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2562                                 ? -1 : 1;
2563                         break;
2564                 } else {
2565                         if (ch_peer == 0) { rv =  1; break; }
2566                         if (ch_self == 0) { rv = -1; break; }
2567                 }
2568                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2569                         break;
2570         case ASB_DISCARD_LEAST_CHG:
2571                 if      (ch_self < ch_peer)
2572                         rv = -1;
2573                 else if (ch_self > ch_peer)
2574                         rv =  1;
2575                 else /* ( ch_self == ch_peer ) */
2576                      /* Well, then use something else. */
2577                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2578                                 ? -1 : 1;
2579                 break;
2580         case ASB_DISCARD_LOCAL:
2581                 rv = -1;
2582                 break;
2583         case ASB_DISCARD_REMOTE:
2584                 rv =  1;
2585         }
2586
2587         return rv;
2588 }
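/*
 * Return convention of drbd_asb_recover_0p(), as the ASB_DISCARD_LOCAL and
 * ASB_DISCARD_REMOTE cases show: 1 means discard the peer's modifications
 * (we become sync source), -1 means discard our own (we become sync
 * target), and -100 means no automatic decision could be reached.
 */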
2589
2590 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2591 {
2592         int hg, rv = -100;
2593         enum drbd_after_sb_p after_sb_1p;
2594
2595         rcu_read_lock();
2596         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2597         rcu_read_unlock();
2598         switch (after_sb_1p) {
2599         case ASB_DISCARD_YOUNGER_PRI:
2600         case ASB_DISCARD_OLDER_PRI:
2601         case ASB_DISCARD_LEAST_CHG:
2602         case ASB_DISCARD_LOCAL:
2603         case ASB_DISCARD_REMOTE:
2604         case ASB_DISCARD_ZERO_CHG:
2605                 dev_err(DEV, "Configuration error.\n");
2606                 break;
2607         case ASB_DISCONNECT:
2608                 break;
2609         case ASB_CONSENSUS:
2610                 hg = drbd_asb_recover_0p(mdev);
2611                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2612                         rv = hg;
2613                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2614                         rv = hg;
2615                 break;
2616         case ASB_VIOLENTLY:
2617                 rv = drbd_asb_recover_0p(mdev);
2618                 break;
2619         case ASB_DISCARD_SECONDARY:
2620                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2621         case ASB_CALL_HELPER:
2622                 hg = drbd_asb_recover_0p(mdev);
2623                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2624                         enum drbd_state_rv rv2;
2625
2626                         drbd_set_role(mdev, R_SECONDARY, 0);
2627                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2628                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2629                           * we do not need to wait for the after state change work either. */
2630                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2631                         if (rv2 != SS_SUCCESS) {
2632                                 drbd_khelper(mdev, "pri-lost-after-sb");
2633                         } else {
2634                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2635                                 rv = hg;
2636                         }
2637                 } else
2638                         rv = hg;
2639         }
2640
2641         return rv;
2642 }
2643
2644 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2645 {
2646         int hg, rv = -100;
2647         enum drbd_after_sb_p after_sb_2p;
2648
2649         rcu_read_lock();
2650         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2651         rcu_read_unlock();
2652         switch (after_sb_2p) {
2653         case ASB_DISCARD_YOUNGER_PRI:
2654         case ASB_DISCARD_OLDER_PRI:
2655         case ASB_DISCARD_LEAST_CHG:
2656         case ASB_DISCARD_LOCAL:
2657         case ASB_DISCARD_REMOTE:
2658         case ASB_CONSENSUS:
2659         case ASB_DISCARD_SECONDARY:
2660         case ASB_DISCARD_ZERO_CHG:
2661                 dev_err(DEV, "Configuration error.\n");
2662                 break;
2663         case ASB_VIOLENTLY:
2664                 rv = drbd_asb_recover_0p(mdev);
2665                 break;
2666         case ASB_DISCONNECT:
2667                 break;
2668         case ASB_CALL_HELPER:
2669                 hg = drbd_asb_recover_0p(mdev);
2670                 if (hg == -1) {
2671                         enum drbd_state_rv rv2;
2672
2673                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2674                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2675                           * we do not need to wait for the after state change work either. */
2676                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2677                         if (rv2 != SS_SUCCESS) {
2678                                 drbd_khelper(mdev, "pri-lost-after-sb");
2679                         } else {
2680                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2681                                 rv = hg;
2682                         }
2683                 } else
2684                         rv = hg;
2685         }
2686
2687         return rv;
2688 }
2689
2690 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2691                            u64 bits, u64 flags)
2692 {
2693         if (!uuid) {
2694                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2695                 return;
2696         }
2697         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2698              text,
2699              (unsigned long long)uuid[UI_CURRENT],
2700              (unsigned long long)uuid[UI_BITMAP],
2701              (unsigned long long)uuid[UI_HISTORY_START],
2702              (unsigned long long)uuid[UI_HISTORY_END],
2703              (unsigned long long)bits,
2704              (unsigned long long)flags);
2705 }
2706
2707 /*
2708   100   after split brain try auto recover
2709     2   C_SYNC_SOURCE set BitMap
2710     1   C_SYNC_SOURCE use BitMap
2711     0   no Sync
2712    -1   C_SYNC_TARGET use BitMap
2713    -2   C_SYNC_TARGET set BitMap
2714  -100   after split brain, disconnect
2715 -1000   unrelated data
2716 -1091   requires proto 91
2717 -1096   requires proto 96
2718  */
2719 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2720 {
2721         u64 self, peer;
2722         int i, j;
2723
2724         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2725         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2726
2727         *rule_nr = 10;
2728         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2729                 return 0;
2730
2731         *rule_nr = 20;
2732         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2733              peer != UUID_JUST_CREATED)
2734                 return -2;
2735
2736         *rule_nr = 30;
2737         if (self != UUID_JUST_CREATED &&
2738             (peer == UUID_JUST_CREATED || peer == (u64)0))
2739                 return 2;
2740
2741         if (self == peer) {
2742                 int rct, dc; /* roles at crash time */
2743
2744                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2745
2746                         if (mdev->tconn->agreed_pro_version < 91)
2747                                 return -1091;
2748
2749                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2750                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2751                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2752                                 drbd_uuid_set_bm(mdev, 0UL);
2753
2754                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2755                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2756                                 *rule_nr = 34;
2757                         } else {
2758                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2759                                 *rule_nr = 36;
2760                         }
2761
2762                         return 1;
2763                 }
2764
2765                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2766
2767                         if (mdev->tconn->agreed_pro_version < 91)
2768                                 return -1091;
2769
2770                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2771                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2772                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2773
2774                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2775                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2776                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2777
2778                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2779                                 *rule_nr = 35;
2780                         } else {
2781                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2782                                 *rule_nr = 37;
2783                         }
2784
2785                         return -1;
2786                 }
2787
2788                 /* Common power [off|failure] */
2789                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2790                         (mdev->p_uuid[UI_FLAGS] & 2);
2791                 /* lowest bit is set when we were primary,
2792                  * next bit (weight 2) is set when peer was primary */
2793                 *rule_nr = 40;
2794
2795                 switch (rct) {
2796                 case 0: /* !self_pri && !peer_pri */ return 0;
2797                 case 1: /*  self_pri && !peer_pri */ return 1;
2798                 case 2: /* !self_pri &&  peer_pri */ return -1;
2799                 case 3: /*  self_pri &&  peer_pri */
2800                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2801                         return dc ? -1 : 1;
2802                 }
2803         }
2804
2805         *rule_nr = 50;
2806         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2807         if (self == peer)
2808                 return -1;
2809
2810         *rule_nr = 51;
2811         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2812         if (self == peer) {
2813                 if (mdev->tconn->agreed_pro_version < 96 ?
2814                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2815                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2816                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2817                         /* The last P_SYNC_UUID did not get through. Undo the modifications the
2818                            peer made to its UUIDs when it last started a resync as sync source. */
2819
2820                         if (mdev->tconn->agreed_pro_version < 91)
2821                                 return -1091;
2822
2823                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2824                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2825
2826                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2827                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2828
2829                         return -1;
2830                 }
2831         }
2832
2833         *rule_nr = 60;
2834         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2835         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2836                 peer = mdev->p_uuid[i] & ~((u64)1);
2837                 if (self == peer)
2838                         return -2;
2839         }
2840
2841         *rule_nr = 70;
2842         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2843         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2844         if (self == peer)
2845                 return 1;
2846
2847         *rule_nr = 71;
2848         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2849         if (self == peer) {
2850                 if (mdev->tconn->agreed_pro_version < 96 ?
2851                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2852                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2853                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2854                         /* The last P_SYNC_UUID did not get through. Undo the modifications we
2855                            made to our UUIDs when we last started a resync as sync source. */
2856
2857                         if (mdev->tconn->agreed_pro_version < 91)
2858                                 return -1091;
2859
2860                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2861                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2862
2863                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2864                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2865                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2866
2867                         return 1;
2868                 }
2869         }
2870
2871
2872         *rule_nr = 80;
2873         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2874         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2875                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2876                 if (self == peer)
2877                         return 2;
2878         }
2879
2880         *rule_nr = 90;
2881         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2882         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2883         if (self == peer && self != ((u64)0))
2884                 return 100;
2885
2886         *rule_nr = 100;
2887         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2888                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2889                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2890                         peer = mdev->p_uuid[j] & ~((u64)1);
2891                         if (self == peer)
2892                                 return -100;
2893                 }
2894         }
2895
2896         return -1000;
2897 }
2898
2899 /* drbd_sync_handshake() returns the new conn state on success, or
2900    CONN_MASK (-1) on failure.
2901  */
2902 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2903                                            enum drbd_disk_state peer_disk) __must_hold(local)
2904 {
2905         enum drbd_conns rv = C_MASK;
2906         enum drbd_disk_state mydisk;
2907         struct net_conf *nc;
2908         int hg, rule_nr, rr_conflict, tentative;
2909
2910         mydisk = mdev->state.disk;
2911         if (mydisk == D_NEGOTIATING)
2912                 mydisk = mdev->new_state_tmp.disk;
2913
2914         dev_info(DEV, "drbd_sync_handshake:\n");
2915         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2916         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2917                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2918
2919         hg = drbd_uuid_compare(mdev, &rule_nr);
2920
2921         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2922
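        /* hg is the handshake result from drbd_uuid_compare():
         *    0        in sync, no resync needed
         *   +-1       become sync source/target, bitmap-based resync
         *   +-2       become sync source/target, full resync
         *   +-100     split brain detected
         *   -1000     unrelated data
         *   < -1000   peer must support at least protocol version (-hg - 1000)
         */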
2923         if (hg == -1000) {
2924                 dev_alert(DEV, "Unrelated data, aborting!\n");
2925                 return C_MASK;
2926         }
2927         if (hg < -1000) {
2928                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2929                 return C_MASK;
2930         }
2931
2932         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2933             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2934                 int f = (hg == -100) || abs(hg) == 2;
2935                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2936                 if (f)
2937                         hg = hg*2;
2938                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2939                      hg > 0 ? "source" : "target");
2940         }
2941
2942         if (abs(hg) == 100)
2943                 drbd_khelper(mdev, "initial-split-brain");
2944
2945         rcu_read_lock();
2946         nc = rcu_dereference(mdev->tconn->net_conf);
2947
2948         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2949                 int pcount = (mdev->state.role == R_PRIMARY)
2950                            + (peer_role == R_PRIMARY);
2951                 int forced = (hg == -100);
2952
2953                 switch (pcount) {
2954                 case 0:
2955                         hg = drbd_asb_recover_0p(mdev);
2956                         break;
2957                 case 1:
2958                         hg = drbd_asb_recover_1p(mdev);
2959                         break;
2960                 case 2:
2961                         hg = drbd_asb_recover_2p(mdev);
2962                         break;
2963                 }
2964                 if (abs(hg) < 100) {
2965                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2966                              "automatically solved. Sync from %s node\n",
2967                              pcount, (hg < 0) ? "peer" : "this");
2968                         if (forced) {
2969                                 dev_warn(DEV, "Doing a full sync, since"
2970                                      " UUIDs were ambiguous.\n");
2971                                 hg = hg*2;
2972                         }
2973                 }
2974         }
2975
2976         if (hg == -100) {
2977                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2978                         hg = -1;
2979                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2980                         hg = 1;
2981
2982                 if (abs(hg) < 100)
2983                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2984                              "Sync from %s node\n",
2985                              (hg < 0) ? "peer" : "this");
2986         }
2987         rr_conflict = nc->rr_conflict;
2988         tentative = nc->tentative;
2989         rcu_read_unlock();
2990
2991         if (hg == -100) {
2992                 /* FIXME this log message is not correct if we end up here
2993                  * after an attempted attach on a diskless node.
2994                  * We just refuse to attach -- well, we drop the "connection"
2995                  * to that disk, in a way... */
2996                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2997                 drbd_khelper(mdev, "split-brain");
2998                 return C_MASK;
2999         }
3000
3001         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3002                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3003                 return C_MASK;
3004         }
3005
3006         if (hg < 0 && /* by intention we do not use mydisk here. */
3007             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3008                 switch (rr_conflict) {
3009                 case ASB_CALL_HELPER:
3010                         drbd_khelper(mdev, "pri-lost");
3011                         /* fall through */
3012                 case ASB_DISCONNECT:
3013                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3014                         return C_MASK;
3015                 case ASB_VIOLENTLY:
3016                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
3017                              "assumption\n");
3018                 }
3019         }
3020
3021         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3022                 if (hg == 0)
3023                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3024                 else
3025                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3026                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3027                                  abs(hg) >= 2 ? "full" : "bit-map based");
3028                 return C_MASK;
3029         }
3030
3031         if (abs(hg) >= 2) {
3032                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3033                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3034                                         BM_LOCKED_SET_ALLOWED))
3035                         return C_MASK;
3036         }
3037
3038         if (hg > 0) { /* become sync source. */
3039                 rv = C_WF_BITMAP_S;
3040         } else if (hg < 0) { /* become sync target */
3041                 rv = C_WF_BITMAP_T;
3042         } else {
3043                 rv = C_CONNECTED;
3044                 if (drbd_bm_total_weight(mdev)) {
3045                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3046                              drbd_bm_total_weight(mdev));
3047                 }
3048         }
3049
3050         return rv;
3051 }
3052
3053 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3054 {
3055         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3056         if (peer == ASB_DISCARD_REMOTE)
3057                 return ASB_DISCARD_LOCAL;
3058
3059         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3060         if (peer == ASB_DISCARD_LOCAL)
3061                 return ASB_DISCARD_REMOTE;
3062
3063         /* everything else is valid if they are equal on both sides. */
3064         return peer;
3065 }
3066
3067 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3068 {
3069         struct p_protocol *p = pi->data;
3070         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3071         int p_proto, p_discard_my_data, p_two_primaries, cf;
3072         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3073         char integrity_alg[SHARED_SECRET_MAX] = "";
3074         struct crypto_hash *peer_integrity_tfm = NULL;
3075         void *int_dig_in = NULL, *int_dig_vv = NULL;
3076
3077         p_proto         = be32_to_cpu(p->protocol);
3078         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3079         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3080         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3081         p_two_primaries = be32_to_cpu(p->two_primaries);
3082         cf              = be32_to_cpu(p->conn_flags);
3083         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3084
3085         if (tconn->agreed_pro_version >= 87) {
3086                 int err;
3087
3088                 if (pi->size > sizeof(integrity_alg))
3089                         return -EIO;
3090                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3091                 if (err)
3092                         return err;
3093                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3094         }
3095
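        /* For a plain P_PROTOCOL (sent during the connection handshake) the peer's
         * settings must match our local configuration, otherwise we disconnect.
         * P_PROTOCOL_UPDATE skips these compatibility checks; the new values are
         * simply adopted below. */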
3096         if (pi->cmd != P_PROTOCOL_UPDATE) {
3097                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3098
3099                 if (cf & CF_DRY_RUN)
3100                         set_bit(CONN_DRY_RUN, &tconn->flags);
3101
3102                 rcu_read_lock();
3103                 nc = rcu_dereference(tconn->net_conf);
3104
3105                 if (p_proto != nc->wire_protocol) {
3106                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3107                         goto disconnect_rcu_unlock;
3108                 }
3109
3110                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3111                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3112                         goto disconnect_rcu_unlock;
3113                 }
3114
3115                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3116                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3117                         goto disconnect_rcu_unlock;
3118                 }
3119
3120                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3121                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3122                         goto disconnect_rcu_unlock;
3123                 }
3124
3125                 if (p_discard_my_data && nc->discard_my_data) {
3126                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3127                         goto disconnect_rcu_unlock;
3128                 }
3129
3130                 if (p_two_primaries != nc->two_primaries) {
3131                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3132                         goto disconnect_rcu_unlock;
3133                 }
3134
3135                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3136                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3137                         goto disconnect_rcu_unlock;
3138                 }
3139
3140                 rcu_read_unlock();
3141         }
3142
3143         if (integrity_alg[0]) {
3144                 int hash_size;
3145
3146                 /*
3147                  * We can only change the peer data integrity algorithm
3148                  * here.  Changing our own data integrity algorithm
3149                  * requires that we send a P_PROTOCOL_UPDATE packet at
3150                  * the same time; otherwise, the peer cannot tell at
3151                  * which packet boundary the algorithm is supposed to
3152                  * change.
3153                  */
3154
3155                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3156                 if (IS_ERR(peer_integrity_tfm)) {
                        /* crypto_alloc_hash() returns ERR_PTR(), not NULL, on failure;
                         * clear the pointer so the error path does not try to free it */
                        peer_integrity_tfm = NULL;
3157                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3158                                  integrity_alg);
3159                         goto disconnect;
3160                 }
3161
3162                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3163                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3164                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3165                 if (!(int_dig_in && int_dig_vv)) {
3166                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3167                         goto disconnect;
3168                 }
3169         }
3170
3171         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3172         if (!new_net_conf) {
3173                 conn_err(tconn, "Allocation of new net_conf failed\n");
3174                 goto disconnect;
3175         }
3176
3177         mutex_lock(&tconn->data.mutex);
3178         mutex_lock(&tconn->conf_update);
3179         old_net_conf = tconn->net_conf;
3180         *new_net_conf = *old_net_conf;
3181
3182         new_net_conf->wire_protocol = p_proto;
3183         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3184         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3185         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3186         new_net_conf->two_primaries = p_two_primaries;
3187
3188         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3189         mutex_unlock(&tconn->conf_update);
3190         mutex_unlock(&tconn->data.mutex);
3191
3192         crypto_free_hash(tconn->peer_integrity_tfm);
3193         kfree(tconn->int_dig_in);
3194         kfree(tconn->int_dig_vv);
3195         tconn->peer_integrity_tfm = peer_integrity_tfm;
3196         tconn->int_dig_in = int_dig_in;
3197         tconn->int_dig_vv = int_dig_vv;
3198
3199         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3200                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3201                           integrity_alg[0] ? integrity_alg : "(none)");
3202
3203         synchronize_rcu();
3204         kfree(old_net_conf);
3205         return 0;
3206
3207 disconnect_rcu_unlock:
3208         rcu_read_unlock();
3209 disconnect:
3210         crypto_free_hash(peer_integrity_tfm);
3211         kfree(int_dig_in);
3212         kfree(int_dig_vv);
3213         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3214         return -EIO;
3215 }
3216
3217 /* helper function
3218  * input: alg name, feature name
3219  * return: NULL (alg name was "")
3220  *         ERR_PTR(error) if something goes wrong
3221  *         or the crypto hash ptr, if it worked out ok. */
3222 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3223                 const char *alg, const char *name)
3224 {
3225         struct crypto_hash *tfm;
3226
3227         if (!alg[0])
3228                 return NULL;
3229
3230         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3231         if (IS_ERR(tfm)) {
3232                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3233                         alg, name, PTR_ERR(tfm));
3234                 return tfm;
3235         }
3236         return tfm;
3237 }
3238
3239 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3240 {
3241         void *buffer = tconn->data.rbuf;
3242         int size = pi->size;
3243
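        /* Drain and discard pi->size bytes from the socket in buffer-sized chunks,
         * so that the receive stream stays in sync even though the payload itself
         * is ignored. */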
3244         while (size) {
3245                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3246                 s = drbd_recv(tconn, buffer, s);
3247                 if (s <= 0) {
3248                         if (s < 0)
3249                                 return s;
3250                         break;
3251                 }
3252                 size -= s;
3253         }
3254         if (size)
3255                 return -EIO;
3256         return 0;
3257 }
3258
3259 /*
3260  * config_unknown_volume  -  device configuration command for unknown volume
3261  *
3262  * When a device is added to an existing connection, the node on which the
3263  * device is added first will send configuration commands to its peer but the
3264  * peer will not know about the device yet.  It will warn and ignore these
3265  * commands.  Once the device is added on the second node, the second node will
3266  * send the same device configuration commands, but in the other direction.
3267  *
3268  * (We can also end up here if drbd is misconfigured.)
3269  */
3270 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3271 {
3272         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3273                   cmdname(pi->cmd), pi->vnr);
3274         return ignore_remaining_packet(tconn, pi);
3275 }
3276
3277 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3278 {
3279         struct drbd_conf *mdev;
3280         struct p_rs_param_95 *p;
3281         unsigned int header_size, data_size, exp_max_sz;
3282         struct crypto_hash *verify_tfm = NULL;
3283         struct crypto_hash *csums_tfm = NULL;
3284         struct net_conf *old_net_conf, *new_net_conf = NULL;
3285         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3286         const int apv = tconn->agreed_pro_version;
3287         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3288         int fifo_size = 0;
3289         int err;
3290
3291         mdev = vnr_to_mdev(tconn, pi->vnr);
3292         if (!mdev)
3293                 return config_unknown_volume(tconn, pi);
3294
3295         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3296                     : apv == 88 ? sizeof(struct p_rs_param)
3297                                         + SHARED_SECRET_MAX
3298                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3299                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3300
3301         if (pi->size > exp_max_sz) {
3302                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3303                     pi->size, exp_max_sz);
3304                 return -EIO;
3305         }
3306
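        /* For protocol version 88 the verify-alg name is sent as variable-length
         * trailing data behind the fixed header; from version 89 on the algorithm
         * names are part of the fixed-size parameter packet, so no trailing data
         * is expected. */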
3307         if (apv <= 88) {
3308                 header_size = sizeof(struct p_rs_param);
3309                 data_size = pi->size - header_size;
3310         } else if (apv <= 94) {
3311                 header_size = sizeof(struct p_rs_param_89);
3312                 data_size = pi->size - header_size;
3313                 D_ASSERT(data_size == 0);
3314         } else {
3315                 header_size = sizeof(struct p_rs_param_95);
3316                 data_size = pi->size - header_size;
3317                 D_ASSERT(data_size == 0);
3318         }
3319
3320         /* initialize verify_alg and csums_alg */
3321         p = pi->data;
3322         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3323
3324         err = drbd_recv_all(mdev->tconn, p, header_size);
3325         if (err)
3326                 return err;
3327
3328         mutex_lock(&mdev->tconn->conf_update);
3329         old_net_conf = mdev->tconn->net_conf;
3330         if (get_ldev(mdev)) {
3331                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3332                 if (!new_disk_conf) {
3333                         put_ldev(mdev);
3334                         mutex_unlock(&mdev->tconn->conf_update);
3335                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3336                         return -ENOMEM;
3337                 }
3338
3339                 old_disk_conf = mdev->ldev->disk_conf;
3340                 *new_disk_conf = *old_disk_conf;
3341
3342                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3343         }
3344
3345         if (apv >= 88) {
3346                 if (apv == 88) {
3347                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3348                                 dev_err(DEV, "verify-alg of wrong size, "
3349                                         "peer wants %u, accepting only up to %u byte\n",
3350                                         data_size, SHARED_SECRET_MAX);
3351                                 err = -EIO;
3352                                 goto reconnect;
3353                         }
3354
3355                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3356                         if (err)
3357                                 goto reconnect;
3358                         /* we expect NUL terminated string */
3359                         /* but just in case someone tries to be evil */
3360                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3361                         p->verify_alg[data_size-1] = 0;
3362
3363                 } else /* apv >= 89 */ {
3364                         /* we still expect NUL terminated strings */
3365                         /* but just in case someone tries to be evil */
3366                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3367                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3368                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3369                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3370                 }
3371
3372                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3373                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3374                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3375                                     old_net_conf->verify_alg, p->verify_alg);
3376                                 goto disconnect;
3377                         }
3378                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3379                                         p->verify_alg, "verify-alg");
3380                         if (IS_ERR(verify_tfm)) {
3381                                 verify_tfm = NULL;
3382                                 goto disconnect;
3383                         }
3384                 }
3385
3386                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3387                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3388                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3389                                     old_net_conf->csums_alg, p->csums_alg);
3390                                 goto disconnect;
3391                         }
3392                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3393                                         p->csums_alg, "csums-alg");
3394                         if (IS_ERR(csums_tfm)) {
3395                                 csums_tfm = NULL;
3396                                 goto disconnect;
3397                         }
3398                 }
3399
3400                 if (apv > 94 && new_disk_conf) {
3401                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3402                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3403                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3404                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3405
3406                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3407                         if (fifo_size != mdev->rs_plan_s->size) {
3408                                 new_plan = fifo_alloc(fifo_size);
3409                                 if (!new_plan) {
3410                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3411                                         put_ldev(mdev);
3412                                         goto disconnect;
3413                                 }
3414                         }
3415                 }
3416
3417                 if (verify_tfm || csums_tfm) {
3418                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3419                         if (!new_net_conf) {
3420                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3421                                 goto disconnect;
3422                         }
3423
3424                         *new_net_conf = *old_net_conf;
3425
3426                         if (verify_tfm) {
3427                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3428                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3429                                 crypto_free_hash(mdev->tconn->verify_tfm);
3430                                 mdev->tconn->verify_tfm = verify_tfm;
3431                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3432                         }
3433                         if (csums_tfm) {
3434                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3435                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3436                                 crypto_free_hash(mdev->tconn->csums_tfm);
3437                                 mdev->tconn->csums_tfm = csums_tfm;
3438                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3439                         }
3440                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3441                 }
3442         }
3443
3444         if (new_disk_conf) {
3445                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3446                 put_ldev(mdev);
3447         }
3448
3449         if (new_plan) {
3450                 old_plan = mdev->rs_plan_s;
3451                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3452         }
3453
3454         mutex_unlock(&mdev->tconn->conf_update);
3455         synchronize_rcu();
3456         if (new_net_conf)
3457                 kfree(old_net_conf);
3458         kfree(old_disk_conf);
3459         kfree(old_plan);
3460
3461         return 0;
3462
3463 reconnect:
3464         if (new_disk_conf) {
3465                 put_ldev(mdev);
3466                 kfree(new_disk_conf);
3467         }
3468         mutex_unlock(&mdev->tconn->conf_update);
3469         return -EIO;
3470
3471 disconnect:
3472         kfree(new_plan);
3473         if (new_disk_conf) {
3474                 put_ldev(mdev);
3475                 kfree(new_disk_conf);
3476         }
3477         mutex_unlock(&mdev->tconn->conf_update);
3478         /* csums_tfm and verify_tfm may have been allocated above, but on every
3479          * path that reaches this label they have not yet been assigned to the
3480          * connection, so free them here to avoid leaking them. */
3481         crypto_free_hash(csums_tfm);
3482         crypto_free_hash(verify_tfm);
3483         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3484         return -EIO;
3485 }
3486
3487 /* warn if the arguments differ by more than 12.5% */
3488 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3489         const char *s, sector_t a, sector_t b)
3490 {
3491         sector_t d;
3492         if (a == 0 || b == 0)
3493                 return;
3494         d = (a > b) ? (a - b) : (b - a);
3495         if (d > (a>>3) || d > (b>>3))
3496                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3497                      (unsigned long long)a, (unsigned long long)b);
3498 }
3499
3500 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3501 {
3502         struct drbd_conf *mdev;
3503         struct p_sizes *p = pi->data;
3504         enum determine_dev_size dd = unchanged;
3505         sector_t p_size, p_usize, my_usize;
3506         int ldsc = 0; /* local disk size changed */
3507         enum dds_flags ddsf;
3508
3509         mdev = vnr_to_mdev(tconn, pi->vnr);
3510         if (!mdev)
3511                 return config_unknown_volume(tconn, pi);
3512
3513         p_size = be64_to_cpu(p->d_size);
3514         p_usize = be64_to_cpu(p->u_size);
3515
3516         /* just store the peer's disk size for now.
3517          * we still need to figure out whether we accept that. */
3518         mdev->p_size = p_size;
3519
3520         if (get_ldev(mdev)) {
3521                 rcu_read_lock();
3522                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3523                 rcu_read_unlock();
3524
3525                 warn_if_differ_considerably(mdev, "lower level device sizes",
3526                            p_size, drbd_get_max_capacity(mdev->ldev));
3527                 warn_if_differ_considerably(mdev, "user requested size",
3528                                             p_usize, my_usize);
3529
3530                 /* if this is the first connect, or an otherwise expected
3531                  * param exchange, choose the minimum */
3532                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3533                         p_usize = min_not_zero(my_usize, p_usize);
3534
3535                 /* Never shrink a device with usable data during connect.
3536                    But allow online shrinking if we are connected. */
3537                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3538                     drbd_get_capacity(mdev->this_bdev) &&
3539                     mdev->state.disk >= D_OUTDATED &&
3540                     mdev->state.conn < C_CONNECTED) {
3541                         dev_err(DEV, "The peer's disk size is too small!\n");
3542                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3543                         put_ldev(mdev);
3544                         return -EIO;
3545                 }
3546
3547                 if (my_usize != p_usize) {
3548                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3549
3550                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3551                         if (!new_disk_conf) {
3552                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3553                                 put_ldev(mdev);
3554                                 return -ENOMEM;
3555                         }
3556
3557                         mutex_lock(&mdev->tconn->conf_update);
3558                         old_disk_conf = mdev->ldev->disk_conf;
3559                         *new_disk_conf = *old_disk_conf;
3560                         new_disk_conf->disk_size = p_usize;
3561
3562                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3563                         mutex_unlock(&mdev->tconn->conf_update);
3564                         synchronize_rcu();
3565                         kfree(old_disk_conf);
3566
3567                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3568                                  (unsigned long)p_usize);
3569                 }
3570
3571                 put_ldev(mdev);
3572         }
3573
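        /* With the peer's sizes known, settle the actual device size: with a local
         * disk attached let drbd_determine_dev_size() decide (and possibly resize);
         * without one we simply adopt the peer's size below. */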
3574         ddsf = be16_to_cpu(p->dds_flags);
3575         if (get_ldev(mdev)) {
3576                 dd = drbd_determine_dev_size(mdev, ddsf);
3577                 put_ldev(mdev);
3578                 if (dd == dev_size_error)
3579                         return -EIO;
3580                 drbd_md_sync(mdev);
3581         } else {
3582                 /* I am diskless, need to accept the peer's size. */
3583                 drbd_set_my_capacity(mdev, p_size);
3584         }
3585
3586         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3587         drbd_reconsider_max_bio_size(mdev);
3588
3589         if (get_ldev(mdev)) {
3590                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3591                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3592                         ldsc = 1;
3593                 }
3594
3595                 put_ldev(mdev);
3596         }
3597
3598         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3599                 if (be64_to_cpu(p->c_size) !=
3600                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3601                         /* we have different sizes, probably peer
3602                          * needs to know my new size... */
3603                         drbd_send_sizes(mdev, 0, ddsf);
3604                 }
3605                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3606                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3607                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3608                             mdev->state.disk >= D_INCONSISTENT) {
3609                                 if (ddsf & DDSF_NO_RESYNC)
3610                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3611                                 else
3612                                         resync_after_online_grow(mdev);
3613                         } else
3614                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3615                 }
3616         }
3617
3618         return 0;
3619 }
3620
3621 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3622 {
3623         struct drbd_conf *mdev;
3624         struct p_uuids *p = pi->data;
3625         u64 *p_uuid;
3626         int i, updated_uuids = 0;
3627
3628         mdev = vnr_to_mdev(tconn, pi->vnr);
3629         if (!mdev)
3630                 return config_unknown_volume(tconn, pi);
3631
3632         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return false;
        }
3633
3634         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3635                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3636
3637         kfree(mdev->p_uuid);
3638         mdev->p_uuid = p_uuid;
3639
3640         if (mdev->state.conn < C_CONNECTED &&
3641             mdev->state.disk < D_INCONSISTENT &&
3642             mdev->state.role == R_PRIMARY &&
3643             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3644                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3645                     (unsigned long long)mdev->ed_uuid);
3646                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3647                 return -EIO;
3648         }
3649
3650         if (get_ldev(mdev)) {
3651                 int skip_initial_sync =
3652                         mdev->state.conn == C_CONNECTED &&
3653                         mdev->tconn->agreed_pro_version >= 90 &&
3654                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3655                         (p_uuid[UI_FLAGS] & 8);
3656                 if (skip_initial_sync) {
3657                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3658                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3659                                         "clear_n_write from receive_uuids",
3660                                         BM_LOCKED_TEST_ALLOWED);
3661                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3662                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3663                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3664                                         CS_VERBOSE, NULL);
3665                         drbd_md_sync(mdev);
3666                         updated_uuids = 1;
3667                 }
3668                 put_ldev(mdev);
3669         } else if (mdev->state.disk < D_INCONSISTENT &&
3670                    mdev->state.role == R_PRIMARY) {
3671                 /* I am a diskless primary, the peer just created a new current UUID
3672                    for me. */
3673                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3674         }
3675
3676         /* Before we test the disk state we should wait until a possibly ongoing
3677            cluster-wide state change has finished. That is important if we are
3678            primary and are detaching from our disk: we need to see the new disk
3679            state... */
3680         mutex_lock(mdev->state_mutex);
3681         mutex_unlock(mdev->state_mutex);
3682         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3683                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3684
3685         if (updated_uuids)
3686                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3687
3688         return 0;
3689 }
3690
3691 /**
3692  * convert_state() - Converts the peer's view of the cluster state to our point of view
3693  * @ps:         The state as seen by the peer.
3694  */
3695 static union drbd_state convert_state(union drbd_state ps)
3696 {
3697         union drbd_state ms;
3698
3699         static enum drbd_conns c_tab[] = {
3700                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3701                 [C_CONNECTED] = C_CONNECTED,
3702
3703                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3704                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3705                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3706                 [C_VERIFY_S]       = C_VERIFY_T,
3707                 [C_MASK]   = C_MASK,
3708         };
3709
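        /* Mirror the state: the peer's own role and disk become our view of the
         * peer (peer/pdsk), and what the peer reports about us becomes our own
         * role/disk.  Symmetric connection states are swapped via c_tab above. */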
3710         ms.i = ps.i;
3711
3712         ms.conn = c_tab[ps.conn];
3713         ms.peer = ps.role;
3714         ms.role = ps.peer;
3715         ms.pdsk = ps.disk;
3716         ms.disk = ps.pdsk;
3717         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3718
3719         return ms;
3720 }
3721
3722 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3723 {
3724         struct drbd_conf *mdev;
3725         struct p_req_state *p = pi->data;
3726         union drbd_state mask, val;
3727         enum drbd_state_rv rv;
3728
3729         mdev = vnr_to_mdev(tconn, pi->vnr);
3730         if (!mdev)
3731                 return -EIO;
3732
3733         mask.i = be32_to_cpu(p->mask);
3734         val.i = be32_to_cpu(p->val);
3735
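        /* A local state change is already in progress (state_mutex held) and this
         * node holds DISCARD_CONCURRENT, so reject the peer's concurrent request
         * with SS_CONCURRENT_ST_CHG instead of applying it. */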
3736         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3737             mutex_is_locked(mdev->state_mutex)) {
3738                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3739                 return 0;
3740         }
3741
3742         mask = convert_state(mask);
3743         val = convert_state(val);
3744
3745         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3746         drbd_send_sr_reply(mdev, rv);
3747
3748         drbd_md_sync(mdev);
3749
3750         return 0;
3751 }
3752
3753 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3754 {
3755         struct p_req_state *p = pi->data;
3756         union drbd_state mask, val;
3757         enum drbd_state_rv rv;
3758
3759         mask.i = be32_to_cpu(p->mask);
3760         val.i = be32_to_cpu(p->val);
3761
3762         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3763             mutex_is_locked(&tconn->cstate_mutex)) {
3764                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3765                 return 0;
3766         }
3767
3768         mask = convert_state(mask);
3769         val = convert_state(val);
3770
3771         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3772         conn_send_sr_reply(tconn, rv);
3773
3774         return 0;
3775 }
3776
3777 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3778 {
3779         struct drbd_conf *mdev;
3780         struct p_state *p = pi->data;
3781         union drbd_state os, ns, peer_state;
3782         enum drbd_disk_state real_peer_disk;
3783         enum chg_state_flags cs_flags;
3784         int rv;
3785
3786         mdev = vnr_to_mdev(tconn, pi->vnr);
3787         if (!mdev)
3788                 return config_unknown_volume(tconn, pi);
3789
3790         peer_state.i = be32_to_cpu(p->state);
3791
3792         real_peer_disk = peer_state.disk;
3793         if (peer_state.disk == D_NEGOTIATING) {
3794                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3795                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3796         }
3797
3798         spin_lock_irq(&mdev->tconn->req_lock);
3799  retry:
3800         os = ns = drbd_read_state(mdev);
3801         spin_unlock_irq(&mdev->tconn->req_lock);
3802
3803         /* If some other part of the code (asender thread, timeout)
3804          * already decided to close the connection again,
3805          * we must not "re-establish" it here. */
3806         if (os.conn <= C_TEAR_DOWN)
3807                 return false;
3808
3809         /* If this is the "end of sync" confirmation, usually the peer disk
3810          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3811          * set) resync started in PausedSyncT, or if the timing of pause-/
3812          * unpause-sync events has been "just right", the peer disk may
3813          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3814          */
3815         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3816             real_peer_disk == D_UP_TO_DATE &&
3817             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3818                 /* If we are (becoming) SyncSource, but peer is still in sync
3819                  * preparation, ignore its uptodate-ness to avoid flapping, it
3820                  * will change to inconsistent once the peer reaches active
3821                  * syncing states.
3822                  * It may have changed syncer-paused flags, however, so we
3823                  * cannot ignore this completely. */
3824                 if (peer_state.conn > C_CONNECTED &&
3825                     peer_state.conn < C_SYNC_SOURCE)
3826                         real_peer_disk = D_INCONSISTENT;
3827
3828                 /* if peer_state changes to connected at the same time,
3829                  * it explicitly notifies us that it finished resync.
3830                  * Maybe we should finish it up, too? */
3831                 else if (os.conn >= C_SYNC_SOURCE &&
3832                          peer_state.conn == C_CONNECTED) {
3833                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3834                                 drbd_resync_finished(mdev);
3835                         return 0;
3836                 }
3837         }
3838
3839         /* peer says his disk is inconsistent, while we think it is uptodate,
3840          * and this happens while the peer still thinks we have a sync going on,
3841          * but we think we are already done with the sync.
3842          * We ignore this to avoid flapping pdsk.
3843          * This should not happen, if the peer is a recent version of drbd. */
3844         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3845             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3846                 real_peer_disk = D_UP_TO_DATE;
3847
3848         if (ns.conn == C_WF_REPORT_PARAMS)
3849                 ns.conn = C_CONNECTED;
3850
3851         if (peer_state.conn == C_AHEAD)
3852                 ns.conn = C_BEHIND;
3853
3854         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3855             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3856                 int cr; /* consider resync */
3857
3858                 /* if we established a new connection */
3859                 cr  = (os.conn < C_CONNECTED);
3860                 /* if we had an established connection
3861                  * and one of the nodes newly attaches a disk */
3862                 cr |= (os.conn == C_CONNECTED &&
3863                        (peer_state.disk == D_NEGOTIATING ||
3864                         os.disk == D_NEGOTIATING));
3865                 /* if we have both been inconsistent, and the peer has been
3866                  * forced to be UpToDate with --overwrite-data */
3867                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3868                 /* if we had been plain connected, and the admin requested to
3869                  * start a sync by "invalidate" or "invalidate-remote" */
3870                 cr |= (os.conn == C_CONNECTED &&
3871                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3872                                  peer_state.conn <= C_WF_BITMAP_T));
3873
3874                 if (cr)
3875                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3876
3877                 put_ldev(mdev);
3878                 if (ns.conn == C_MASK) {
3879                         ns.conn = C_CONNECTED;
3880                         if (mdev->state.disk == D_NEGOTIATING) {
3881                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3882                         } else if (peer_state.disk == D_NEGOTIATING) {
3883                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3884                                 peer_state.disk = D_DISKLESS;
3885                                 real_peer_disk = D_DISKLESS;
3886                         } else {
3887                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3888                                         return -EIO;
3889                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3890                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3891                                 return -EIO;
3892                         }
3893                 }
3894         }
3895
3896         spin_lock_irq(&mdev->tconn->req_lock);
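        /* The state may have changed while it was evaluated above without holding
         * req_lock; if so, redo the evaluation based on the current state. */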
3897         if (os.i != drbd_read_state(mdev).i)
3898                 goto retry;
3899         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3900         ns.peer = peer_state.role;
3901         ns.pdsk = real_peer_disk;
3902         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3903         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3904                 ns.disk = mdev->new_state_tmp.disk;
3905         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3906         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3907             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3908                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3909                    for temporary network outages! */
3910                 spin_unlock_irq(&mdev->tconn->req_lock);
3911                 dev_err(DEV, "Aborting Connect, can not thaw IO with a peer that is only Consistent\n");
3912                 tl_clear(mdev->tconn);
3913                 drbd_uuid_new_current(mdev);
3914                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3915                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3916                 return -EIO;
3917         }
3918         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3919         ns = drbd_read_state(mdev);
3920         spin_unlock_irq(&mdev->tconn->req_lock);
3921
3922         if (rv < SS_SUCCESS) {
3923                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3924                 return -EIO;
3925         }
3926
3927         if (os.conn > C_WF_REPORT_PARAMS) {
3928                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3929                     peer_state.disk != D_NEGOTIATING ) {
3930                         /* we want resync, peer has not yet decided to sync... */
3931                         /* Nowadays only used when forcing a node into primary role and
3932                            setting its disk to UpToDate with that */
3933                         drbd_send_uuids(mdev);
3934                         drbd_send_current_state(mdev);
3935                 }
3936         }
3937
3938         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3939
3940         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3941
3942         return 0;
3943 }
3944
3945 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3946 {
3947         struct drbd_conf *mdev;
3948         struct p_rs_uuid *p = pi->data;
3949
3950         mdev = vnr_to_mdev(tconn, pi->vnr);
3951         if (!mdev)
3952                 return -EIO;
3953
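        /* Wait until this node is actually ready to start as sync target (WFSyncUUID
         * or Behind), or until the connection/disk state has degraded so far that
         * the packet will be ignored below. */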
3954         wait_event(mdev->misc_wait,
3955                    mdev->state.conn == C_WF_SYNC_UUID ||
3956                    mdev->state.conn == C_BEHIND ||
3957                    mdev->state.conn < C_CONNECTED ||
3958                    mdev->state.disk < D_NEGOTIATING);
3959
3960         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3961
3962         /* Here the _drbd_uuid_ functions are right, current should
3963            _not_ be rotated into the history */
3964         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3965                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3966                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3967
3968                 drbd_print_uuids(mdev, "updated sync uuid");
3969                 drbd_start_resync(mdev, C_SYNC_TARGET);
3970
3971                 put_ldev(mdev);
3972         } else
3973                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3974
3975         return 0;
3976 }
3977
3978 /**
3979  * receive_bitmap_plain
3980  *
3981  * Return 0 when done, 1 when another iteration is needed, and a negative error
3982  * code upon failure.
3983  */
3984 static int
3985 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3986                      unsigned long *p, struct bm_xfer_ctx *c)
3987 {
3988         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3989                                  drbd_header_size(mdev->tconn);
3990         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3991                                        c->bm_words - c->word_offset);
3992         unsigned int want = num_words * sizeof(*p);
3993         int err;
3994
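        /* The sender packs as many whole longs as fit into one socket buffer (or
         * whatever remains of the bitmap); a packet of any other size is a
         * protocol error. */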
3995         if (want != size) {
3996                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3997                 return -EIO;
3998         }
3999         if (want == 0)
4000                 return 0;
4001         err = drbd_recv_all(mdev->tconn, p, want);
4002         if (err)
4003                 return err;
4004
4005         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4006
4007         c->word_offset += num_words;
4008         c->bit_offset = c->word_offset * BITS_PER_LONG;
4009         if (c->bit_offset > c->bm_bits)
4010                 c->bit_offset = c->bm_bits;
4011
4012         return 1;
4013 }
4014
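/* Layout of p->encoding: bits 0-3 hold the bitmap encoding, bits 4-6 the number
 * of padding bits at the end of the bit stream, and bit 7 whether the first
 * run-length describes set bits. */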
4015 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4016 {
4017         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4018 }
4019
4020 static int dcbp_get_start(struct p_compressed_bm *p)
4021 {
4022         return (p->encoding & 0x80) != 0;
4023 }
4024
4025 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4026 {
4027         return (p->encoding >> 4) & 0x7;
4028 }
4029
4030 /**
4031  * recv_bm_rle_bits
4032  *
4033  * Return 0 when done, 1 when another iteration is needed, and a negative error
4034  * code upon failure.
4035  */
4036 static int
4037 recv_bm_rle_bits(struct drbd_conf *mdev,
4038                 struct p_compressed_bm *p,
4039                  struct bm_xfer_ctx *c,
4040                  unsigned int len)
4041 {
4042         struct bitstream bs;
4043         u64 look_ahead;
4044         u64 rl;
4045         u64 tmp;
4046         unsigned long s = c->bit_offset;
4047         unsigned long e;
4048         int toggle = dcbp_get_start(p);
4049         int have;
4050         int bits;
4051
4052         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4053
4054         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4055         if (bits < 0)
4056                 return -EIO;
4057
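        /* look_ahead buffers up to 64 bits of the compressed stream.  Each loop
         * iteration decodes one VLI-encoded run length rl, sets the corresponding
         * bit range when the run describes set bits (toggle), then consumes the
         * decoded bits and refills look_ahead from the bitstream. */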
4058         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4059                 bits = vli_decode_bits(&rl, look_ahead);
4060                 if (bits <= 0)
4061                         return -EIO;
4062
4063                 if (toggle) {
4064                         e = s + rl -1;
4065                         if (e >= c->bm_bits) {
4066                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4067                                 return -EIO;
4068                         }
4069                         _drbd_bm_set_bits(mdev, s, e);
4070                 }
4071
4072                 if (have < bits) {
4073                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4074                                 have, bits, look_ahead,
4075                                 (unsigned int)(bs.cur.b - p->code),
4076                                 (unsigned int)bs.buf_len);
4077                         return -EIO;
4078                 }
4079                 look_ahead >>= bits;
4080                 have -= bits;
4081
4082                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4083                 if (bits < 0)
4084                         return -EIO;
4085                 look_ahead |= tmp << have;
4086                 have += bits;
4087         }
4088
4089         c->bit_offset = s;
4090         bm_xfer_ctx_bit_to_word_offset(c);
4091
4092         return (s != c->bm_bits);
4093 }
4094
4095 /**
4096  * decode_bitmap_c
4097  *
4098  * Return 0 when done, 1 when another iteration is needed, and a negative error
4099  * code upon failure.
4100  */
4101 static int
4102 decode_bitmap_c(struct drbd_conf *mdev,
4103                 struct p_compressed_bm *p,
4104                 struct bm_xfer_ctx *c,
4105                 unsigned int len)
4106 {
4107         if (dcbp_get_code(p) == RLE_VLI_Bits)
4108                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4109
4110         /* other variants had been implemented for evaluation,
4111          * but have been dropped as this one turned out to be "best"
4112          * during all our tests. */
4113
4114         dev_err(DEV, "decode_bitmap_c: unknown encoding %u\n", p->encoding);
4115         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4116         return -EIO;
4117 }
4118
4119 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4120                 const char *direction, struct bm_xfer_ctx *c)
4121 {
4122         /* what would it take to transfer it "plaintext" */
4123         unsigned int header_size = drbd_header_size(mdev->tconn);
4124         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4125         unsigned int plain =
4126                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4127                 c->bm_words * sizeof(unsigned long);
4128         unsigned int total = c->bytes[0] + c->bytes[1];
4129         unsigned int r;
4130
4131         /* total cannot be zero, but just in case: */
4132         if (total == 0)
4133                 return;
4134
4135         /* don't report if not compressed */
4136         if (total >= plain)
4137                 return;
4138
4139         /* total < plain. check for overflow, still */
4140         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4141                                     : (1000 * total / plain);
4142
4143         if (r > 1000)
4144                 r = 1000;
4145
4146         r = 1000 - r;
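             /*
              * r is now the space saved in per mille.  For illustration: with
              * plain = 100000 bytes and total = 2500 bytes this yields
              * r = 1000 - 25 = 975, reported below as "compression: 97.5%".
              */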
4147         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4148              "total %u; compression: %u.%u%%\n",
4149                         direction,
4150                         c->bytes[1], c->packets[1],
4151                         c->bytes[0], c->packets[0],
4152                         total, r/10, r % 10);
4153 }
4154
4155 /* Since we are processing the bitfield from lower addresses to higher,
4156    it does not matter whether we process it in 32 bit or 64 bit chunks,
4157    as long as it is little endian. (Understand it as a byte stream,
4158    beginning with the lowest byte...) If we used big endian, we would
4159    need to process it from the highest address to the lowest in order
4160    to be agnostic to the 32 vs 64 bit issue.
4161
4162    Returns 0 on success, a negative error code otherwise. */
4163 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4164 {
4165         struct drbd_conf *mdev;
4166         struct bm_xfer_ctx c;
4167         int err;
4168
4169         mdev = vnr_to_mdev(tconn, pi->vnr);
4170         if (!mdev)
4171                 return -EIO;
4172
4173         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4174         /* you are supposed to send additional out-of-sync information
4175          * if you actually set bits during this phase */
4176
4177         c = (struct bm_xfer_ctx) {
4178                 .bm_bits = drbd_bm_bits(mdev),
4179                 .bm_words = drbd_bm_words(mdev),
4180         };
4181
4182         for (;;) {
4183                 if (pi->cmd == P_BITMAP)
4184                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4185                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4186                         /* MAYBE: sanity check that we speak proto >= 90,
4187                          * and the feature is enabled! */
4188                         struct p_compressed_bm *p = pi->data;
4189
4190                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4191                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4192                                 err = -EIO;
4193                                 goto out;
4194                         }
4195                         if (pi->size <= sizeof(*p)) {
4196                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4197                                 err = -EIO;
4198                                 goto out;
4199                         }
4200                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4201                         if (err)
4202                                 goto out;
4203                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4204                 } else {
4205                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4206                         err = -EIO;
4207                         goto out;
4208                 }
4209
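                     /* (pi->cmd == P_BITMAP) is 1 for plain and 0 for compressed
                      * packets, so index 1 accumulates plain and index 0 RLE
                      * traffic, matching what INFO_bm_xfer_stats() reports. */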
4210                 c.packets[pi->cmd == P_BITMAP]++;
4211                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4212
4213                 if (err <= 0) {
4214                         if (err < 0)
4215                                 goto out;
4216                         break;
4217                 }
4218                 err = drbd_recv_header(mdev->tconn, pi);
4219                 if (err)
4220                         goto out;
4221         }
4222
4223         INFO_bm_xfer_stats(mdev, "receive", &c);
4224
4225         if (mdev->state.conn == C_WF_BITMAP_T) {
4226                 enum drbd_state_rv rv;
4227
4228                 err = drbd_send_bitmap(mdev);
4229                 if (err)
4230                         goto out;
4231                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4232                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4233                 D_ASSERT(rv == SS_SUCCESS);
4234         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4235                 /* admin may have requested C_DISCONNECTING,
4236                  * other threads may have noticed network errors */
4237                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4238                     drbd_conn_str(mdev->state.conn));
4239         }
4240         err = 0;
4241
4242  out:
4243         drbd_bm_unlock(mdev);
4244         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4245                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4246         return err;
4247 }
4248
4249 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4250 {
4251         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4252                  pi->cmd, pi->size);
4253
4254         return ignore_remaining_packet(tconn, pi);
4255 }
4256
4257 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4258 {
4259         /* Make sure we've acked all the TCP data associated
4260          * with the data requests being unplugged */
4261         drbd_tcp_quickack(tconn->data.socket);
4262
4263         return 0;
4264 }
4265
4266 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4267 {
4268         struct drbd_conf *mdev;
4269         struct p_block_desc *p = pi->data;
4270
4271         mdev = vnr_to_mdev(tconn, pi->vnr);
4272         if (!mdev)
4273                 return -EIO;
4274
4275         switch (mdev->state.conn) {
4276         case C_WF_SYNC_UUID:
4277         case C_WF_BITMAP_T:
4278         case C_BEHIND:
4279                 break;
4280         default:
4281                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4282                                 drbd_conn_str(mdev->state.conn));
4283         }
4284
4285         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4286
4287         return 0;
4288 }
4289
4290 struct data_cmd {
4291         int expect_payload;
4292         size_t pkt_size;
4293         int (*fn)(struct drbd_tconn *, struct packet_info *);
4294 };
4295
4296 static struct data_cmd drbd_cmd_handler[] = {
4297         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4298         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4299         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4300         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4301         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4302         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4303         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4304         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4305         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4306         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4307         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4308         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4309         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4310         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4311         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4312         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4313         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4314         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4315         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4316         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4317         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4318         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4319         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4320         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4321 };
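     /* Each entry maps a packet type to (expect_payload, fixed packet size,
      * handler).  drbdd() below first reads pkt_size bytes of the packet and
      * only then calls fn(); payload beyond that fixed part is accepted only
      * if expect_payload is set. */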
4322
4323 static void drbdd(struct drbd_tconn *tconn)
4324 {
4325         struct packet_info pi;
4326         size_t shs; /* sub header size */
4327         int err;
4328
4329         while (get_t_state(&tconn->receiver) == RUNNING) {
4330                 struct data_cmd *cmd;
4331
4332                 drbd_thread_current_set_cpu(&tconn->receiver);
4333                 if (drbd_recv_header(tconn, &pi))
4334                         goto err_out;
4335
4336                 cmd = &drbd_cmd_handler[pi.cmd];
4337                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4338                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4339                                  cmdname(pi.cmd), pi.cmd);
4340                         goto err_out;
4341                 }
4342
4343                 shs = cmd->pkt_size;
4344                 if (pi.size > shs && !cmd->expect_payload) {
4345                         conn_err(tconn, "No payload expected %s l:%d\n",
4346                                  cmdname(pi.cmd), pi.size);
4347                         goto err_out;
4348                 }
4349
4350                 if (shs) {
4351                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4352                         if (err)
4353                                 goto err_out;
4354                         pi.size -= shs;
4355                 }
4356
4357                 err = cmd->fn(tconn, &pi);
4358                 if (err) {
4359                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4360                                  cmdname(pi.cmd), err, pi.size);
4361                         goto err_out;
4362                 }
4363         }
4364         return;
4365
4366     err_out:
4367         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4368 }
4369
4370 void conn_flush_workqueue(struct drbd_tconn *tconn)
4371 {
4372         struct drbd_wq_barrier barr;
4373
4374         barr.w.cb = w_prev_work_done;
4375         barr.w.tconn = tconn;
4376         init_completion(&barr.done);
4377         drbd_queue_work(&tconn->data.work, &barr.w);
4378         wait_for_completion(&barr.done);
4379 }
4380
4381 static void conn_disconnect(struct drbd_tconn *tconn)
4382 {
4383         struct drbd_conf *mdev;
4384         enum drbd_conns oc;
4385         int vnr;
4386
4387         if (tconn->cstate == C_STANDALONE)
4388                 return;
4389
4390         /* We are about to start the cleanup after connection loss.
4391          * Make sure drbd_make_request knows about that.
4392          * Usually we should be in some network failure state already,
4393          * but just in case we are not, we fix it up here.
4394          */
4395         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4396
4397         /* asender does not clean up anything. it must not interfere, either */
4398         drbd_thread_stop(&tconn->asender);
4399         drbd_free_sock(tconn);
4400
4401         rcu_read_lock();
4402         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4403                 kref_get(&mdev->kref);
4404                 rcu_read_unlock();
4405                 drbd_disconnected(mdev);
4406                 kref_put(&mdev->kref, &drbd_minor_destroy);
4407                 rcu_read_lock();
4408         }
4409         rcu_read_unlock();
4410
4411         if (!list_empty(&tconn->current_epoch->list))
4412                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4413         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4414         atomic_set(&tconn->current_epoch->epoch_size, 0);
4415
4416         conn_info(tconn, "Connection closed\n");
4417
4418         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4419                 conn_try_outdate_peer_async(tconn);
4420
4421         spin_lock_irq(&tconn->req_lock);
4422         oc = tconn->cstate;
4423         if (oc >= C_UNCONNECTED)
4424                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4425
4426         spin_unlock_irq(&tconn->req_lock);
4427
4428         if (oc == C_DISCONNECTING)
4429                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4430 }
4431
4432 static int drbd_disconnected(struct drbd_conf *mdev)
4433 {
4434         unsigned int i;
4435
4436         /* wait for current activity to cease. */
4437         spin_lock_irq(&mdev->tconn->req_lock);
4438         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4439         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4440         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4441         spin_unlock_irq(&mdev->tconn->req_lock);
4442
4443         /* We do not have data structures that would allow us to
4444          * get the rs_pending_cnt down to 0 again.
4445          *  * On C_SYNC_TARGET we do not have any data structures describing
4446          *    the pending RSDataRequest's we have sent.
4447          *  * On C_SYNC_SOURCE there is no data structure that tracks
4448          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4449          *  And no, it is not the sum of the reference counts in the
4450          *  resync_LRU. The resync_LRU tracks the whole operation including
4451          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4452          *  on the fly. */
4453         drbd_rs_cancel_all(mdev);
4454         mdev->rs_total = 0;
4455         mdev->rs_failed = 0;
4456         atomic_set(&mdev->rs_pending_cnt, 0);
4457         wake_up(&mdev->misc_wait);
4458
4459         del_timer_sync(&mdev->resync_timer);
4460         resync_timer_fn((unsigned long)mdev);
4461
4462         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4463          * w_make_resync_request etc. which may still be on the worker queue
4464          * to be "canceled" */
4465         drbd_flush_workqueue(mdev);
4466
4467         drbd_finish_peer_reqs(mdev);
4468
4469         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4470            might have queued more work. The flush before drbd_finish_peer_reqs() is
4471            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4472         drbd_flush_workqueue(mdev);
4473
4474         kfree(mdev->p_uuid);
4475         mdev->p_uuid = NULL;
4476
4477         if (!drbd_suspended(mdev))
4478                 tl_clear(mdev->tconn);
4479
4480         drbd_md_sync(mdev);
4481
4482         /* serialize with bitmap writeout triggered by the state change,
4483          * if any. */
4484         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4485
4486         /* tcp_close and release of sendpage pages can be deferred.  I don't
4487          * want to use SO_LINGER, because apparently it can be deferred for
4488          * more than 20 seconds (longest time I checked).
4489          *
4490          * Actually we don't care for exactly when the network stack does its
4491          * put_page(), but release our reference on these pages right here.
4492          */
4493         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4494         if (i)
4495                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4496         i = atomic_read(&mdev->pp_in_use_by_net);
4497         if (i)
4498                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4499         i = atomic_read(&mdev->pp_in_use);
4500         if (i)
4501                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4502
4503         D_ASSERT(list_empty(&mdev->read_ee));
4504         D_ASSERT(list_empty(&mdev->active_ee));
4505         D_ASSERT(list_empty(&mdev->sync_ee));
4506         D_ASSERT(list_empty(&mdev->done_ee));
4507
4508         return 0;
4509 }
4510
4511 /*
4512  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4513  * we can agree on is stored in agreed_pro_version.
4514  *
4515  * feature flags and the reserved array should be enough room for future
4516  * enhancements of the handshake protocol, and possible plugins...
4517  *
4518  * for now, they are expected to be zero, but ignored.
4519  */
4520 static int drbd_send_features(struct drbd_tconn *tconn)
4521 {
4522         struct drbd_socket *sock;
4523         struct p_connection_features *p;
4524
4525         sock = &tconn->data;
4526         p = conn_prepare_command(tconn, sock);
4527         if (!p)
4528                 return -EIO;
4529         memset(p, 0, sizeof(*p));
4530         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4531         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4532         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4533 }
4534
4535 /*
4536  * return values:
4537  *   1 yes, we have a valid connection
4538  *   0 oops, did not work out, please try again
4539  *  -1 peer talks different language,
4540  *     no point in trying again, please go standalone.
4541  */
4542 static int drbd_do_features(struct drbd_tconn *tconn)
4543 {
4544         /* ASSERT current == tconn->receiver ... */
4545         struct p_connection_features *p;
4546         const int expect = sizeof(struct p_connection_features);
4547         struct packet_info pi;
4548         int err;
4549
4550         err = drbd_send_features(tconn);
4551         if (err)
4552                 return 0;
4553
4554         err = drbd_recv_header(tconn, &pi);
4555         if (err)
4556                 return 0;
4557
4558         if (pi.cmd != P_CONNECTION_FEATURES) {
4559                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4560                          cmdname(pi.cmd), pi.cmd);
4561                 return -1;
4562         }
4563
4564         if (pi.size != expect) {
4565                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4566                      expect, pi.size);
4567                 return -1;
4568         }
4569
4570         p = pi.data;
4571         err = drbd_recv_all_warn(tconn, p, expect);
4572         if (err)
4573                 return 0;
4574
4575         p->protocol_min = be32_to_cpu(p->protocol_min);
4576         p->protocol_max = be32_to_cpu(p->protocol_max);
4577         if (p->protocol_max == 0)
4578                 p->protocol_max = p->protocol_min;
4579
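             /*
              * The handshake succeeds if the two advertised ranges overlap,
              * and we agree on the highest version both sides support: with
              * our range A..B and the peer's C..D (C <= B, D >= A),
              * agreed_pro_version becomes min(B, D).
              */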
4580         if (PRO_VERSION_MAX < p->protocol_min ||
4581             PRO_VERSION_MIN > p->protocol_max)
4582                 goto incompat;
4583
4584         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4585
4586         conn_info(tconn, "Handshake successful: "
4587              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4588
4589         return 1;
4590
4591  incompat:
4592         conn_err(tconn, "incompatible DRBD dialects: "
4593             "I support %d-%d, peer supports %d-%d\n",
4594             PRO_VERSION_MIN, PRO_VERSION_MAX,
4595             p->protocol_min, p->protocol_max);
4596         return -1;
4597 }
4598
4599 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4600 static int drbd_do_auth(struct drbd_tconn *tconn)
4601 {
4602         dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4603         dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4604         return -1;
4605 }
4606 #else
4607 #define CHALLENGE_LEN 64
4608
4609 /* Return value:
4610         1 - auth succeeded,
4611         0 - failed, try again (network error),
4612         -1 - auth failed, don't try again.
4613 */
4614
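     /* Rough shape of the exchange, as implemented below: each side sends a
      * random challenge (P_AUTH_CHALLENGE), computes an HMAC of the peer's
      * challenge with the shared secret and sends it back (P_AUTH_RESPONSE),
      * and finally compares the response it received with its own HMAC of
      * its own challenge. */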
4615 static int drbd_do_auth(struct drbd_tconn *tconn)
4616 {
4617         struct drbd_socket *sock;
4618         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4619         struct scatterlist sg;
4620         char *response = NULL;
4621         char *right_response = NULL;
4622         char *peers_ch = NULL;
4623         unsigned int key_len;
4624         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4625         unsigned int resp_size;
4626         struct hash_desc desc;
4627         struct packet_info pi;
4628         struct net_conf *nc;
4629         int err, rv;
4630
4631         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4632
4633         rcu_read_lock();
4634         nc = rcu_dereference(tconn->net_conf);
4635         key_len = strlen(nc->shared_secret);
4636         memcpy(secret, nc->shared_secret, key_len);
4637         rcu_read_unlock();
4638
4639         desc.tfm = tconn->cram_hmac_tfm;
4640         desc.flags = 0;
4641
4642         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4643         if (rv) {
4644                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4645                 rv = -1;
4646                 goto fail;
4647         }
4648
4649         get_random_bytes(my_challenge, CHALLENGE_LEN);
4650
4651         sock = &tconn->data;
4652         if (!conn_prepare_command(tconn, sock)) {
4653                 rv = 0;
4654                 goto fail;
4655         }
4656         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4657                                 my_challenge, CHALLENGE_LEN);
4658         if (!rv)
4659                 goto fail;
4660
4661         err = drbd_recv_header(tconn, &pi);
4662         if (err) {
4663                 rv = 0;
4664                 goto fail;
4665         }
4666
4667         if (pi.cmd != P_AUTH_CHALLENGE) {
4668                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4669                          cmdname(pi.cmd), pi.cmd);
4670                 rv = 0;
4671                 goto fail;
4672         }
4673
4674         if (pi.size > CHALLENGE_LEN * 2) {
4675                 conn_err(tconn, "AuthChallenge payload too big.\n");
4676                 rv = -1;
4677                 goto fail;
4678         }
4679
4680         peers_ch = kmalloc(pi.size, GFP_NOIO);
4681         if (peers_ch == NULL) {
4682                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4683                 rv = -1;
4684                 goto fail;
4685         }
4686
4687         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4688         if (err) {
4689                 rv = 0;
4690                 goto fail;
4691         }
4692
4693         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4694         response = kmalloc(resp_size, GFP_NOIO);
4695         if (response == NULL) {
4696                 conn_err(tconn, "kmalloc of response failed\n");
4697                 rv = -1;
4698                 goto fail;
4699         }
4700
4701         sg_init_table(&sg, 1);
4702         sg_set_buf(&sg, peers_ch, pi.size);
4703
4704         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4705         if (rv) {
4706                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4707                 rv = -1;
4708                 goto fail;
4709         }
4710
4711         if (!conn_prepare_command(tconn, sock)) {
4712                 rv = 0;
4713                 goto fail;
4714         }
4715         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4716                                 response, resp_size);
4717         if (!rv)
4718                 goto fail;
4719
4720         err = drbd_recv_header(tconn, &pi);
4721         if (err) {
4722                 rv = 0;
4723                 goto fail;
4724         }
4725
4726         if (pi.cmd != P_AUTH_RESPONSE) {
4727                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4728                          cmdname(pi.cmd), pi.cmd);
4729                 rv = 0;
4730                 goto fail;
4731         }
4732
4733         if (pi.size != resp_size) {
4734                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4735                 rv = 0;
4736                 goto fail;
4737         }
4738
4739         err = drbd_recv_all_warn(tconn, response, resp_size);
4740         if (err) {
4741                 rv = 0;
4742                 goto fail;
4743         }
4744
4745         right_response = kmalloc(resp_size, GFP_NOIO);
4746         if (right_response == NULL) {
4747                 conn_err(tconn, "kmalloc of right_response failed\n");
4748                 rv = -1;
4749                 goto fail;
4750         }
4751
4752         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4753
4754         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4755         if (rv) {
4756                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4757                 rv = -1;
4758                 goto fail;
4759         }
4760
4761         rv = !memcmp(response, right_response, resp_size);
4762
4763         if (rv)
4764                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4765                      resp_size);
4766         else
4767                 rv = -1;
4768
4769  fail:
4770         kfree(peers_ch);
4771         kfree(response);
4772         kfree(right_response);
4773
4774         return rv;
4775 }
4776 #endif
4777
4778 int drbdd_init(struct drbd_thread *thi)
4779 {
4780         struct drbd_tconn *tconn = thi->tconn;
4781         int h;
4782
4783         conn_info(tconn, "receiver (re)started\n");
4784
4785         do {
4786                 h = conn_connect(tconn);
4787                 if (h == 0) {
4788                         conn_disconnect(tconn);
4789                         schedule_timeout_interruptible(HZ);
4790                 }
4791                 if (h == -1) {
4792                         conn_warn(tconn, "Discarding network configuration.\n");
4793                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4794                 }
4795         } while (h == 0);
4796
4797         if (h > 0)
4798                 drbdd(tconn);
4799
4800         conn_disconnect(tconn);
4801
4802         conn_info(tconn, "receiver terminated\n");
4803         return 0;
4804 }
4805
4806 /* ********* acknowledge sender ******** */
4807
4808 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4809 {
4810         struct p_req_state_reply *p = pi->data;
4811         int retcode = be32_to_cpu(p->retcode);
4812
4813         if (retcode >= SS_SUCCESS) {
4814                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4815         } else {
4816                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4817                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4818                          drbd_set_st_err_str(retcode), retcode);
4819         }
4820         wake_up(&tconn->ping_wait);
4821
4822         return 0;
4823 }
4824
4825 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4826 {
4827         struct drbd_conf *mdev;
4828         struct p_req_state_reply *p = pi->data;
4829         int retcode = be32_to_cpu(p->retcode);
4830
4831         mdev = vnr_to_mdev(tconn, pi->vnr);
4832         if (!mdev)
4833                 return -EIO;
4834
4835         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4836                 D_ASSERT(tconn->agreed_pro_version < 100);
4837                 return got_conn_RqSReply(tconn, pi);
4838         }
4839
4840         if (retcode >= SS_SUCCESS) {
4841                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4842         } else {
4843                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4844                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4845                         drbd_set_st_err_str(retcode), retcode);
4846         }
4847         wake_up(&mdev->state_wait);
4848
4849         return 0;
4850 }
4851
4852 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4853 {
4854         return drbd_send_ping_ack(tconn);
4855
4856 }
4857
4858 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4859 {
4860         /* restore idle timeout */
4861         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4862         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4863                 wake_up(&tconn->ping_wait);
4864
4865         return 0;
4866 }
4867
4868 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4869 {
4870         struct drbd_conf *mdev;
4871         struct p_block_ack *p = pi->data;
4872         sector_t sector = be64_to_cpu(p->sector);
4873         int blksize = be32_to_cpu(p->blksize);
4874
4875         mdev = vnr_to_mdev(tconn, pi->vnr);
4876         if (!mdev)
4877                 return -EIO;
4878
4879         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4880
4881         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4882
4883         if (get_ldev(mdev)) {
4884                 drbd_rs_complete_io(mdev, sector);
4885                 drbd_set_in_sync(mdev, sector, blksize);
4886                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4887                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4888                 put_ldev(mdev);
4889         }
4890         dec_rs_pending(mdev);
4891         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4892
4893         return 0;
4894 }
4895
4896 static int
4897 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4898                               struct rb_root *root, const char *func,
4899                               enum drbd_req_event what, bool missing_ok)
4900 {
4901         struct drbd_request *req;
4902         struct bio_and_error m;
4903
4904         spin_lock_irq(&mdev->tconn->req_lock);
4905         req = find_request(mdev, root, id, sector, missing_ok, func);
4906         if (unlikely(!req)) {
4907                 spin_unlock_irq(&mdev->tconn->req_lock);
4908                 return -EIO;
4909         }
4910         __req_mod(req, what, &m);
4911         spin_unlock_irq(&mdev->tconn->req_lock);
4912
4913         if (m.bio)
4914                 complete_master_bio(mdev, &m);
4915         return 0;
4916 }
4917
4918 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4919 {
4920         struct drbd_conf *mdev;
4921         struct p_block_ack *p = pi->data;
4922         sector_t sector = be64_to_cpu(p->sector);
4923         int blksize = be32_to_cpu(p->blksize);
4924         enum drbd_req_event what;
4925
4926         mdev = vnr_to_mdev(tconn, pi->vnr);
4927         if (!mdev)
4928                 return -EIO;
4929
4930         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4931
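             /* Acks for resync writes carry ID_SYNCER as block_id; such writes
              * are not tracked in the write_requests tree, only via
              * rs_pending_cnt, so no request lookup is needed here. */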
4932         if (p->block_id == ID_SYNCER) {
4933                 drbd_set_in_sync(mdev, sector, blksize);
4934                 dec_rs_pending(mdev);
4935                 return 0;
4936         }
4937         switch (pi->cmd) {
4938         case P_RS_WRITE_ACK:
4939                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4940                 break;
4941         case P_WRITE_ACK:
4942                 what = WRITE_ACKED_BY_PEER;
4943                 break;
4944         case P_RECV_ACK:
4945                 what = RECV_ACKED_BY_PEER;
4946                 break;
4947         case P_DISCARD_WRITE:
4948                 what = DISCARD_WRITE;
4949                 break;
4950         case P_RETRY_WRITE:
4951                 what = POSTPONE_WRITE;
4952                 break;
4953         default:
4954                 BUG();
4955         }
4956
4957         return validate_req_change_req_state(mdev, p->block_id, sector,
4958                                              &mdev->write_requests, __func__,
4959                                              what, false);
4960 }
4961
4962 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4963 {
4964         struct drbd_conf *mdev;
4965         struct p_block_ack *p = pi->data;
4966         sector_t sector = be64_to_cpu(p->sector);
4967         int size = be32_to_cpu(p->blksize);
4968         int err;
4969
4970         mdev = vnr_to_mdev(tconn, pi->vnr);
4971         if (!mdev)
4972                 return -EIO;
4973
4974         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4975
4976         if (p->block_id == ID_SYNCER) {
4977                 dec_rs_pending(mdev);
4978                 drbd_rs_failed_io(mdev, sector, size);
4979                 return 0;
4980         }
4981
4982         err = validate_req_change_req_state(mdev, p->block_id, sector,
4983                                             &mdev->write_requests, __func__,
4984                                             NEG_ACKED, true);
4985         if (err) {
4986                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4987                    The master bio might already be completed, therefore the
4988                    request is no longer in the collision hash. */
4989                 /* In Protocol B we might already have got a P_RECV_ACK
4990                    but then get a P_NEG_ACK afterwards. */
4991                 drbd_set_out_of_sync(mdev, sector, size);
4992         }
4993         return 0;
4994 }
4995
4996 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4997 {
4998         struct drbd_conf *mdev;
4999         struct p_block_ack *p = pi->data;
5000         sector_t sector = be64_to_cpu(p->sector);
5001
5002         mdev = vnr_to_mdev(tconn, pi->vnr);
5003         if (!mdev)
5004                 return -EIO;
5005
5006         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5007
5008         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5009             (unsigned long long)sector, be32_to_cpu(p->blksize));
5010
5011         return validate_req_change_req_state(mdev, p->block_id, sector,
5012                                              &mdev->read_requests, __func__,
5013                                              NEG_ACKED, false);
5014 }
5015
5016 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5017 {
5018         struct drbd_conf *mdev;
5019         sector_t sector;
5020         int size;
5021         struct p_block_ack *p = pi->data;
5022
5023         mdev = vnr_to_mdev(tconn, pi->vnr);
5024         if (!mdev)
5025                 return -EIO;
5026
5027         sector = be64_to_cpu(p->sector);
5028         size = be32_to_cpu(p->blksize);
5029
5030         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5031
5032         dec_rs_pending(mdev);
5033
5034         if (get_ldev_if_state(mdev, D_FAILED)) {
5035                 drbd_rs_complete_io(mdev, sector);
5036                 switch (pi->cmd) {
5037                 case P_NEG_RS_DREPLY:
5038                         drbd_rs_failed_io(mdev, sector, size);
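                             /* fall through */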
5039                 case P_RS_CANCEL:
5040                         break;
5041                 default:
5042                         BUG();
5043                 }
5044                 put_ldev(mdev);
5045         }
5046
5047         return 0;
5048 }
5049
5050 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5051 {
5052         struct p_barrier_ack *p = pi->data;
5053         struct drbd_conf *mdev;
5054         int vnr;
5055
5056         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5057
5058         rcu_read_lock();
5059         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5060                 if (mdev->state.conn == C_AHEAD &&
5061                     atomic_read(&mdev->ap_in_flight) == 0 &&
5062                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5063                         mdev->start_resync_timer.expires = jiffies + HZ;
5064                         add_timer(&mdev->start_resync_timer);
5065                 }
5066         }
5067         rcu_read_unlock();
5068
5069         return 0;
5070 }
5071
5072 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5073 {
5074         struct drbd_conf *mdev;
5075         struct p_block_ack *p = pi->data;
5076         struct drbd_work *w;
5077         sector_t sector;
5078         int size;
5079
5080         mdev = vnr_to_mdev(tconn, pi->vnr);
5081         if (!mdev)
5082                 return -EIO;
5083
5084         sector = be64_to_cpu(p->sector);
5085         size = be32_to_cpu(p->blksize);
5086
5087         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5088
5089         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5090                 drbd_ov_out_of_sync_found(mdev, sector, size);
5091         else
5092                 ov_out_of_sync_print(mdev);
5093
5094         if (!get_ldev(mdev))
5095                 return 0;
5096
5097         drbd_rs_complete_io(mdev, sector);
5098         dec_rs_pending(mdev);
5099
5100         --mdev->ov_left;
5101
5102         /* let's advance progress step marks only for every other megabyte */
5103         if ((mdev->ov_left & 0x200) == 0x200)
5104                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5105
5106         if (mdev->ov_left == 0) {
5107                 w = kmalloc(sizeof(*w), GFP_NOIO);
5108                 if (w) {
5109                         w->cb = w_ov_finished;
5110                         w->mdev = mdev;
5111                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5112                 } else {
5113                         dev_err(DEV, "kmalloc(w) failed.\n");
5114                         ov_out_of_sync_print(mdev);
5115                         drbd_resync_finished(mdev);
5116                 }
5117         }
5118         put_ldev(mdev);
5119         return 0;
5120 }
5121
5122 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5123 {
5124         return 0;
5125 }
5126
5127 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5128 {
5129         struct drbd_conf *mdev;
5130         int vnr, not_empty = 0;
5131
5132         do {
5133                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5134                 flush_signals(current);
5135
5136                 rcu_read_lock();
5137                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5138                         kref_get(&mdev->kref);
5139                         rcu_read_unlock();
5140                         if (drbd_finish_peer_reqs(mdev)) {
5141                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5142                                 return 1;
5143                         }
5144                         kref_put(&mdev->kref, &drbd_minor_destroy);
5145                         rcu_read_lock();
5146                 }
5147                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5148
5149                 spin_lock_irq(&tconn->req_lock);
5150                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5151                         not_empty = !list_empty(&mdev->done_ee);
5152                         if (not_empty)
5153                                 break;
5154                 }
5155                 spin_unlock_irq(&tconn->req_lock);
5156                 rcu_read_unlock();
5157         } while (not_empty);
5158
5159         return 0;
5160 }
5161
5162 struct asender_cmd {
5163         size_t pkt_size;
5164         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5165 };
5166
5167 static struct asender_cmd asender_tbl[] = {
5168         [P_PING]            = { 0, got_Ping },
5169         [P_PING_ACK]        = { 0, got_PingAck },
5170         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5171         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5172         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5173         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5174         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5175         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5176         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5177         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5178         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5179         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5180         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5181         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5182         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5183         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5184         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5185 };
5186
5187 int drbd_asender(struct drbd_thread *thi)
5188 {
5189         struct drbd_tconn *tconn = thi->tconn;
5190         struct asender_cmd *cmd = NULL;
5191         struct packet_info pi;
5192         int rv;
5193         void *buf    = tconn->meta.rbuf;
5194         int received = 0;
5195         unsigned int header_size = drbd_header_size(tconn);
5196         int expect   = header_size;
5197         bool ping_timeout_active = false;
5198         struct net_conf *nc;
5199         int ping_timeo, tcp_cork, ping_int;
5200
5201         current->policy = SCHED_RR;  /* Make this a realtime task! */
5202         current->rt_priority = 2;    /* more important than all other tasks */
5203
5204         while (get_t_state(thi) == RUNNING) {
5205                 drbd_thread_current_set_cpu(thi);
5206
5207                 rcu_read_lock();
5208                 nc = rcu_dereference(tconn->net_conf);
5209                 ping_timeo = nc->ping_timeo;
5210                 tcp_cork = nc->tcp_cork;
5211                 ping_int = nc->ping_int;
5212                 rcu_read_unlock();
5213
5214                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5215                         if (drbd_send_ping(tconn)) {
5216                                 conn_err(tconn, "drbd_send_ping has failed\n");
5217                                 goto reconnect;
5218                         }
5219                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5220                         ping_timeout_active = true;
5221                 }
5222
5223                 /* TODO: conditionally cork; it may hurt latency if we cork without
5224                    much to send */
5225                 if (tcp_cork)
5226                         drbd_tcp_cork(tconn->meta.socket);
5227                 if (tconn_finish_peer_reqs(tconn)) {
5228                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5229                         goto reconnect;
5230                 }
5231                 /* but unconditionally uncork unless disabled */
5232                 if (tcp_cork)
5233                         drbd_tcp_uncork(tconn->meta.socket);
5234
5235                 /* short circuit, recv_msg would return EINTR anyways. */
5236                 if (signal_pending(current))
5237                         continue;
5238
5239                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5240                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5241
5242                 flush_signals(current);
5243
5244                 /* Note:
5245                  * -EINTR        (on meta) we got a signal
5246                  * -EAGAIN       (on meta) rcvtimeo expired
5247                  * -ECONNRESET   other side closed the connection
5248                  * -ERESTARTSYS  (on data) we got a signal
5249                  * rv <  0       other than above: unexpected error!
5250                  * rv == expected: full header or command
5251                  * rv <  expected: "woken" by signal during receive
5252                  * rv == 0       : "connection shut down by peer"
5253                  */
5254                 if (likely(rv > 0)) {
5255                         received += rv;
5256                         buf      += rv;
5257                 } else if (rv == 0) {
5258                         conn_err(tconn, "meta connection shut down by peer.\n");
5259                         goto reconnect;
5260                 } else if (rv == -EAGAIN) {
5261                         /* If the data socket received something meanwhile,
5262                          * that is good enough: peer is still alive. */
5263                         if (time_after(tconn->last_received,
5264                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5265                                 continue;
5266                         if (ping_timeout_active) {
5267                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5268                                 goto reconnect;
5269                         }
5270                         set_bit(SEND_PING, &tconn->flags);
5271                         continue;
5272                 } else if (rv == -EINTR) {
5273                         continue;
5274                 } else {
5275                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5276                         goto reconnect;
5277                 }
5278
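                     /*
                      * First collect a full header (expect == header_size).  Once
                      * it is decoded, look up the handler and grow expect by the
                      * handler's fixed packet size; when that much has arrived,
                      * dispatch and reset for the next packet.
                      */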
5279                 if (received == expect && cmd == NULL) {
5280                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5281                                 goto reconnect;
5282                         cmd = &asender_tbl[pi.cmd];
5283                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5284                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5285                                          cmdname(pi.cmd), pi.cmd);
5286                                 goto disconnect;
5287                         }
5288                         expect = header_size + cmd->pkt_size;
5289                         if (pi.size != expect - header_size) {
5290                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5291                                         pi.cmd, pi.size);
5292                                 goto reconnect;
5293                         }
5294                 }
5295                 if (received == expect) {
5296                         int err;
5297
5298                         err = cmd->fn(tconn, &pi);
5299                         if (err) {
5300                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5301                                 goto reconnect;
5302                         }
5303
5304                         tconn->last_received = jiffies;
5305
5306                         if (cmd == &asender_tbl[P_PING_ACK]) {
5307                                 /* restore idle timeout */
5308                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5309                                 ping_timeout_active = false;
5310                         }
5311
5312                         buf      = tconn->meta.rbuf;
5313                         received = 0;
5314                         expect   = header_size;
5315                         cmd      = NULL;
5316                 }
5317         }
5318
5319         if (0) {
5320 reconnect:
5321                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5322         }
5323         if (0) {
5324 disconnect:
5325                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5326         }
5327         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5328
5329         conn_info(tconn, "asender terminated\n");
5330
5331         return 0;
5332 }