drbd: Move list of epochs from mdev to tconn
drivers/block/drbd/drbd_receiver.c (from firefly-linux-kernel-4.4.55.git)
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
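
/*
 * Illustrative sketch (hypothetical helper, not part of this file): walking
 * such a chain only relies on page_private() holding the "next" pointer and
 * on 0 marking the end of the list, assuming page_chain_next() from
 * drbd_int.h is essentially (struct page *)page_private(page).
 *
 *	static unsigned int page_chain_count(struct page *page)
 *	{
 *		unsigned int n = 0;
 *		for (; page; page = (struct page *)page_private(page))
 *			n++;
 *		return n;
 *	}
 */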
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
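
/*
 * Usage sketch (hypothetical caller): with retry=true the call only gives up
 * when a signal is pending; the pages are accounted in pp_in_use and must
 * eventually be returned through drbd_free_pages(), usually indirectly via
 * drbd_free_peer_req().
 *
 *	struct page *page = drbd_alloc_pages(mdev, nr_pages, true);
 *	if (!page)
 *		return NULL;	(only if a signal interrupted the wait)
 *	...
 *	drbd_free_pages(mdev, page, 0);
 */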
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
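
/*
 * Locking sketch for the rules above (hypothetical caller):
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * versus the wrapper that takes the lock itself:
 *
 *	drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 */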
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
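
/*
 * Usage sketch: callers below treat any nonzero return of the drbd_recv_all*()
 * helpers as fatal for this connection (a short read has already been turned
 * into -EIO), e.g. as in drbd_recv_header():
 *
 *	err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
 *	if (err)
 *		return err;
 */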
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
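
/*
 * Per the tcp(7) quote above, the buffer sizes must be applied before bind()
 * and connect()/listen(); sketch of the order used by drbd_try_connect() and
 * drbd_wait_for_connect() below:
 *
 *	sock_create_kern(..., &sock);
 *	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 *	sock->ops->bind(sock, ...);
 *	sock->ops->connect(sock, ...);	(or listen()/accept())
 */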
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629         sndbuf_size = nc->sndbuf_size;
630         rcvbuf_size = nc->rcvbuf_size;
631         connect_int = nc->connect_int;
632         rcu_read_unlock();
633
634         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
644
645         what = "sock_create_kern";
646         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
647                                SOCK_STREAM, IPPROTO_TCP, &sock);
648         if (err < 0) {
649                 sock = NULL;
650                 goto out;
651         }
652
653         sock->sk->sk_rcvtimeo =
654         sock->sk->sk_sndtimeo = connect_int * HZ;
655         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
656
657        /* explicitly bind to the configured IP as source IP
658         *  for the outgoing connections.
659         *  This is needed for multihomed hosts and to be
660         *  able to use lo: interfaces for drbd.
661         * Make sure to use 0 as port number, so linux selects
662         *  a free one dynamically.
663         */
664         what = "bind before connect";
665         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
666         if (err < 0)
667                 goto out;
668
669         /* connect may fail, peer not yet available.
670          * stay C_WF_CONNECTION, don't go Disconnecting! */
671         disconnect_on_error = 0;
672         what = "connect";
673         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
674
675 out:
676         if (err < 0) {
677                 if (sock) {
678                         sock_release(sock);
679                         sock = NULL;
680                 }
681                 switch (-err) {
682                         /* timeout, busy, signal pending */
683                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
684                 case EINTR: case ERESTARTSYS:
685                         /* peer not (yet) available, network problem */
686                 case ECONNREFUSED: case ENETUNREACH:
687                 case EHOSTDOWN:    case EHOSTUNREACH:
688                         disconnect_on_error = 0;
689                         break;
690                 default:
691                         conn_err(tconn, "%s failed, err = %d\n", what, err);
692                 }
693                 if (disconnect_on_error)
694                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
695         }
696
697         return sock;
698 }
699
700 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
701 {
702         int timeo, err, my_addr_len;
703         int sndbuf_size, rcvbuf_size, connect_int;
704         struct socket *s_estab = NULL, *s_listen;
705         struct sockaddr_in6 my_addr;
706         struct net_conf *nc;
707         const char *what;
708
709         rcu_read_lock();
710         nc = rcu_dereference(tconn->net_conf);
711         if (!nc) {
712                 rcu_read_unlock();
713                 return NULL;
714         }
715         sndbuf_size = nc->sndbuf_size;
716         rcvbuf_size = nc->rcvbuf_size;
717         connect_int = nc->connect_int;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
725                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         timeo = connect_int * HZ;
732         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
733
734         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
735         s_listen->sk->sk_rcvtimeo = timeo;
736         s_listen->sk->sk_sndtimeo = timeo;
737         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
738
739         what = "bind before listen";
740         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
741         if (err < 0)
742                 goto out;
743
744         err = drbd_accept(&what, s_listen, &s_estab);
745
746 out:
747         if (s_listen)
748                 sock_release(s_listen);
749         if (err < 0) {
750                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
751                         conn_err(tconn, "%s failed, err = %d\n", what, err);
752                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
753                 }
754         }
755
756         return s_estab;
757 }
758
759 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
760
761 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
762                              enum drbd_packet cmd)
763 {
764         if (!conn_prepare_command(tconn, sock))
765                 return -EIO;
766         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
767 }
768
769 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
770 {
771         unsigned int header_size = drbd_header_size(tconn);
772         struct packet_info pi;
773         int err;
774
775         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
776         if (err != header_size) {
777                 if (err >= 0)
778                         err = -EIO;
779                 return err;
780         }
781         err = decode_header(tconn, tconn->data.rbuf, &pi);
782         if (err)
783                 return err;
784         return pi.cmd;
785 }
786
787 /**
788  * drbd_socket_okay() - Free the socket if its connection is not okay
789  * @sock:       pointer to the pointer to the socket.
790  */
791 static int drbd_socket_okay(struct socket **sock)
792 {
793         int rr;
794         char tb[4];
795
796         if (!*sock)
797                 return false;
798
799         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
800
801         if (rr > 0 || rr == -EAGAIN) {
802                 return true;
803         } else {
804                 sock_release(*sock);
805                 *sock = NULL;
806                 return false;
807         }
808 }
809 /* Gets called if a connection is established, or if a new minor gets created
810    in a connection */
811 int drbd_connected(struct drbd_conf *mdev)
812 {
813         int err;
814
815         atomic_set(&mdev->packet_seq, 0);
816         mdev->peer_seq = 0;
817
818         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
819                 &mdev->tconn->cstate_mutex :
820                 &mdev->own_state_mutex;
821
822         err = drbd_send_sync_param(mdev);
823         if (!err)
824                 err = drbd_send_sizes(mdev, 0, 0);
825         if (!err)
826                 err = drbd_send_uuids(mdev);
827         if (!err)
828                 err = drbd_send_current_state(mdev);
829         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
830         clear_bit(RESIZE_PENDING, &mdev->flags);
831         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
832         return err;
833 }
834
835 /*
836  * return values:
837  *   1 yes, we have a valid connection
838  *   0 oops, did not work out, please try again
839  *  -1 peer talks different language,
840  *     no point in trying again, please go standalone.
841  *  -2 We do not have a network config...
842  */
843 static int conn_connect(struct drbd_tconn *tconn)
844 {
845         struct socket *sock, *msock;
846         struct drbd_conf *mdev;
847         struct net_conf *nc;
848         int vnr, timeout, try, h, ok;
849         bool discard_my_data;
850
851         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
852                 return -2;
853
854         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
855
856         /* Assume that the peer only understands protocol 80 until we know better.  */
857         tconn->agreed_pro_version = 80;
858
859         do {
860                 struct socket *s;
861
862                 for (try = 0;;) {
863                         /* 3 tries, this should take less than a second! */
864                         s = drbd_try_connect(tconn);
865                         if (s || ++try >= 3)
866                                 break;
867                         /* give the other side time to call bind() & listen() */
868                         schedule_timeout_interruptible(HZ / 10);
869                 }
870
871                 if (s) {
872                         if (!tconn->data.socket) {
873                                 tconn->data.socket = s;
874                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
875                         } else if (!tconn->meta.socket) {
876                                 tconn->meta.socket = s;
877                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
878                         } else {
879                                 conn_err(tconn, "Logic error in conn_connect()\n");
880                                 goto out_release_sockets;
881                         }
882                 }
883
884                 if (tconn->data.socket && tconn->meta.socket) {
885                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
886                         ok = drbd_socket_okay(&tconn->data.socket);
887                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
888                         if (ok)
889                                 break;
890                 }
891
892 retry:
893                 s = drbd_wait_for_connect(tconn);
894                 if (s) {
895                         try = receive_first_packet(tconn, s);
896                         drbd_socket_okay(&tconn->data.socket);
897                         drbd_socket_okay(&tconn->meta.socket);
898                         switch (try) {
899                         case P_INITIAL_DATA:
900                                 if (tconn->data.socket) {
901                                         conn_warn(tconn, "initial packet S crossed\n");
902                                         sock_release(tconn->data.socket);
903                                 }
904                                 tconn->data.socket = s;
905                                 break;
906                         case P_INITIAL_META:
907                                 if (tconn->meta.socket) {
908                                         conn_warn(tconn, "initial packet M crossed\n");
909                                         sock_release(tconn->meta.socket);
910                                 }
911                                 tconn->meta.socket = s;
912                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
913                                 break;
914                         default:
915                                 conn_warn(tconn, "Error receiving initial packet\n");
916                                 sock_release(s);
917                                 if (random32() & 1)
918                                         goto retry;
919                         }
920                 }
921
922                 if (tconn->cstate <= C_DISCONNECTING)
923                         goto out_release_sockets;
924                 if (signal_pending(current)) {
925                         flush_signals(current);
926                         smp_rmb();
927                         if (get_t_state(&tconn->receiver) == EXITING)
928                                 goto out_release_sockets;
929                 }
930
931                 if (tconn->data.socket && tconn->meta.socket) {
932                         ok = drbd_socket_okay(&tconn->data.socket);
933                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
934                         if (ok)
935                                 break;
936                 }
937         } while (1);
938
939         sock  = tconn->data.socket;
940         msock = tconn->meta.socket;
941
942         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
943         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
944
945         sock->sk->sk_allocation = GFP_NOIO;
946         msock->sk->sk_allocation = GFP_NOIO;
947
948         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
949         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
950
951         /* NOT YET ...
952          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
953          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
954          * first set it to the P_CONNECTION_FEATURES timeout,
955          * which we set to 4x the configured ping_timeout. */
956         rcu_read_lock();
957         nc = rcu_dereference(tconn->net_conf);
958
959         sock->sk->sk_sndtimeo =
960         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
961
962         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
963         timeout = nc->timeout * HZ / 10;
964         discard_my_data = nc->discard_my_data;
965         rcu_read_unlock();
966
967         msock->sk->sk_sndtimeo = timeout;
968
969         /* we don't want delays.
970          * we use TCP_CORK where appropriate, though */
971         drbd_tcp_nodelay(sock);
972         drbd_tcp_nodelay(msock);
973
974         tconn->last_received = jiffies;
975
976         h = drbd_do_features(tconn);
977         if (h <= 0)
978                 return h;
979
980         if (tconn->cram_hmac_tfm) {
981                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
982                 switch (drbd_do_auth(tconn)) {
983                 case -1:
984                         conn_err(tconn, "Authentication of peer failed\n");
985                         return -1;
986                 case 0:
987                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
988                         return 0;
989                 }
990         }
991
992         sock->sk->sk_sndtimeo = timeout;
993         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
994
995         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
996                 return -1;
997
998         rcu_read_lock();
999         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1000                 kref_get(&mdev->kref);
1001                 rcu_read_unlock();
1002
1003                 if (discard_my_data)
1004                         set_bit(DISCARD_MY_DATA, &mdev->flags);
1005                 else
1006                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
1007
1008                 drbd_connected(mdev);
1009                 kref_put(&mdev->kref, &drbd_minor_destroy);
1010                 rcu_read_lock();
1011         }
1012         rcu_read_unlock();
1013
1014         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
1015                 return 0;
1016
1017         drbd_thread_start(&tconn->asender);
1018
1019         mutex_lock(&tconn->conf_update);
1020         /* The discard_my_data flag is a single-shot modifier to the next
1021          * connection attempt, the handshake of which is now well underway.
1022          * No need for rcu style copying of the whole struct
1023          * just to clear a single value. */
1024         tconn->net_conf->discard_my_data = 0;
1025         mutex_unlock(&tconn->conf_update);
1026
1027         return h;
1028
1029 out_release_sockets:
1030         if (tconn->data.socket) {
1031                 sock_release(tconn->data.socket);
1032                 tconn->data.socket = NULL;
1033         }
1034         if (tconn->meta.socket) {
1035                 sock_release(tconn->meta.socket);
1036                 tconn->meta.socket = NULL;
1037         }
1038         return -1;
1039 }
1040
1041 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1042 {
1043         unsigned int header_size = drbd_header_size(tconn);
1044
1045         if (header_size == sizeof(struct p_header100) &&
1046             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1047                 struct p_header100 *h = header;
1048                 if (h->pad != 0) {
1049                         conn_err(tconn, "Header padding is not zero\n");
1050                         return -EINVAL;
1051                 }
1052                 pi->vnr = be16_to_cpu(h->volume);
1053                 pi->cmd = be16_to_cpu(h->command);
1054                 pi->size = be32_to_cpu(h->length);
1055         } else if (header_size == sizeof(struct p_header95) &&
1056                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1057                 struct p_header95 *h = header;
1058                 pi->cmd = be16_to_cpu(h->command);
1059                 pi->size = be32_to_cpu(h->length);
1060                 pi->vnr = 0;
1061         } else if (header_size == sizeof(struct p_header80) &&
1062                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1063                 struct p_header80 *h = header;
1064                 pi->cmd = be16_to_cpu(h->command);
1065                 pi->size = be16_to_cpu(h->length);
1066                 pi->vnr = 0;
1067         } else {
1068                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1069                          be32_to_cpu(*(__be32 *)header),
1070                          tconn->agreed_pro_version);
1071                 return -EINVAL;
1072         }
1073         pi->data = header + header_size;
1074         return 0;
1075 }
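
/*
 * Wire header layouts as decoded above (field order inferred from the
 * accesses; see struct p_header80/95/100 in drbd_int.h for the definitive
 * definitions):
 *
 *	protocol <  95:  be32 DRBD_MAGIC     | be16 command | be16 length
 *	protocol 95..99: be16 DRBD_MAGIC_BIG | be16 command | be32 length
 *	protocol >= 100: be32 DRBD_MAGIC_100 | be16 volume  | be16 command |
 *			 be32 length | pad (must be zero)
 */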
1076
1077 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1078 {
1079         void *buffer = tconn->data.rbuf;
1080         int err;
1081
1082         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1083         if (err)
1084                 return err;
1085
1086         err = decode_header(tconn, buffer, pi);
1087         tconn->last_received = jiffies;
1088
1089         return err;
1090 }
1091
1092 static void drbd_flush(struct drbd_tconn *tconn)
1093 {
1094         int rv;
1095         struct drbd_conf *mdev;
1096         int vnr;
1097
1098         if (tconn->write_ordering >= WO_bdev_flush) {
1099                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1100                         if (get_ldev(mdev)) {
1101                                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1102                                                         NULL);
1103                                 put_ldev(mdev);
1104
1105                                 if (rv) {
1106                                         dev_info(DEV, "local disk flush failed with status %d\n", rv);
1107                                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1108                                          * don't try again for ANY return value != 0
1109                                          * if (rv == -EOPNOTSUPP) */
1110                                         drbd_bump_write_ordering(tconn, WO_drain_io);
1111                                         break;
1112                                 }
1113                         }
1114                 }
1115         }
1116 }
1117
1118 /**
1119  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1120  * @mdev:       DRBD device.
1121  * @epoch:      Epoch object.
1122  * @ev:         Epoch event.
1123  */
1124 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1125                                                struct drbd_epoch *epoch,
1126                                                enum epoch_event ev)
1127 {
1128         int epoch_size;
1129         struct drbd_epoch *next_epoch;
1130         enum finish_epoch rv = FE_STILL_LIVE;
1131         struct drbd_tconn *tconn = mdev->tconn;
1132
1133         spin_lock(&tconn->epoch_lock);
1134         do {
1135                 next_epoch = NULL;
1136
1137                 epoch_size = atomic_read(&epoch->epoch_size);
1138
1139                 switch (ev & ~EV_CLEANUP) {
1140                 case EV_PUT:
1141                         atomic_dec(&epoch->active);
1142                         break;
1143                 case EV_GOT_BARRIER_NR:
1144                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1145                         break;
1146                 case EV_BECAME_LAST:
1147                         /* nothing to do*/
1148                         /* nothing to do */
1149                 }
1150
1151                 if (epoch_size != 0 &&
1152                     atomic_read(&epoch->active) == 0 &&
1153                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1154                         if (!(ev & EV_CLEANUP)) {
1155                                 spin_unlock(&tconn->epoch_lock);
1156                                 drbd_send_b_ack(epoch->mdev, epoch->barrier_nr, epoch_size);
1157                                 spin_lock(&tconn->epoch_lock);
1158                         }
1159                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1160                                 dec_unacked(epoch->mdev);
1161
1162                         if (tconn->current_epoch != epoch) {
1163                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1164                                 list_del(&epoch->list);
1165                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1166                                 tconn->epochs--;
1167                                 kfree(epoch);
1168
1169                                 if (rv == FE_STILL_LIVE)
1170                                         rv = FE_DESTROYED;
1171                         } else {
1172                                 epoch->flags = 0;
1173                                 atomic_set(&epoch->epoch_size, 0);
1174                                 /* atomic_set(&epoch->active, 0); is already zero */
1175                                 if (rv == FE_STILL_LIVE)
1176                                         rv = FE_RECYCLED;
1177                                 wake_up(&mdev->ee_wait);
1178                         }
1179                 }
1180
1181                 if (!next_epoch)
1182                         break;
1183
1184                 epoch = next_epoch;
1185         } while (1);
1186
1187         spin_unlock(&tconn->epoch_lock);
1188
1189         return rv;
1190 }
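
/*
 * Event flow sketch: receive_Barrier() delivers EV_GOT_BARRIER_NR for the
 * current epoch, the write completion callbacks deliver EV_PUT for each
 * finished request, and destroying an epoch delivers EV_BECAME_LAST to its
 * successor.  A typical completion-side call looks like
 *
 *	drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT);
 *
 * possibly combined with EV_CLEANUP during teardown.
 */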
1191
1192 /**
1193  * drbd_bump_write_ordering() - Fall back to another write ordering method
1194  * @tconn:      DRBD connection.
1195  * @wo:         Write ordering method to try.
1196  */
1197 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1198 {
1199         struct disk_conf *dc;
1200         struct drbd_conf *mdev;
1201         enum write_ordering_e pwo;
1202         int vnr;
1203         static char *write_ordering_str[] = {
1204                 [WO_none] = "none",
1205                 [WO_drain_io] = "drain",
1206                 [WO_bdev_flush] = "flush",
1207         };
1208
1209         pwo = tconn->write_ordering;
1210         wo = min(pwo, wo);
1211         rcu_read_lock();
1212         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1213                 if (!get_ldev(mdev))
1214                         continue;
1215                 dc = rcu_dereference(mdev->ldev->disk_conf);
1216
1217                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1218                         wo = WO_drain_io;
1219                 if (wo == WO_drain_io && !dc->disk_drain)
1220                         wo = WO_none;
1221                 put_ldev(mdev);
1222         }
1223         rcu_read_unlock();
1224         tconn->write_ordering = wo;
1225         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1226                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1227 }
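
/*
 * Note that the fallback is one-way: wo = min(pwo, wo) only ever moves from
 * WO_bdev_flush towards WO_drain_io and WO_none, never back up, e.g.
 *
 *	drbd_bump_write_ordering(tconn, WO_drain_io);	(flush -> drain)
 *	drbd_bump_write_ordering(tconn, WO_bdev_flush);	(stays at drain)
 */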
1228
1229 /**
1230  * drbd_submit_peer_request() - submit a peer request to the local backing device
1231  * @mdev:       DRBD device.
1232  * @peer_req:   peer request
1233  * @rw:         flag field, see bio->bi_rw
1234  *
1235  * May spread the pages to multiple bios,
1236  * depending on bio_add_page restrictions.
1237  *
1238  * Returns 0 if all bios have been submitted,
1239  * -ENOMEM if we could not allocate enough bios,
1240  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1241  *  single page to an empty bio (which should never happen and likely indicates
1242  *  that the lower level IO stack is in some way broken). This has been observed
1243  *  on certain Xen deployments.
1244  */
1245 /* TODO allocate from our own bio_set. */
1246 int drbd_submit_peer_request(struct drbd_conf *mdev,
1247                              struct drbd_peer_request *peer_req,
1248                              const unsigned rw, const int fault_type)
1249 {
1250         struct bio *bios = NULL;
1251         struct bio *bio;
1252         struct page *page = peer_req->pages;
1253         sector_t sector = peer_req->i.sector;
1254         unsigned ds = peer_req->i.size;
1255         unsigned n_bios = 0;
1256         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1257         int err = -ENOMEM;
1258
1259         /* In most cases, we will only need one bio.  But in case the lower
1260          * level restrictions happen to be different at this offset on this
1261          * side than those of the sending peer, we may need to submit the
1262          * request in more than one bio.
1263          *
1264          * Plain bio_alloc is good enough here, this is no DRBD internally
1265          * generated bio, but a bio allocated on behalf of the peer.
1266          */
1267 next_bio:
1268         bio = bio_alloc(GFP_NOIO, nr_pages);
1269         if (!bio) {
1270                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1271                 goto fail;
1272         }
1273         /* > peer_req->i.sector, unless this is the first bio */
1274         bio->bi_sector = sector;
1275         bio->bi_bdev = mdev->ldev->backing_bdev;
1276         bio->bi_rw = rw;
1277         bio->bi_private = peer_req;
1278         bio->bi_end_io = drbd_peer_request_endio;
1279
1280         bio->bi_next = bios;
1281         bios = bio;
1282         ++n_bios;
1283
1284         page_chain_for_each(page) {
1285                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1286                 if (!bio_add_page(bio, page, len, 0)) {
1287                         /* A single page must always be possible!
1288                          * But in case it fails anyways,
1289                          * we deal with it, and complain (below). */
1290                         if (bio->bi_vcnt == 0) {
1291                                 dev_err(DEV,
1292                                         "bio_add_page failed for len=%u, "
1293                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1294                                         len, (unsigned long long)bio->bi_sector);
1295                                 err = -ENOSPC;
1296                                 goto fail;
1297                         }
1298                         goto next_bio;
1299                 }
1300                 ds -= len;
1301                 sector += len >> 9;
1302                 --nr_pages;
1303         }
1304         D_ASSERT(page == NULL);
1305         D_ASSERT(ds == 0);
1306
1307         atomic_set(&peer_req->pending_bios, n_bios);
1308         do {
1309                 bio = bios;
1310                 bios = bios->bi_next;
1311                 bio->bi_next = NULL;
1312
1313                 drbd_generic_make_request(mdev, fault_type, bio);
1314         } while (bios);
1315         return 0;
1316
1317 fail:
1318         while (bios) {
1319                 bio = bios;
1320                 bios = bios->bi_next;
1321                 bio_put(bio);
1322         }
1323         return err;
1324 }
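
/*
 * Caller sketch (mirroring the write path later in this file; fault type
 * assumed): a nonzero return means no bio was submitted, so the caller must
 * undo its interval/epoch bookkeeping and free the peer request itself.
 *
 *	err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
 *	if (err) {
 *		drbd_remove_epoch_entry_interval(mdev, peer_req);	(under req_lock)
 *		drbd_free_peer_req(mdev, peer_req);
 *	}
 */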
1325
1326 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1327                                              struct drbd_peer_request *peer_req)
1328 {
1329         struct drbd_interval *i = &peer_req->i;
1330
1331         drbd_remove_interval(&mdev->write_requests, i);
1332         drbd_clear_interval(i);
1333
1334         /* Wake up any processes waiting for this peer request to complete.  */
1335         if (i->waiting)
1336                 wake_up(&mdev->misc_wait);
1337 }
1338
1339 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1340 {
1341         struct drbd_conf *mdev;
1342         int rv;
1343         struct p_barrier *p = pi->data;
1344         struct drbd_epoch *epoch;
1345
1346         mdev = vnr_to_mdev(tconn, pi->vnr);
1347         if (!mdev)
1348                 return -EIO;
1349
1350         inc_unacked(mdev);
1351
1352         tconn->current_epoch->barrier_nr = p->barrier;
1353         tconn->current_epoch->mdev = mdev;
1354         rv = drbd_may_finish_epoch(mdev, tconn->current_epoch, EV_GOT_BARRIER_NR);
1355
1356         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1357          * the activity log, which means it would not be resynced in case the
1358          * R_PRIMARY crashes now.
1359          * Therefore we must send the barrier_ack after the barrier request was
1360          * completed. */
1361         switch (tconn->write_ordering) {
1362         case WO_none:
1363                 if (rv == FE_RECYCLED)
1364                         return 0;
1365
1366                 /* receiver context, in the writeout path of the other node.
1367                  * avoid potential distributed deadlock */
1368                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1369                 if (epoch)
1370                         break;
1371                 else
1372                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1373                         /* Fall through */
1374
1375         case WO_bdev_flush:
1376         case WO_drain_io:
1377                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1378                 drbd_flush(tconn);
1379
1380                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1381                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1382                         if (epoch)
1383                                 break;
1384                 }
1385
1386                 epoch = tconn->current_epoch;
1387                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1388
1389                 D_ASSERT(atomic_read(&epoch->active) == 0);
1390                 D_ASSERT(epoch->flags == 0);
1391
1392                 return 0;
1393         default:
1394                 dev_err(DEV, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1395                 return -EIO;
1396         }
1397
1398         epoch->flags = 0;
1399         atomic_set(&epoch->epoch_size, 0);
1400         atomic_set(&epoch->active, 0);
1401
1402         spin_lock(&tconn->epoch_lock);
1403         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1404                 list_add(&epoch->list, &tconn->current_epoch->list);
1405                 tconn->current_epoch = epoch;
1406                 tconn->epochs++;
1407         } else {
1408                 /* The current_epoch got recycled while we allocated this one... */
1409                 kfree(epoch);
1410         }
1411         spin_unlock(&tconn->epoch_lock);
1412
1413         return 0;
1414 }
1415
1416 /* used from receive_RSDataReply (recv_resync_read)
1417  * and from receive_Data */
1418 static struct drbd_peer_request *
1419 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1420               int data_size) __must_hold(local)
1421 {
1422         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1423         struct drbd_peer_request *peer_req;
1424         struct page *page;
1425         int dgs, ds, err;
1426         void *dig_in = mdev->tconn->int_dig_in;
1427         void *dig_vv = mdev->tconn->int_dig_vv;
1428         unsigned long *data;
1429
1430         dgs = 0;
1431         if (mdev->tconn->peer_integrity_tfm) {
1432                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1433                 /*
1434                  * FIXME: Receive the incoming digest into the receive buffer
1435                  *        here, together with its struct p_data?
1436                  */
1437                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1438                 if (err)
1439                         return NULL;
1440                 data_size -= dgs;
1441         }
1442
1443         if (!expect(data_size != 0))
1444                 return NULL;
1445         if (!expect(IS_ALIGNED(data_size, 512)))
1446                 return NULL;
1447         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1448                 return NULL;
1449
1450         /* even though we trust our peer,
1451          * we sometimes have to double check. */
1452         if (sector + (data_size>>9) > capacity) {
1453                 dev_err(DEV, "request from peer beyond end of local disk: "
1454                         "capacity: %llus < sector: %llus + size: %u\n",
1455                         (unsigned long long)capacity,
1456                         (unsigned long long)sector, data_size);
1457                 return NULL;
1458         }
1459
1460         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1461          * "criss-cross" setup, that might cause write-out on some other DRBD,
1462          * which in turn might block on the other node at this very place.  */
1463         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1464         if (!peer_req)
1465                 return NULL;
1466
1467         ds = data_size;
1468         page = peer_req->pages;
1469         page_chain_for_each(page) {
1470                 unsigned len = min_t(int, ds, PAGE_SIZE);
1471                 data = kmap(page);
1472                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1473                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1474                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1475                         data[0] = data[0] ^ (unsigned long)-1;
1476                 }
1477                 kunmap(page);
1478                 if (err) {
1479                         drbd_free_peer_req(mdev, peer_req);
1480                         return NULL;
1481                 }
1482                 ds -= len;
1483         }
1484
1485         if (dgs) {
1486                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1487                 if (memcmp(dig_in, dig_vv, dgs)) {
1488                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1489                                 (unsigned long long)sector, data_size);
1490                         drbd_free_peer_req(mdev, peer_req);
1491                         return NULL;
1492                 }
1493         }
1494         mdev->recv_cnt += data_size>>9;
1495         return peer_req;
1496 }
1497
1498 /* drbd_drain_block() just takes a data block
1499  * out of the socket input buffer, and discards it.
1500  */
1501 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1502 {
1503         struct page *page;
1504         int err = 0;
1505         void *data;
1506
1507         if (!data_size)
1508                 return 0;
1509
1510         page = drbd_alloc_pages(mdev, 1, 1);
1511
1512         data = kmap(page);
1513         while (data_size) {
1514                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1515
1516                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1517                 if (err)
1518                         break;
1519                 data_size -= len;
1520         }
1521         kunmap(page);
1522         drbd_free_pages(mdev, page, 0);
1523         return err;
1524 }
1525
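/* Receive the payload of a diskless read reply directly into the pages of the
 * original request's bio, verifying the integrity digest if one is configured
 * on this connection. */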
1526 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1527                            sector_t sector, int data_size)
1528 {
1529         struct bio_vec *bvec;
1530         struct bio *bio;
1531         int dgs, err, i, expect;
1532         void *dig_in = mdev->tconn->int_dig_in;
1533         void *dig_vv = mdev->tconn->int_dig_vv;
1534
1535         dgs = 0;
1536         if (mdev->tconn->peer_integrity_tfm) {
1537                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1538                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1539                 if (err)
1540                         return err;
1541                 data_size -= dgs;
1542         }
1543
1544         /* optimistically update recv_cnt.  if receiving fails below,
1545          * we disconnect anyways, and counters will be reset. */
1546         mdev->recv_cnt += data_size>>9;
1547
1548         bio = req->master_bio;
1549         D_ASSERT(sector == bio->bi_sector);
1550
1551         bio_for_each_segment(bvec, bio, i) {
1552                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1553                 expect = min_t(int, data_size, bvec->bv_len);
1554                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1555                 kunmap(bvec->bv_page);
1556                 if (err)
1557                         return err;
1558                 data_size -= expect;
1559         }
1560
1561         if (dgs) {
1562                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1563                 if (memcmp(dig_in, dig_vv, dgs)) {
1564                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1565                         return -EINVAL;
1566                 }
1567         }
1568
1569         D_ASSERT(data_size == 0);
1570         return 0;
1571 }
1572
1573 /*
1574  * e_end_resync_block() is called in asender context via
1575  * drbd_finish_peer_reqs().
1576  */
1577 static int e_end_resync_block(struct drbd_work *w, int unused)
1578 {
1579         struct drbd_peer_request *peer_req =
1580                 container_of(w, struct drbd_peer_request, w);
1581         struct drbd_conf *mdev = w->mdev;
1582         sector_t sector = peer_req->i.sector;
1583         int err;
1584
1585         D_ASSERT(drbd_interval_empty(&peer_req->i));
1586
1587         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1588                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1589                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1590         } else {
1591                 /* Record failure to sync */
1592                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1593
1594                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1595         }
1596         dec_unacked(mdev);
1597
1598         return err;
1599 }
1600
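/* Read a resync data block from the socket into a peer request and submit it
 * as a write to the local disk.  The ack (P_RS_WRITE_ACK or P_NEG_ACK) is sent
 * from e_end_resync_block() once the write has completed. */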
1601 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1602 {
1603         struct drbd_peer_request *peer_req;
1604
1605         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1606         if (!peer_req)
1607                 goto fail;
1608
1609         dec_rs_pending(mdev);
1610
1611         inc_unacked(mdev);
1612         /* corresponding dec_unacked() in e_end_resync_block(),
1613          * or in _drbd_clear_done_ee(), respectively */
1614
1615         peer_req->w.cb = e_end_resync_block;
1616
1617         spin_lock_irq(&mdev->tconn->req_lock);
1618         list_add(&peer_req->w.list, &mdev->sync_ee);
1619         spin_unlock_irq(&mdev->tconn->req_lock);
1620
1621         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1622         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1623                 return 0;
1624
1625         /* don't care for the reason here */
1626         dev_err(DEV, "submit failed, triggering re-connect\n");
1627         spin_lock_irq(&mdev->tconn->req_lock);
1628         list_del(&peer_req->w.list);
1629         spin_unlock_irq(&mdev->tconn->req_lock);
1630
1631         drbd_free_peer_req(mdev, peer_req);
1632 fail:
1633         put_ldev(mdev);
1634         return -EIO;
1635 }
1636
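/* The peer echoes back the block_id we sent, which is the address of our
 * request object.  Validate it against the given interval tree before using
 * it; with missing_ok, a failed lookup is not reported as an error. */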
1637 static struct drbd_request *
1638 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1639              sector_t sector, bool missing_ok, const char *func)
1640 {
1641         struct drbd_request *req;
1642
1643         /* Request object according to our peer */
1644         req = (struct drbd_request *)(unsigned long)id;
1645         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1646                 return req;
1647         if (!missing_ok) {
1648                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1649                         (unsigned long)id, (unsigned long long)sector);
1650         }
1651         return NULL;
1652 }
1653
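/* Handle the peer's reply to one of our read requests: look up the original
 * request and copy the received payload into its bio. */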
1654 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1655 {
1656         struct drbd_conf *mdev;
1657         struct drbd_request *req;
1658         sector_t sector;
1659         int err;
1660         struct p_data *p = pi->data;
1661
1662         mdev = vnr_to_mdev(tconn, pi->vnr);
1663         if (!mdev)
1664                 return -EIO;
1665
1666         sector = be64_to_cpu(p->sector);
1667
1668         spin_lock_irq(&mdev->tconn->req_lock);
1669         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1670         spin_unlock_irq(&mdev->tconn->req_lock);
1671         if (unlikely(!req))
1672                 return -EIO;
1673
1674         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1675          * special casing it there for the various failure cases.
1676          * still no race with drbd_fail_pending_reads */
1677         err = recv_dless_read(mdev, req, sector, pi->size);
1678         if (!err)
1679                 req_mod(req, DATA_RECEIVED);
1680         /* else: nothing. handled from drbd_disconnect...
1681          * I don't think we may complete this just yet
1682          * in case we are "on-disconnect: freeze" */
1683
1684         return err;
1685 }
1686
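/* Handle the peer's reply to one of our resync requests: write the received
 * block to the local disk, or drain it and send a negative ack if we have no
 * usable local disk. */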
1687 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1688 {
1689         struct drbd_conf *mdev;
1690         sector_t sector;
1691         int err;
1692         struct p_data *p = pi->data;
1693
1694         mdev = vnr_to_mdev(tconn, pi->vnr);
1695         if (!mdev)
1696                 return -EIO;
1697
1698         sector = be64_to_cpu(p->sector);
1699         D_ASSERT(p->block_id == ID_SYNCER);
1700
1701         if (get_ldev(mdev)) {
1702                 /* data is submitted to disk within recv_resync_read.
1703                  * corresponding put_ldev done below on error,
1704                  * or in drbd_peer_request_endio. */
1705                 err = recv_resync_read(mdev, sector, pi->size);
1706         } else {
1707                 if (__ratelimit(&drbd_ratelimit_state))
1708                         dev_err(DEV, "Can not write resync data to local disk.\n");
1709
1710                 err = drbd_drain_block(mdev, pi->size);
1711
1712                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1713         }
1714
1715         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1716
1717         return err;
1718 }
1719
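/* Worker callback: resubmit a postponed local write request after the
 * conflicting peer request has completed. */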
1720 static int w_restart_write(struct drbd_work *w, int cancel)
1721 {
1722         struct drbd_request *req = container_of(w, struct drbd_request, w);
1723         struct drbd_conf *mdev = w->mdev;
1724         struct bio *bio;
1725         unsigned long start_time;
1726         unsigned long flags;
1727
1728         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1729         if (!expect(req->rq_state & RQ_POSTPONED)) {
1730                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1731                 return -EIO;
1732         }
1733         bio = req->master_bio;
1734         start_time = req->start_time;
1735         /* Postponed requests will not have their master_bio completed!  */
1736         __req_mod(req, DISCARD_WRITE, NULL);
1737         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1738
1739         while (__drbd_make_request(mdev, bio, start_time))
1740                 /* retry */ ;
1741         return 0;
1742 }
1743
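/* Queue all postponed local writes that overlap the given range for
 * resubmission on the worker thread (see w_restart_write() above). */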
1744 static void restart_conflicting_writes(struct drbd_conf *mdev,
1745                                        sector_t sector, int size)
1746 {
1747         struct drbd_interval *i;
1748         struct drbd_request *req;
1749
1750         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1751                 if (!i->local)
1752                         continue;
1753                 req = container_of(i, struct drbd_request, i);
1754                 if (req->rq_state & RQ_LOCAL_PENDING ||
1755                     !(req->rq_state & RQ_POSTPONED))
1756                         continue;
1757                 if (expect(list_empty(&req->w.list))) {
1758                         req->w.mdev = mdev;
1759                         req->w.cb = w_restart_write;
1760                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1761                 }
1762         }
1763 }
1764
1765 /*
1766  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1767  */
1768 static int e_end_block(struct drbd_work *w, int cancel)
1769 {
1770         struct drbd_peer_request *peer_req =
1771                 container_of(w, struct drbd_peer_request, w);
1772         struct drbd_conf *mdev = w->mdev;
1773         sector_t sector = peer_req->i.sector;
1774         int err = 0, pcmd;
1775
1776         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1777                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1778                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1779                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1780                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1781                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1782                         err = drbd_send_ack(mdev, pcmd, peer_req);
1783                         if (pcmd == P_RS_WRITE_ACK)
1784                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1785                 } else {
1786                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1787                         /* we expect it to be marked out of sync anyways...
1788                          * maybe assert this?  */
1789                 }
1790                 dec_unacked(mdev);
1791         }
1792         /* we delete from the conflict detection hash _after_ we sent out the
1793          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1794         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1795                 spin_lock_irq(&mdev->tconn->req_lock);
1796                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1797                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1798                 if (peer_req->flags & EE_RESTART_REQUESTS)
1799                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1800                 spin_unlock_irq(&mdev->tconn->req_lock);
1801         } else
1802                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1803
1804         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1805
1806         return err;
1807 }
1808
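/* Send the given ack for a conflicting peer write and drop the reference
 * taken via inc_unacked() in handle_write_conflicts(). */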
1809 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1810 {
1811         struct drbd_conf *mdev = w->mdev;
1812         struct drbd_peer_request *peer_req =
1813                 container_of(w, struct drbd_peer_request, w);
1814         int err;
1815
1816         err = drbd_send_ack(mdev, ack, peer_req);
1817         dec_unacked(mdev);
1818
1819         return err;
1820 }
1821
1822 static int e_send_discard_write(struct drbd_work *w, int unused)
1823 {
1824         return e_send_ack(w, P_DISCARD_WRITE);
1825 }
1826
1827 static int e_send_retry_write(struct drbd_work *w, int unused)
1828 {
1829         struct drbd_tconn *tconn = w->mdev->tconn;
1830
1831         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1832                              P_RETRY_WRITE : P_DISCARD_WRITE);
1833 }
1834
1835 static bool seq_greater(u32 a, u32 b)
1836 {
1837         /*
1838          * We assume 32-bit wrap-around here.
1839          * For 24-bit wrap-around, we would have to shift:
1840          *  a <<= 8; b <<= 8;
1841          */
1842         return (s32)a - (s32)b > 0;
1843 }
1844
1845 static u32 seq_max(u32 a, u32 b)
1846 {
1847         return seq_greater(a, b) ? a : b;
1848 }
1849
1850 static bool need_peer_seq(struct drbd_conf *mdev)
1851 {
1852         struct drbd_tconn *tconn = mdev->tconn;
1853         int tp;
1854
1855         /*
1856          * We only need to keep track of the last packet_seq number of our peer
1857          * if we are in dual-primary mode and we have the discard flag set; see
1858          * handle_write_conflicts().
1859          */
1860
1861         rcu_read_lock();
1862         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1863         rcu_read_unlock();
1864
1865         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1866 }
1867
1868 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1869 {
1870         unsigned int newest_peer_seq;
1871
1872         if (need_peer_seq(mdev)) {
1873                 spin_lock(&mdev->peer_seq_lock);
1874                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1875                 mdev->peer_seq = newest_peer_seq;
1876                 spin_unlock(&mdev->peer_seq_lock);
1877                 /* wake up only if we actually changed mdev->peer_seq */
1878                 if (peer_seq == newest_peer_seq)
1879                         wake_up(&mdev->seq_wait);
1880         }
1881 }
1882
1883 /* Called from receive_Data.
1884  * Synchronize packets on sock with packets on msock.
1885  *
1886  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1887  * packet traveling on msock, they are still processed in the order they have
1888  * been sent.
1889  *
1890  * Note: we don't care for Ack packets overtaking P_DATA packets.
1891  *
1892  * In case packet_seq is larger than mdev->peer_seq number, there are
1893  * outstanding packets on the msock. We wait for them to arrive.
1894  * In case this is logically the next packet, we update mdev->peer_seq
1895  * ourselves. Correctly handles 32bit wrap around.
1896  *
1897  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1898  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1899  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1900  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1901  *
1902  * returns 0 if we may process the packet,
1903  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1904 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1905 {
1906         DEFINE_WAIT(wait);
1907         long timeout;
1908         int ret;
1909
1910         if (!need_peer_seq(mdev))
1911                 return 0;
1912
1913         spin_lock(&mdev->peer_seq_lock);
1914         for (;;) {
1915                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1916                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1917                         ret = 0;
1918                         break;
1919                 }
1920                 if (signal_pending(current)) {
1921                         ret = -ERESTARTSYS;
1922                         break;
1923                 }
1924                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1925                 spin_unlock(&mdev->peer_seq_lock);
1926                 rcu_read_lock();
1927                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1928                 rcu_read_unlock();
1929                 timeout = schedule_timeout(timeout);
1930                 spin_lock(&mdev->peer_seq_lock);
1931                 if (!timeout) {
1932                         ret = -ETIMEDOUT;
1933                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1934                         break;
1935                 }
1936         }
1937         spin_unlock(&mdev->peer_seq_lock);
1938         finish_wait(&mdev->seq_wait, &wait);
1939         return ret;
1940 }
1941
1942 /* see also bio_flags_to_wire():
1943  * we need to map DRBD_REQ_* flags to data packet flags and back semantically,
1944  * because we may replicate to peers running other kernel versions. */
1945 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1946 {
1947         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1948                 (dpf & DP_FUA ? REQ_FUA : 0) |
1949                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1950                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1951 }
1952
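/* Fail all postponed local writes that overlap the given range, completing
 * their master bios with a negative result. */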
1953 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1954                                     unsigned int size)
1955 {
1956         struct drbd_interval *i;
1957
1958     repeat:
1959         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1960                 struct drbd_request *req;
1961                 struct bio_and_error m;
1962
1963                 if (!i->local)
1964                         continue;
1965                 req = container_of(i, struct drbd_request, i);
1966                 if (!(req->rq_state & RQ_POSTPONED))
1967                         continue;
1968                 req->rq_state &= ~RQ_POSTPONED;
1969                 __req_mod(req, NEG_ACKED, &m);
1970                 spin_unlock_irq(&mdev->tconn->req_lock);
1971                 if (m.bio)
1972                         complete_master_bio(mdev, &m);
1973                 spin_lock_irq(&mdev->tconn->req_lock);
1974                 goto repeat;
1975         }
1976 }
1977
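/* Insert the peer write into the write_requests tree and resolve any overlaps
 * with local or earlier peer requests.  Returns 0 if the peer request may be
 * submitted, -ENOENT if it was discarded or queued for retry, or another
 * negative error code.  Called with req_lock held. */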
1978 static int handle_write_conflicts(struct drbd_conf *mdev,
1979                                   struct drbd_peer_request *peer_req)
1980 {
1981         struct drbd_tconn *tconn = mdev->tconn;
1982         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1983         sector_t sector = peer_req->i.sector;
1984         const unsigned int size = peer_req->i.size;
1985         struct drbd_interval *i;
1986         bool equal;
1987         int err;
1988
1989         /*
1990          * Inserting the peer request into the write_requests tree will prevent
1991          * new conflicting local requests from being added.
1992          */
1993         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1994
1995     repeat:
1996         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1997                 if (i == &peer_req->i)
1998                         continue;
1999
2000                 if (!i->local) {
2001                         /*
2002                          * Our peer has sent a conflicting remote request; this
2003                          * should not happen in a two-node setup.  Wait for the
2004                          * earlier peer request to complete.
2005                          */
2006                         err = drbd_wait_misc(mdev, i);
2007                         if (err)
2008                                 goto out;
2009                         goto repeat;
2010                 }
2011
2012                 equal = i->sector == sector && i->size == size;
2013                 if (resolve_conflicts) {
2014                         /*
2015                          * If the peer request is fully contained within the
2016                          * overlapping request, it can be discarded; otherwise,
2017                          * it will be retried once all overlapping requests
2018                          * have completed.
2019                          */
2020                         bool discard = i->sector <= sector && i->sector +
2021                                        (i->size >> 9) >= sector + (size >> 9);
2022
2023                         if (!equal)
2024                                 dev_alert(DEV, "Concurrent writes detected: "
2025                                                "local=%llus +%u, remote=%llus +%u, "
2026                                                "assuming %s came first\n",
2027                                           (unsigned long long)i->sector, i->size,
2028                                           (unsigned long long)sector, size,
2029                                           discard ? "local" : "remote");
2030
2031                         inc_unacked(mdev);
2032                         peer_req->w.cb = discard ? e_send_discard_write :
2033                                                    e_send_retry_write;
2034                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2035                         wake_asender(mdev->tconn);
2036
2037                         err = -ENOENT;
2038                         goto out;
2039                 } else {
2040                         struct drbd_request *req =
2041                                 container_of(i, struct drbd_request, i);
2042
2043                         if (!equal)
2044                                 dev_alert(DEV, "Concurrent writes detected: "
2045                                                "local=%llus +%u, remote=%llus +%u\n",
2046                                           (unsigned long long)i->sector, i->size,
2047                                           (unsigned long long)sector, size);
2048
2049                         if (req->rq_state & RQ_LOCAL_PENDING ||
2050                             !(req->rq_state & RQ_POSTPONED)) {
2051                                 /*
2052                                  * Wait for the node with the discard flag to
2053                                  * decide if this request will be discarded or
2054                                  * retried.  Requests that are discarded will
2055                                  * disappear from the write_requests tree.
2056                                  *
2057                                  * In addition, wait for the conflicting
2058                                  * request to finish locally before submitting
2059                                  * the conflicting peer request.
2060                                  */
2061                                 err = drbd_wait_misc(mdev, &req->i);
2062                                 if (err) {
2063                                         _conn_request_state(mdev->tconn,
2064                                                             NS(conn, C_TIMEOUT),
2065                                                             CS_HARD);
2066                                         fail_postponed_requests(mdev, sector, size);
2067                                         goto out;
2068                                 }
2069                                 goto repeat;
2070                         }
2071                         /*
2072                          * Remember to restart the conflicting requests after
2073                          * the new peer request has completed.
2074                          */
2075                         peer_req->flags |= EE_RESTART_REQUESTS;
2076                 }
2077         }
2078         err = 0;
2079
2080     out:
2081         if (err)
2082                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2083         return err;
2084 }
2085
2086 /* mirrored write */
2087 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2088 {
2089         struct drbd_conf *mdev;
2090         sector_t sector;
2091         struct drbd_peer_request *peer_req;
2092         struct p_data *p = pi->data;
2093         u32 peer_seq = be32_to_cpu(p->seq_num);
2094         int rw = WRITE;
2095         u32 dp_flags;
2096         int err, tp;
2097
2098         mdev = vnr_to_mdev(tconn, pi->vnr);
2099         if (!mdev)
2100                 return -EIO;
2101
2102         if (!get_ldev(mdev)) {
2103                 int err2;
2104
2105                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2106                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2107                 atomic_inc(&tconn->current_epoch->epoch_size);
2108                 err2 = drbd_drain_block(mdev, pi->size);
2109                 if (!err)
2110                         err = err2;
2111                 return err;
2112         }
2113
2114         /*
2115          * Corresponding put_ldev done either below (on various errors), or in
2116          * drbd_peer_request_endio, if we successfully submit the data at the
2117          * end of this function.
2118          */
2119
2120         sector = be64_to_cpu(p->sector);
2121         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2122         if (!peer_req) {
2123                 put_ldev(mdev);
2124                 return -EIO;
2125         }
2126
2127         peer_req->w.cb = e_end_block;
2128
2129         dp_flags = be32_to_cpu(p->dp_flags);
2130         rw |= wire_flags_to_bio(mdev, dp_flags);
2131
2132         if (dp_flags & DP_MAY_SET_IN_SYNC)
2133                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2134
2135         spin_lock(&tconn->epoch_lock);
2136         peer_req->epoch = tconn->current_epoch;
2137         atomic_inc(&peer_req->epoch->epoch_size);
2138         atomic_inc(&peer_req->epoch->active);
2139         spin_unlock(&tconn->epoch_lock);
2140
2141         rcu_read_lock();
2142         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2143         rcu_read_unlock();
2144         if (tp) {
2145                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2146                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2147                 if (err)
2148                         goto out_interrupted;
2149                 spin_lock_irq(&mdev->tconn->req_lock);
2150                 err = handle_write_conflicts(mdev, peer_req);
2151                 if (err) {
2152                         spin_unlock_irq(&mdev->tconn->req_lock);
2153                         if (err == -ENOENT) {
2154                                 put_ldev(mdev);
2155                                 return 0;
2156                         }
2157                         goto out_interrupted;
2158                 }
2159         } else
2160                 spin_lock_irq(&mdev->tconn->req_lock);
2161         list_add(&peer_req->w.list, &mdev->active_ee);
2162         spin_unlock_irq(&mdev->tconn->req_lock);
2163
2164         if (mdev->tconn->agreed_pro_version < 100) {
2165                 rcu_read_lock();
2166                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2167                 case DRBD_PROT_C:
2168                         dp_flags |= DP_SEND_WRITE_ACK;
2169                         break;
2170                 case DRBD_PROT_B:
2171                         dp_flags |= DP_SEND_RECEIVE_ACK;
2172                         break;
2173                 }
2174                 rcu_read_unlock();
2175         }
2176
2177         if (dp_flags & DP_SEND_WRITE_ACK) {
2178                 peer_req->flags |= EE_SEND_WRITE_ACK;
2179                 inc_unacked(mdev);
2180                 /* corresponding dec_unacked() in e_end_block(),
2181                  * or in _drbd_clear_done_ee(), respectively */
2182         }
2183
2184         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2185                 /* I really don't like it that the receiver thread
2186                  * sends on the msock, but anyways */
2187                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2188         }
2189
2190         if (mdev->state.pdsk < D_INCONSISTENT) {
2191                 /* In case we have the only disk of the cluster: mark the blocks out of sync for the peer and cover the write with the activity log. */
2192                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2193                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2194                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2195                 drbd_al_begin_io(mdev, &peer_req->i);
2196         }
2197
2198         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2199         if (!err)
2200                 return 0;
2201
2202         /* don't care for the reason here */
2203         dev_err(DEV, "submit failed, triggering re-connect\n");
2204         spin_lock_irq(&mdev->tconn->req_lock);
2205         list_del(&peer_req->w.list);
2206         drbd_remove_epoch_entry_interval(mdev, peer_req);
2207         spin_unlock_irq(&mdev->tconn->req_lock);
2208         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2209                 drbd_al_complete_io(mdev, &peer_req->i);
2210
2211 out_interrupted:
2212         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2213         put_ldev(mdev);
2214         drbd_free_peer_req(mdev, peer_req);
2215         return err;
2216 }
2217
2218 /* We may throttle resync, if the lower device seems to be busy,
2219  * and current sync rate is above c_min_rate.
2220  *
2221  * To decide whether or not the lower device is busy, we use a scheme similar
2222  * to MD RAID is_mddev_idle(): if the partition stats reveal significant
2223  * activity (more than 64 sectors) that we cannot account for with our own
2224  * resync activity, it obviously is "busy".
2225  *
2226  * The current sync rate used here uses only the most recent two step marks,
2227  * to have a short time average so we can react faster.
2228  */
2229 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2230 {
2231         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2232         unsigned long db, dt, dbdt;
2233         struct lc_element *tmp;
2234         int curr_events;
2235         int throttle = 0;
2236         unsigned int c_min_rate;
2237
2238         rcu_read_lock();
2239         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2240         rcu_read_unlock();
2241
2242         /* feature disabled? */
2243         if (c_min_rate == 0)
2244                 return 0;
2245
2246         spin_lock_irq(&mdev->al_lock);
2247         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2248         if (tmp) {
2249                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2250                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2251                         spin_unlock_irq(&mdev->al_lock);
2252                         return 0;
2253                 }
2254                 /* Do not slow down if app IO is already waiting for this extent */
2255         }
2256         spin_unlock_irq(&mdev->al_lock);
2257
2258         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2259                       (int)part_stat_read(&disk->part0, sectors[1]) -
2260                         atomic_read(&mdev->rs_sect_ev);
2261
2262         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2263                 unsigned long rs_left;
2264                 int i;
2265
2266                 mdev->rs_last_events = curr_events;
2267
2268                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2269                  * approx. */
2270                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2271
2272                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2273                         rs_left = mdev->ov_left;
2274                 else
2275                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2276
2277                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2278                 if (!dt)
2279                         dt++;
2280                 db = mdev->rs_mark_left[i] - rs_left;
2281                 dbdt = Bit2KB(db/dt);
2282
2283                 if (dbdt > c_min_rate)
2284                         throttle = 1;
2285         }
2286         return throttle;
2287 }
2288
2289
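/* Handle the various read-type requests from the peer (application reads,
 * resync reads, checksum-based resync and online verify): allocate a peer
 * request, read the block from the local disk, and send the reply from a
 * worker callback once the local read completes. */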
2290 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2291 {
2292         struct drbd_conf *mdev;
2293         sector_t sector;
2294         sector_t capacity;
2295         struct drbd_peer_request *peer_req;
2296         struct digest_info *di = NULL;
2297         int size, verb;
2298         unsigned int fault_type;
2299         struct p_block_req *p = pi->data;
2300
2301         mdev = vnr_to_mdev(tconn, pi->vnr);
2302         if (!mdev)
2303                 return -EIO;
2304         capacity = drbd_get_capacity(mdev->this_bdev);
2305
2306         sector = be64_to_cpu(p->sector);
2307         size   = be32_to_cpu(p->blksize);
2308
2309         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2310                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2311                                 (unsigned long long)sector, size);
2312                 return -EINVAL;
2313         }
2314         if (sector + (size>>9) > capacity) {
2315                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2316                                 (unsigned long long)sector, size);
2317                 return -EINVAL;
2318         }
2319
2320         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2321                 verb = 1;
2322                 switch (pi->cmd) {
2323                 case P_DATA_REQUEST:
2324                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2325                         break;
2326                 case P_RS_DATA_REQUEST:
2327                 case P_CSUM_RS_REQUEST:
2328                 case P_OV_REQUEST:
2329                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2330                         break;
2331                 case P_OV_REPLY:
2332                         verb = 0;
2333                         dec_rs_pending(mdev);
2334                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2335                         break;
2336                 default:
2337                         BUG();
2338                 }
2339                 if (verb && __ratelimit(&drbd_ratelimit_state))
2340                         dev_err(DEV, "Can not satisfy peer's read request, "
2341                             "no local data.\n");
2342
2343                 /* drain the possibly present payload */
2344                 return drbd_drain_block(mdev, pi->size);
2345         }
2346
2347         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2348          * "criss-cross" setup, that might cause write-out on some other DRBD,
2349          * which in turn might block on the other node at this very place.  */
2350         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2351         if (!peer_req) {
2352                 put_ldev(mdev);
2353                 return -ENOMEM;
2354         }
2355
2356         switch (pi->cmd) {
2357         case P_DATA_REQUEST:
2358                 peer_req->w.cb = w_e_end_data_req;
2359                 fault_type = DRBD_FAULT_DT_RD;
2360                 /* application IO, don't drbd_rs_begin_io */
2361                 goto submit;
2362
2363         case P_RS_DATA_REQUEST:
2364                 peer_req->w.cb = w_e_end_rsdata_req;
2365                 fault_type = DRBD_FAULT_RS_RD;
2366                 /* used in the sector offset progress display */
2367                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2368                 break;
2369
2370         case P_OV_REPLY:
2371         case P_CSUM_RS_REQUEST:
2372                 fault_type = DRBD_FAULT_RS_RD;
2373                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2374                 if (!di)
2375                         goto out_free_e;
2376
2377                 di->digest_size = pi->size;
2378                 di->digest = (((char *)di)+sizeof(struct digest_info));
2379
2380                 peer_req->digest = di;
2381                 peer_req->flags |= EE_HAS_DIGEST;
2382
2383                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2384                         goto out_free_e;
2385
2386                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2387                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2388                         peer_req->w.cb = w_e_end_csum_rs_req;
2389                         /* used in the sector offset progress display */
2390                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2391                 } else if (pi->cmd == P_OV_REPLY) {
2392                         /* track progress, we may need to throttle */
2393                         atomic_add(size >> 9, &mdev->rs_sect_in);
2394                         peer_req->w.cb = w_e_end_ov_reply;
2395                         dec_rs_pending(mdev);
2396                         /* drbd_rs_begin_io done when we sent this request,
2397                          * but accounting still needs to be done. */
2398                         goto submit_for_resync;
2399                 }
2400                 break;
2401
2402         case P_OV_REQUEST:
2403                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2404                     mdev->tconn->agreed_pro_version >= 90) {
2405                         unsigned long now = jiffies;
2406                         int i;
2407                         mdev->ov_start_sector = sector;
2408                         mdev->ov_position = sector;
2409                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2410                         mdev->rs_total = mdev->ov_left;
2411                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2412                                 mdev->rs_mark_left[i] = mdev->ov_left;
2413                                 mdev->rs_mark_time[i] = now;
2414                         }
2415                         dev_info(DEV, "Online Verify start sector: %llu\n",
2416                                         (unsigned long long)sector);
2417                 }
2418                 peer_req->w.cb = w_e_end_ov_req;
2419                 fault_type = DRBD_FAULT_RS_RD;
2420                 break;
2421
2422         default:
2423                 BUG();
2424         }
2425
2426         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2427          * wrt the receiver, but it is not as straightforward as it may seem.
2428          * Various places in the resync start and stop logic assume resync
2429          * requests are processed in order, requeuing this on the worker thread
2430          * introduces a bunch of new code for synchronization between threads.
2431          *
2432          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2433          * "forever", throttling after drbd_rs_begin_io will lock that extent
2434          * for application writes for the same time.  For now, just throttle
2435          * here, where the rest of the code expects the receiver to sleep for
2436          * a while, anyways.
2437          */
2438
2439         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2440          * this defers syncer requests for some time, before letting at least
2441          * on request through.  The resync controller on the receiving side
2442          * one request through.  The resync controller on the receiving side
2443          *
2444          * We cannot throttle here if remote is Primary/SyncTarget:
2445          * we would also throttle its application reads.
2446          * In that case, throttling is done on the SyncTarget only.
2447          */
2448         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2449                 schedule_timeout_uninterruptible(HZ/10);
2450         if (drbd_rs_begin_io(mdev, sector))
2451                 goto out_free_e;
2452
2453 submit_for_resync:
2454         atomic_add(size >> 9, &mdev->rs_sect_ev);
2455
2456 submit:
2457         inc_unacked(mdev);
2458         spin_lock_irq(&mdev->tconn->req_lock);
2459         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2460         spin_unlock_irq(&mdev->tconn->req_lock);
2461
2462         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2463                 return 0;
2464
2465         /* don't care for the reason here */
2466         dev_err(DEV, "submit failed, triggering re-connect\n");
2467         spin_lock_irq(&mdev->tconn->req_lock);
2468         list_del(&peer_req->w.list);
2469         spin_unlock_irq(&mdev->tconn->req_lock);
2470         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2471
2472 out_free_e:
2473         put_ldev(mdev);
2474         drbd_free_peer_req(mdev, peer_req);
2475         return -EIO;
2476 }
2477
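/* After split brain with no primaries: apply the configured after-sb-0pri
 * policy.  Returns 1 to keep the local data (the peer becomes sync target),
 * -1 to discard the local data (we become sync target), or -100 if no
 * automatic decision could be made. */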
2478 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2479 {
2480         int self, peer, rv = -100;
2481         unsigned long ch_self, ch_peer;
2482         enum drbd_after_sb_p after_sb_0p;
2483
2484         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2485         peer = mdev->p_uuid[UI_BITMAP] & 1;
2486
2487         ch_peer = mdev->p_uuid[UI_SIZE];
2488         ch_self = mdev->comm_bm_set;
2489
2490         rcu_read_lock();
2491         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2492         rcu_read_unlock();
2493         switch (after_sb_0p) {
2494         case ASB_CONSENSUS:
2495         case ASB_DISCARD_SECONDARY:
2496         case ASB_CALL_HELPER:
2497         case ASB_VIOLENTLY:
2498                 dev_err(DEV, "Configuration error.\n");
2499                 break;
2500         case ASB_DISCONNECT:
2501                 break;
2502         case ASB_DISCARD_YOUNGER_PRI:
2503                 if (self == 0 && peer == 1) {
2504                         rv = -1;
2505                         break;
2506                 }
2507                 if (self == 1 && peer == 0) {
2508                         rv =  1;
2509                         break;
2510                 }
2511                 /* Else fall through to one of the other strategies... */
2512         case ASB_DISCARD_OLDER_PRI:
2513                 if (self == 0 && peer == 1) {
2514                         rv = 1;
2515                         break;
2516                 }
2517                 if (self == 1 && peer == 0) {
2518                         rv = -1;
2519                         break;
2520                 }
2521                 /* Else fall through to one of the other strategies... */
2522                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2523                      "Using discard-least-changes instead\n");
2524         case ASB_DISCARD_ZERO_CHG:
2525                 if (ch_peer == 0 && ch_self == 0) {
2526                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2527                                 ? -1 : 1;
2528                         break;
2529                 } else {
2530                         if (ch_peer == 0) { rv =  1; break; }
2531                         if (ch_self == 0) { rv = -1; break; }
2532                 }
2533                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2534                         break;
2535         case ASB_DISCARD_LEAST_CHG:
2536                 if      (ch_self < ch_peer)
2537                         rv = -1;
2538                 else if (ch_self > ch_peer)
2539                         rv =  1;
2540                 else /* ( ch_self == ch_peer ) */
2541                      /* Well, then use something else. */
2542                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2543                                 ? -1 : 1;
2544                 break;
2545         case ASB_DISCARD_LOCAL:
2546                 rv = -1;
2547                 break;
2548         case ASB_DISCARD_REMOTE:
2549                 rv =  1;
2550         }
2551
2552         return rv;
2553 }
2554
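/* After split brain with one primary: apply the configured after-sb-1pri
 * policy, falling back to the 0pri rules where the policy asks for it.
 * Same return value convention as drbd_asb_recover_0p(). */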
2555 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2556 {
2557         int hg, rv = -100;
2558         enum drbd_after_sb_p after_sb_1p;
2559
2560         rcu_read_lock();
2561         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2562         rcu_read_unlock();
2563         switch (after_sb_1p) {
2564         case ASB_DISCARD_YOUNGER_PRI:
2565         case ASB_DISCARD_OLDER_PRI:
2566         case ASB_DISCARD_LEAST_CHG:
2567         case ASB_DISCARD_LOCAL:
2568         case ASB_DISCARD_REMOTE:
2569         case ASB_DISCARD_ZERO_CHG:
2570                 dev_err(DEV, "Configuration error.\n");
2571                 break;
2572         case ASB_DISCONNECT:
2573                 break;
2574         case ASB_CONSENSUS:
2575                 hg = drbd_asb_recover_0p(mdev);
2576                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2577                         rv = hg;
2578                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2579                         rv = hg;
2580                 break;
2581         case ASB_VIOLENTLY:
2582                 rv = drbd_asb_recover_0p(mdev);
2583                 break;
2584         case ASB_DISCARD_SECONDARY:
2585                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2586         case ASB_CALL_HELPER:
2587                 hg = drbd_asb_recover_0p(mdev);
2588                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2589                         enum drbd_state_rv rv2;
2590
2591                         drbd_set_role(mdev, R_SECONDARY, 0);
2592                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2593                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2594                           * we do not need to wait for the after state change work either. */
2595                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2596                         if (rv2 != SS_SUCCESS) {
2597                                 drbd_khelper(mdev, "pri-lost-after-sb");
2598                         } else {
2599                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2600                                 rv = hg;
2601                         }
2602                 } else
2603                         rv = hg;
2604         }
2605
2606         return rv;
2607 }
2608
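/* After split brain with two primaries: apply the configured after-sb-2pri
 * policy.  Same return value convention as drbd_asb_recover_0p(). */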
2609 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2610 {
2611         int hg, rv = -100;
2612         enum drbd_after_sb_p after_sb_2p;
2613
2614         rcu_read_lock();
2615         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2616         rcu_read_unlock();
2617         switch (after_sb_2p) {
2618         case ASB_DISCARD_YOUNGER_PRI:
2619         case ASB_DISCARD_OLDER_PRI:
2620         case ASB_DISCARD_LEAST_CHG:
2621         case ASB_DISCARD_LOCAL:
2622         case ASB_DISCARD_REMOTE:
2623         case ASB_CONSENSUS:
2624         case ASB_DISCARD_SECONDARY:
2625         case ASB_DISCARD_ZERO_CHG:
2626                 dev_err(DEV, "Configuration error.\n");
2627                 break;
2628         case ASB_VIOLENTLY:
2629                 rv = drbd_asb_recover_0p(mdev);
2630                 break;
2631         case ASB_DISCONNECT:
2632                 break;
2633         case ASB_CALL_HELPER:
2634                 hg = drbd_asb_recover_0p(mdev);
2635                 if (hg == -1) {
2636                         enum drbd_state_rv rv2;
2637
2638                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2639                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2640                           * we do not need to wait for the after state change work either. */
2641                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2642                         if (rv2 != SS_SUCCESS) {
2643                                 drbd_khelper(mdev, "pri-lost-after-sb");
2644                         } else {
2645                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2646                                 rv = hg;
2647                         }
2648                 } else
2649                         rv = hg;
2650         }
2651
2652         return rv;
2653 }
2654
2655 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2656                            u64 bits, u64 flags)
2657 {
2658         if (!uuid) {
2659                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2660                 return;
2661         }
2662         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2663              text,
2664              (unsigned long long)uuid[UI_CURRENT],
2665              (unsigned long long)uuid[UI_BITMAP],
2666              (unsigned long long)uuid[UI_HISTORY_START],
2667              (unsigned long long)uuid[UI_HISTORY_END],
2668              (unsigned long long)bits,
2669              (unsigned long long)flags);
2670 }
2671
2672 /*
2673   100   after split brain try auto recover
2674     2   C_SYNC_SOURCE set BitMap
2675     1   C_SYNC_SOURCE use BitMap
2676     0   no Sync
2677    -1   C_SYNC_TARGET use BitMap
2678    -2   C_SYNC_TARGET set BitMap
2679  -100   after split brain, disconnect
2680 -1000   unrelated data
2681 -1091   requires proto 91
2682 -1096   requires proto 96
2683  */
2684 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2685 {
2686         u64 self, peer;
2687         int i, j;
2688
2689         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2690         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2691
2692         *rule_nr = 10;
2693         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2694                 return 0;
2695
2696         *rule_nr = 20;
2697         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2698              peer != UUID_JUST_CREATED)
2699                 return -2;
2700
2701         *rule_nr = 30;
2702         if (self != UUID_JUST_CREATED &&
2703             (peer == UUID_JUST_CREATED || peer == (u64)0))
2704                 return 2;
2705
2706         if (self == peer) {
2707                 int rct, dc; /* roles at crash time */
2708
2709                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2710
2711                         if (mdev->tconn->agreed_pro_version < 91)
2712                                 return -1091;
2713
2714                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2715                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2716                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2717                                 drbd_uuid_set_bm(mdev, 0UL);
2718
2719                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2720                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2721                                 *rule_nr = 34;
2722                         } else {
2723                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2724                                 *rule_nr = 36;
2725                         }
2726
2727                         return 1;
2728                 }
2729
2730                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2731
2732                         if (mdev->tconn->agreed_pro_version < 91)
2733                                 return -1091;
2734
2735                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2736                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2737                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2738
2739                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2740                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2741                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2742
2743                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2744                                 *rule_nr = 35;
2745                         } else {
2746                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2747                                 *rule_nr = 37;
2748                         }
2749
2750                         return -1;
2751                 }
2752
2753                 /* Common power [off|failure] */
2754                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2755                         (mdev->p_uuid[UI_FLAGS] & 2);
2756                 /* lowest bit is set when we were primary,
2757                  * next bit (weight 2) is set when peer was primary */
2758                 *rule_nr = 40;
2759
2760                 switch (rct) {
2761                 case 0: /* !self_pri && !peer_pri */ return 0;
2762                 case 1: /*  self_pri && !peer_pri */ return 1;
2763                 case 2: /* !self_pri &&  peer_pri */ return -1;
2764                 case 3: /*  self_pri &&  peer_pri */
2765                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2766                         return dc ? -1 : 1;
2767                 }
2768         }
2769
2770         *rule_nr = 50;
2771         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2772         if (self == peer)
2773                 return -1;
2774
2775         *rule_nr = 51;
2776         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2777         if (self == peer) {
2778                 if (mdev->tconn->agreed_pro_version < 96 ?
2779                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2780                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2781                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2782                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2783                            the peer made to its UUIDs when it last started a resync as sync source. */
2784
2785                         if (mdev->tconn->agreed_pro_version < 91)
2786                                 return -1091;
2787
2788                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2789                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2790
2791                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2792                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2793
2794                         return -1;
2795                 }
2796         }
2797
2798         *rule_nr = 60;
2799         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2800         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2801                 peer = mdev->p_uuid[i] & ~((u64)1);
2802                 if (self == peer)
2803                         return -2;
2804         }
2805
2806         *rule_nr = 70;
2807         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2808         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2809         if (self == peer)
2810                 return 1;
2811
2812         *rule_nr = 71;
2813         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2814         if (self == peer) {
2815                 if (mdev->tconn->agreed_pro_version < 96 ?
2816                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2817                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2818                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2819                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2820                            we made to our UUIDs when we last started a resync as sync source. */
2821
2822                         if (mdev->tconn->agreed_pro_version < 91)
2823                                 return -1091;
2824
2825                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2826                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2827
2828                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2829                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2830                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2831
2832                         return 1;
2833                 }
2834         }
2835
2836
2837         *rule_nr = 80;
2838         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2839         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2840                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2841                 if (self == peer)
2842                         return 2;
2843         }
2844
2845         *rule_nr = 90;
2846         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2847         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2848         if (self == peer && self != ((u64)0))
2849                 return 100;
2850
2851         *rule_nr = 100;
2852         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2853                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2854                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2855                         peer = mdev->p_uuid[j] & ~((u64)1);
2856                         if (self == peer)
2857                                 return -100;
2858                 }
2859         }
2860
2861         return -1000;
2862 }
2863
2864 /* drbd_sync_handshake() returns the new conn state on success, or
2865    C_MASK (-1) on failure.
2866  */
2867 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2868                                            enum drbd_disk_state peer_disk) __must_hold(local)
2869 {
2870         enum drbd_conns rv = C_MASK;
2871         enum drbd_disk_state mydisk;
2872         struct net_conf *nc;
2873         int hg, rule_nr, rr_conflict, tentative;
2874
2875         mydisk = mdev->state.disk;
2876         if (mydisk == D_NEGOTIATING)
2877                 mydisk = mdev->new_state_tmp.disk;
2878
2879         dev_info(DEV, "drbd_sync_handshake:\n");
2880         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2881         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2882                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2883
2884         hg = drbd_uuid_compare(mdev, &rule_nr);
2885
2886         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2887
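        /* hg encodes the verdict of the UUID comparison: 0 means the data is
         * in sync, 1/-1 means bitmap based resync as SyncSource/SyncTarget,
         * 2/-2 means a full sync is required, 100/-100 means split brain,
         * -1000 means unrelated data, and anything below -1000 means both
         * sides need to support at least protocol version (-hg - 1000). */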
2888         if (hg == -1000) {
2889                 dev_alert(DEV, "Unrelated data, aborting!\n");
2890                 return C_MASK;
2891         }
2892         if (hg < -1000) {
2893                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2894                 return C_MASK;
2895         }
2896
2897         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2898             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2899                 int f = (hg == -100) || abs(hg) == 2;
2900                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2901                 if (f)
2902                         hg = hg*2;
2903                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2904                      hg > 0 ? "source" : "target");
2905         }
2906
2907         if (abs(hg) == 100)
2908                 drbd_khelper(mdev, "initial-split-brain");
2909
2910         rcu_read_lock();
2911         nc = rcu_dereference(mdev->tconn->net_conf);
2912
2913         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2914                 int pcount = (mdev->state.role == R_PRIMARY)
2915                            + (peer_role == R_PRIMARY);
2916                 int forced = (hg == -100);
2917
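                /* Auto-recover using the after-sb-0pri/-1pri/-2pri policy that
                 * matches the number of nodes currently in the Primary role. */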
2918                 switch (pcount) {
2919                 case 0:
2920                         hg = drbd_asb_recover_0p(mdev);
2921                         break;
2922                 case 1:
2923                         hg = drbd_asb_recover_1p(mdev);
2924                         break;
2925                 case 2:
2926                         hg = drbd_asb_recover_2p(mdev);
2927                         break;
2928                 }
2929                 if (abs(hg) < 100) {
2930                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2931                              "automatically solved. Sync from %s node\n",
2932                              pcount, (hg < 0) ? "peer" : "this");
2933                         if (forced) {
2934                                 dev_warn(DEV, "Doing a full sync, since"
2935                                      " UUIDs were ambiguous.\n");
2936                                 hg = hg*2;
2937                         }
2938                 }
2939         }
2940
2941         if (hg == -100) {
2942                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2943                         hg = -1;
2944                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2945                         hg = 1;
2946
2947                 if (abs(hg) < 100)
2948                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2949                              "Sync from %s node\n",
2950                              (hg < 0) ? "peer" : "this");
2951         }
2952         rr_conflict = nc->rr_conflict;
2953         tentative = nc->tentative;
2954         rcu_read_unlock();
2955
2956         if (hg == -100) {
2957                 /* FIXME this log message is not correct if we end up here
2958                  * after an attempted attach on a diskless node.
2959                  * We just refuse to attach -- well, we drop the "connection"
2960                  * to that disk, in a way... */
2961                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2962                 drbd_khelper(mdev, "split-brain");
2963                 return C_MASK;
2964         }
2965
2966         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2967                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2968                 return C_MASK;
2969         }
2970
2971         if (hg < 0 && /* by intention we do not use mydisk here. */
2972             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2973                 switch (rr_conflict) {
2974                 case ASB_CALL_HELPER:
2975                         drbd_khelper(mdev, "pri-lost");
2976                         /* fall through */
2977                 case ASB_DISCONNECT:
2978                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2979                         return C_MASK;
2980                 case ASB_VIOLENTLY:
2981                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2982                              " assumption\n");
2983                 }
2984         }
2985
2986         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2987                 if (hg == 0)
2988                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2989                 else
2990                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2991                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2992                                  abs(hg) >= 2 ? "full" : "bit-map based");
2993                 return C_MASK;
2994         }
2995
2996         if (abs(hg) >= 2) {
2997                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2998                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2999                                         BM_LOCKED_SET_ALLOWED))
3000                         return C_MASK;
3001         }
3002
3003         if (hg > 0) { /* become sync source. */
3004                 rv = C_WF_BITMAP_S;
3005         } else if (hg < 0) { /* become sync target */
3006                 rv = C_WF_BITMAP_T;
3007         } else {
3008                 rv = C_CONNECTED;
3009                 if (drbd_bm_total_weight(mdev)) {
3010                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3011                              drbd_bm_total_weight(mdev));
3012                 }
3013         }
3014
3015         return rv;
3016 }
3017
3018 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3019 {
3020         /* the pairing ASB_DISCARD_REMOTE (on the peer) with ASB_DISCARD_LOCAL (on us) is valid */
3021         if (peer == ASB_DISCARD_REMOTE)
3022                 return ASB_DISCARD_LOCAL;
3023
3024         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3025         if (peer == ASB_DISCARD_LOCAL)
3026                 return ASB_DISCARD_REMOTE;
3027
3028         /* everything else is valid if they are equal on both sides. */
3029         return peer;
3030 }
3031
3032 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3033 {
3034         struct p_protocol *p = pi->data;
3035         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3036         int p_proto, p_discard_my_data, p_two_primaries, cf;
3037         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3038         char integrity_alg[SHARED_SECRET_MAX] = "";
3039         struct crypto_hash *peer_integrity_tfm = NULL;
3040         void *int_dig_in = NULL, *int_dig_vv = NULL;
3041
3042         p_proto         = be32_to_cpu(p->protocol);
3043         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3044         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3045         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3046         p_two_primaries = be32_to_cpu(p->two_primaries);
3047         cf              = be32_to_cpu(p->conn_flags);
3048         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3049
3050         if (tconn->agreed_pro_version >= 87) {
3051                 int err;
3052
3053                 if (pi->size > sizeof(integrity_alg))
3054                         return -EIO;
3055                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3056                 if (err)
3057                         return err;
3058                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3059         }
3060
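        /* A plain P_PROTOCOL packet must match our own configuration exactly,
         * or we drop the connection.  A P_PROTOCOL_UPDATE packet announces
         * changed settings, so the comparison is skipped and the new values
         * are simply taken over below. */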
3061         if (pi->cmd != P_PROTOCOL_UPDATE) {
3062                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3063
3064                 if (cf & CF_DRY_RUN)
3065                         set_bit(CONN_DRY_RUN, &tconn->flags);
3066
3067                 rcu_read_lock();
3068                 nc = rcu_dereference(tconn->net_conf);
3069
3070                 if (p_proto != nc->wire_protocol) {
3071                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3072                         goto disconnect_rcu_unlock;
3073                 }
3074
3075                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3076                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3077                         goto disconnect_rcu_unlock;
3078                 }
3079
3080                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3081                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3082                         goto disconnect_rcu_unlock;
3083                 }
3084
3085                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3086                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3087                         goto disconnect_rcu_unlock;
3088                 }
3089
3090                 if (p_discard_my_data && nc->discard_my_data) {
3091                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3092                         goto disconnect_rcu_unlock;
3093                 }
3094
3095                 if (p_two_primaries != nc->two_primaries) {
3096                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3097                         goto disconnect_rcu_unlock;
3098                 }
3099
3100                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3101                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3102                         goto disconnect_rcu_unlock;
3103                 }
3104
3105                 rcu_read_unlock();
3106         }
3107
3108         if (integrity_alg[0]) {
3109                 int hash_size;
3110
3111                 /*
3112                  * We can only change the peer data integrity algorithm
3113                  * here.  Changing our own data integrity algorithm
3114                  * requires that we send a P_PROTOCOL_UPDATE packet at
3115                  * the same time; otherwise, the peer has no way to
3116                  * tell at which packet the algorithm is supposed to
3117                  * change.
3118                  */
3119
3120                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3121                 if (!peer_integrity_tfm) {
3122                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3123                                  integrity_alg);
3124                         goto disconnect;
3125                 }
3126
3127                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3128                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3129                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3130                 if (!(int_dig_in && int_dig_vv)) {
3131                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3132                         goto disconnect;
3133                 }
3134         }
3135
3136         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3137         if (!new_net_conf) {
3138                 conn_err(tconn, "Allocation of new net_conf failed\n");
3139                 goto disconnect;
3140         }
3141
3142         mutex_lock(&tconn->data.mutex);
3143         mutex_lock(&tconn->conf_update);
3144         old_net_conf = tconn->net_conf;
3145         *new_net_conf = *old_net_conf;
3146
3147         new_net_conf->wire_protocol = p_proto;
3148         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3149         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3150         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3151         new_net_conf->two_primaries = p_two_primaries;
3152
3153         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3154         mutex_unlock(&tconn->conf_update);
3155         mutex_unlock(&tconn->data.mutex);
3156
3157         crypto_free_hash(tconn->peer_integrity_tfm);
3158         kfree(tconn->int_dig_in);
3159         kfree(tconn->int_dig_vv);
3160         tconn->peer_integrity_tfm = peer_integrity_tfm;
3161         tconn->int_dig_in = int_dig_in;
3162         tconn->int_dig_vv = int_dig_vv;
3163
3164         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3165                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3166                           integrity_alg[0] ? integrity_alg : "(none)");
3167
3168         synchronize_rcu();
3169         kfree(old_net_conf);
3170         return 0;
3171
3172 disconnect_rcu_unlock:
3173         rcu_read_unlock();
3174 disconnect:
3175         crypto_free_hash(peer_integrity_tfm);
3176         kfree(int_dig_in);
3177         kfree(int_dig_vv);
3178         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3179         return -EIO;
3180 }
3181
3182 /* helper function
3183  * input: alg name, feature name
3184  * return: NULL (alg name was "")
3185  *         ERR_PTR(error) if something goes wrong
3186  *         or the crypto hash ptr, if it worked out ok. */
3187 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3188                 const char *alg, const char *name)
3189 {
3190         struct crypto_hash *tfm;
3191
3192         if (!alg[0])
3193                 return NULL;
3194
3195         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3196         if (IS_ERR(tfm)) {
3197                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3198                         alg, name, PTR_ERR(tfm));
3199                 return tfm;
3200         }
3201         return tfm;
3202 }
3203
3204 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3205 {
3206         void *buffer = tconn->data.rbuf;
3207         int size = pi->size;
3208
3209         while (size) {
3210                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3211                 s = drbd_recv(tconn, buffer, s);
3212                 if (s <= 0) {
3213                         if (s < 0)
3214                                 return s;
3215                         break;
3216                 }
3217                 size -= s;
3218         }
3219         if (size)
3220                 return -EIO;
3221         return 0;
3222 }
3223
3224 /*
3225  * config_unknown_volume  -  device configuration command for unknown volume
3226  *
3227  * When a device is added to an existing connection, the node on which the
3228  * device is added first will send configuration commands to its peer but the
3229  * peer will not know about the device yet.  It will warn and ignore these
3230  * commands.  Once the device is added on the second node, the second node will
3231  * send the same device configuration commands, but in the other direction.
3232  *
3233  * (We can also end up here if drbd is misconfigured.)
3234  */
3235 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3236 {
3237         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3238                   cmdname(pi->cmd), pi->vnr);
3239         return ignore_remaining_packet(tconn, pi);
3240 }
3241
3242 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3243 {
3244         struct drbd_conf *mdev;
3245         struct p_rs_param_95 *p;
3246         unsigned int header_size, data_size, exp_max_sz;
3247         struct crypto_hash *verify_tfm = NULL;
3248         struct crypto_hash *csums_tfm = NULL;
3249         struct net_conf *old_net_conf, *new_net_conf = NULL;
3250         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3251         const int apv = tconn->agreed_pro_version;
3252         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3253         int fifo_size = 0;
3254         int err;
3255
3256         mdev = vnr_to_mdev(tconn, pi->vnr);
3257         if (!mdev)
3258                 return config_unknown_volume(tconn, pi);
3259
3260         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3261                     : apv == 88 ? sizeof(struct p_rs_param)
3262                                         + SHARED_SECRET_MAX
3263                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3264                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3265
3266         if (pi->size > exp_max_sz) {
3267                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3268                     pi->size, exp_max_sz);
3269                 return -EIO;
3270         }
3271
3272         if (apv <= 88) {
3273                 header_size = sizeof(struct p_rs_param);
3274                 data_size = pi->size - header_size;
3275         } else if (apv <= 94) {
3276                 header_size = sizeof(struct p_rs_param_89);
3277                 data_size = pi->size - header_size;
3278                 D_ASSERT(data_size == 0);
3279         } else {
3280                 header_size = sizeof(struct p_rs_param_95);
3281                 data_size = pi->size - header_size;
3282                 D_ASSERT(data_size == 0);
3283         }
3284
3285         /* initialize verify_alg and csums_alg */
3286         p = pi->data;
3287         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3288
3289         err = drbd_recv_all(mdev->tconn, p, header_size);
3290         if (err)
3291                 return err;
3292
3293         mutex_lock(&mdev->tconn->conf_update);
3294         old_net_conf = mdev->tconn->net_conf;
3295         if (get_ldev(mdev)) {
3296                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3297                 if (!new_disk_conf) {
3298                         put_ldev(mdev);
3299                         mutex_unlock(&mdev->tconn->conf_update);
3300                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3301                         return -ENOMEM;
3302                 }
3303
3304                 old_disk_conf = mdev->ldev->disk_conf;
3305                 *new_disk_conf = *old_disk_conf;
3306
3307                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3308         }
3309
3310         if (apv >= 88) {
3311                 if (apv == 88) {
3312                         if (data_size > SHARED_SECRET_MAX) {
3313                                 dev_err(DEV, "verify-alg too long, "
3314                                     "peer wants %u, accepting only %u bytes\n",
3315                                                 data_size, SHARED_SECRET_MAX);
3316                                 err = -EIO;
3317                                 goto reconnect;
3318                         }
3319
3320                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3321                         if (err)
3322                                 goto reconnect;
3323                         /* we expect NUL terminated string */
3324                         /* but just in case someone tries to be evil */
3325                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3326                         p->verify_alg[data_size-1] = 0;
3327
3328                 } else /* apv >= 89 */ {
3329                         /* we still expect NUL terminated strings */
3330                         /* but just in case someone tries to be evil */
3331                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3332                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3333                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3334                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3335                 }
3336
3337                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3338                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3339                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3340                                     old_net_conf->verify_alg, p->verify_alg);
3341                                 goto disconnect;
3342                         }
3343                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3344                                         p->verify_alg, "verify-alg");
3345                         if (IS_ERR(verify_tfm)) {
3346                                 verify_tfm = NULL;
3347                                 goto disconnect;
3348                         }
3349                 }
3350
3351                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3352                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3353                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3354                                     old_net_conf->csums_alg, p->csums_alg);
3355                                 goto disconnect;
3356                         }
3357                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3358                                         p->csums_alg, "csums-alg");
3359                         if (IS_ERR(csums_tfm)) {
3360                                 csums_tfm = NULL;
3361                                 goto disconnect;
3362                         }
3363                 }
3364
3365                 if (apv > 94 && new_disk_conf) {
3366                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3367                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3368                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3369                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3370
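                        /* The size of the resync plan fifo follows from
                         * c_plan_ahead; if the peer's new value changes the
                         * required size, allocate a fresh plan here and swap
                         * it in further below (rcu_assign_pointer). */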
3371                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3372                         if (fifo_size != mdev->rs_plan_s->size) {
3373                                 new_plan = fifo_alloc(fifo_size);
3374                                 if (!new_plan) {
3375                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3376                                         put_ldev(mdev);
3377                                         goto disconnect;
3378                                 }
3379                         }
3380                 }
3381
3382                 if (verify_tfm || csums_tfm) {
3383                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3384                         if (!new_net_conf) {
3385                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3386                                 goto disconnect;
3387                         }
3388
3389                         *new_net_conf = *old_net_conf;
3390
3391                         if (verify_tfm) {
3392                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3393                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3394                                 crypto_free_hash(mdev->tconn->verify_tfm);
3395                                 mdev->tconn->verify_tfm = verify_tfm;
3396                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3397                         }
3398                         if (csums_tfm) {
3399                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3400                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3401                                 crypto_free_hash(mdev->tconn->csums_tfm);
3402                                 mdev->tconn->csums_tfm = csums_tfm;
3403                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3404                         }
3405                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3406                 }
3407         }
3408
3409         if (new_disk_conf) {
3410                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3411                 put_ldev(mdev);
3412         }
3413
3414         if (new_plan) {
3415                 old_plan = mdev->rs_plan_s;
3416                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3417         }
3418
3419         mutex_unlock(&mdev->tconn->conf_update);
3420         synchronize_rcu();
3421         if (new_net_conf)
3422                 kfree(old_net_conf);
3423         kfree(old_disk_conf);
3424         kfree(old_plan);
3425
3426         return 0;
3427
3428 reconnect:
3429         if (new_disk_conf) {
3430                 put_ldev(mdev);
3431                 kfree(new_disk_conf);
3432         }
3433         mutex_unlock(&mdev->tconn->conf_update);
3434         return -EIO;
3435
3436 disconnect:
3437         kfree(new_plan);
3438         if (new_disk_conf) {
3439                 put_ldev(mdev);
3440                 kfree(new_disk_conf);
3441         }
3442         mutex_unlock(&mdev->tconn->conf_update);
3443         /* free any crypto transforms we may have allocated above; at this
3444          * point they have not yet been handed over to the connection. */
3445         crypto_free_hash(csums_tfm);
3446         crypto_free_hash(verify_tfm);
3448         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3449         return -EIO;
3450 }
3451
3452 /* warn if the arguments differ by more than 12.5% */
3453 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3454         const char *s, sector_t a, sector_t b)
3455 {
3456         sector_t d;
3457         if (a == 0 || b == 0)
3458                 return;
3459         d = (a > b) ? (a - b) : (b - a);
3460         if (d > (a>>3) || d > (b>>3))
3461                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3462                      (unsigned long long)a, (unsigned long long)b);
3463 }
3464
3465 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3466 {
3467         struct drbd_conf *mdev;
3468         struct p_sizes *p = pi->data;
3469         enum determine_dev_size dd = unchanged;
3470         sector_t p_size, p_usize, my_usize;
3471         int ldsc = 0; /* local disk size changed */
3472         enum dds_flags ddsf;
3473
3474         mdev = vnr_to_mdev(tconn, pi->vnr);
3475         if (!mdev)
3476                 return config_unknown_volume(tconn, pi);
3477
3478         p_size = be64_to_cpu(p->d_size);
3479         p_usize = be64_to_cpu(p->u_size);
3480
3481         /* just store the peer's disk size for now.
3482          * we still need to figure out whether we accept that. */
3483         mdev->p_size = p_size;
3484
3485         if (get_ldev(mdev)) {
3486                 rcu_read_lock();
3487                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3488                 rcu_read_unlock();
3489
3490                 warn_if_differ_considerably(mdev, "lower level device sizes",
3491                            p_size, drbd_get_max_capacity(mdev->ldev));
3492                 warn_if_differ_considerably(mdev, "user requested size",
3493                                             p_usize, my_usize);
3494
3495                 /* if this is the first connect, or an otherwise expected
3496                  * param exchange, choose the minimum */
3497                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3498                         p_usize = min_not_zero(my_usize, p_usize);
3499
3500                 /* Never shrink a device with usable data during connect.
3501                    But allow online shrinking if we are connected. */
3502                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3503                     drbd_get_capacity(mdev->this_bdev) &&
3504                     mdev->state.disk >= D_OUTDATED &&
3505                     mdev->state.conn < C_CONNECTED) {
3506                         dev_err(DEV, "The peer's disk size is too small!\n");
3507                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3508                         put_ldev(mdev);
3509                         return -EIO;
3510                 }
3511
3512                 if (my_usize != p_usize) {
3513                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3514
3515                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3516                         if (!new_disk_conf) {
3517                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3518                                 put_ldev(mdev);
3519                                 return -ENOMEM;
3520                         }
3521
3522                         mutex_lock(&mdev->tconn->conf_update);
3523                         old_disk_conf = mdev->ldev->disk_conf;
3524                         *new_disk_conf = *old_disk_conf;
3525                         new_disk_conf->disk_size = p_usize;
3526
3527                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3528                         mutex_unlock(&mdev->tconn->conf_update);
3529                         synchronize_rcu();
3530                         kfree(old_disk_conf);
3531
3532                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3533                                  (unsigned long)p_usize);
3534                 }
3535
3536                 put_ldev(mdev);
3537         }
3538
3539         ddsf = be16_to_cpu(p->dds_flags);
3540         if (get_ldev(mdev)) {
3541                 dd = drbd_determine_dev_size(mdev, ddsf);
3542                 put_ldev(mdev);
3543                 if (dd == dev_size_error)
3544                         return -EIO;
3545                 drbd_md_sync(mdev);
3546         } else {
3547                 /* I am diskless, need to accept the peer's size. */
3548                 drbd_set_my_capacity(mdev, p_size);
3549         }
3550
3551         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3552         drbd_reconsider_max_bio_size(mdev);
3553
3554         if (get_ldev(mdev)) {
3555                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3556                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3557                         ldsc = 1;
3558                 }
3559
3560                 put_ldev(mdev);
3561         }
3562
3563         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3564                 if (be64_to_cpu(p->c_size) !=
3565                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3566                         /* we have different sizes, probably peer
3567                          * needs to know my new size... */
3568                         drbd_send_sizes(mdev, 0, ddsf);
3569                 }
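                /* The device grew, or a resize was still pending: start a
                 * resync of the new area now, unless the peer flagged it
                 * clean (DDSF_NO_RESYNC); if either disk is not usable yet,
                 * defer via RESYNC_AFTER_NEG. */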
3570                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3571                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3572                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3573                             mdev->state.disk >= D_INCONSISTENT) {
3574                                 if (ddsf & DDSF_NO_RESYNC)
3575                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3576                                 else
3577                                         resync_after_online_grow(mdev);
3578                         } else
3579                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3580                 }
3581         }
3582
3583         return 0;
3584 }
3585
3586 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3587 {
3588         struct drbd_conf *mdev;
3589         struct p_uuids *p = pi->data;
3590         u64 *p_uuid;
3591         int i, updated_uuids = 0;
3592
3593         mdev = vnr_to_mdev(tconn, pi->vnr);
3594         if (!mdev)
3595                 return config_unknown_volume(tconn, pi);
3596
3597         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3598
3599         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3600                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3601
3602         kfree(mdev->p_uuid);
3603         mdev->p_uuid = p_uuid;
3604
3605         if (mdev->state.conn < C_CONNECTED &&
3606             mdev->state.disk < D_INCONSISTENT &&
3607             mdev->state.role == R_PRIMARY &&
3608             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3609                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3610                     (unsigned long long)mdev->ed_uuid);
3611                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3612                 return -EIO;
3613         }
3614
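        /* Skip the initial full sync when this is a fresh pairing: we are
         * Connected, the peer speaks protocol 90 or newer, our current UUID
         * is still UUID_JUST_CREATED, and the peer sets the matching flag
         * (bit 3 of UI_FLAGS).  Then adopt the peer's current UUID, clear
         * the bitmap and go straight to UpToDate/UpToDate. */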
3615         if (get_ldev(mdev)) {
3616                 int skip_initial_sync =
3617                         mdev->state.conn == C_CONNECTED &&
3618                         mdev->tconn->agreed_pro_version >= 90 &&
3619                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3620                         (p_uuid[UI_FLAGS] & 8);
3621                 if (skip_initial_sync) {
3622                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3623                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3624                                         "clear_n_write from receive_uuids",
3625                                         BM_LOCKED_TEST_ALLOWED);
3626                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3627                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3628                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3629                                         CS_VERBOSE, NULL);
3630                         drbd_md_sync(mdev);
3631                         updated_uuids = 1;
3632                 }
3633                 put_ldev(mdev);
3634         } else if (mdev->state.disk < D_INCONSISTENT &&
3635                    mdev->state.role == R_PRIMARY) {
3636                 /* I am a diskless primary, the peer just created a new current UUID
3637                    for me. */
3638                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3639         }
3640
3641         /* Before we test for the disk state, we should wait until any
3642            ongoing cluster-wide state change has finished. That is important if
3643            we are primary and are detaching from our disk. We need to see the
3644            new disk state... */
3645         mutex_lock(mdev->state_mutex);
3646         mutex_unlock(mdev->state_mutex);
3647         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3648                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3649
3650         if (updated_uuids)
3651                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3652
3653         return 0;
3654 }
3655
3656 /**
3657  * convert_state() - Converts the peer's view of the cluster state to our point of view
3658  * @ps:         The state as seen by the peer.
3659  */
3660 static union drbd_state convert_state(union drbd_state ps)
3661 {
3662         union drbd_state ms;
3663
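        /* Connection states in a request from the peer are translated into
         * our point of view: handshake states mirror each other
         * (StartingSyncS <-> StartingSyncT, VerifyS -> VerifyT), a peer that
         * is Disconnecting looks like TearDown to us, and symmetric states
         * map onto themselves. */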
3664         static enum drbd_conns c_tab[] = {
3665                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3666                 [C_CONNECTED] = C_CONNECTED,
3667
3668                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3669                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3670                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3671                 [C_VERIFY_S]       = C_VERIFY_T,
3672                 [C_MASK]   = C_MASK,
3673         };
3674
3675         ms.i = ps.i;
3676
3677         ms.conn = c_tab[ps.conn];
3678         ms.peer = ps.role;
3679         ms.role = ps.peer;
3680         ms.pdsk = ps.disk;
3681         ms.disk = ps.pdsk;
3682         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3683
3684         return ms;
3685 }
3686
3687 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3688 {
3689         struct drbd_conf *mdev;
3690         struct p_req_state *p = pi->data;
3691         union drbd_state mask, val;
3692         enum drbd_state_rv rv;
3693
3694         mdev = vnr_to_mdev(tconn, pi->vnr);
3695         if (!mdev)
3696                 return -EIO;
3697
3698         mask.i = be32_to_cpu(p->mask);
3699         val.i = be32_to_cpu(p->val);
3700
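        /* A local state change is already in progress (state_mutex is held)
         * and DISCARD_CONCURRENT says the peer's concurrent request loses:
         * answer with SS_CONCURRENT_ST_CHG instead of applying it. */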
3701         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3702             mutex_is_locked(mdev->state_mutex)) {
3703                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3704                 return 0;
3705         }
3706
3707         mask = convert_state(mask);
3708         val = convert_state(val);
3709
3710         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3711         drbd_send_sr_reply(mdev, rv);
3712
3713         drbd_md_sync(mdev);
3714
3715         return 0;
3716 }
3717
3718 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3719 {
3720         struct p_req_state *p = pi->data;
3721         union drbd_state mask, val;
3722         enum drbd_state_rv rv;
3723
3724         mask.i = be32_to_cpu(p->mask);
3725         val.i = be32_to_cpu(p->val);
3726
3727         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3728             mutex_is_locked(&tconn->cstate_mutex)) {
3729                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3730                 return 0;
3731         }
3732
3733         mask = convert_state(mask);
3734         val = convert_state(val);
3735
3736         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3737         conn_send_sr_reply(tconn, rv);
3738
3739         return 0;
3740 }
3741
3742 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3743 {
3744         struct drbd_conf *mdev;
3745         struct p_state *p = pi->data;
3746         union drbd_state os, ns, peer_state;
3747         enum drbd_disk_state real_peer_disk;
3748         enum chg_state_flags cs_flags;
3749         int rv;
3750
3751         mdev = vnr_to_mdev(tconn, pi->vnr);
3752         if (!mdev)
3753                 return config_unknown_volume(tconn, pi);
3754
3755         peer_state.i = be32_to_cpu(p->state);
3756
3757         real_peer_disk = peer_state.disk;
3758         if (peer_state.disk == D_NEGOTIATING) {
3759                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3760                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3761         }
3762
3763         spin_lock_irq(&mdev->tconn->req_lock);
3764  retry:
3765         os = ns = drbd_read_state(mdev);
3766         spin_unlock_irq(&mdev->tconn->req_lock);
3767
3768         /* If this is the "end of sync" confirmation, usually the peer disk
3769          * transitions from D_INCONSISTENT to D_UP_TO_DATE. If an empty (0 bits
3770          * set) resync was started in PausedSyncT, or if the timing of pause-/
3771          * unpause-sync events has been "just right", the peer disk may
3772          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3773          */
3774         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3775             real_peer_disk == D_UP_TO_DATE &&
3776             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3777                 /* If we are (becoming) SyncSource, but peer is still in sync
3778                  * preparation, ignore its uptodate-ness to avoid flapping, it
3779                  * will change to inconsistent once the peer reaches active
3780                  * syncing states.
3781                  * It may have changed syncer-paused flags, however, so we
3782                  * cannot ignore this completely. */
3783                 if (peer_state.conn > C_CONNECTED &&
3784                     peer_state.conn < C_SYNC_SOURCE)
3785                         real_peer_disk = D_INCONSISTENT;
3786
3787                 /* if peer_state changes to connected at the same time,
3788                  * it explicitly notifies us that it finished resync.
3789                  * Maybe we should finish it up, too? */
3790                 else if (os.conn >= C_SYNC_SOURCE &&
3791                          peer_state.conn == C_CONNECTED) {
3792                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3793                                 drbd_resync_finished(mdev);
3794                         return 0;
3795                 }
3796         }
3797
3798         /* peer says his disk is inconsistent, while we think it is uptodate,
3799          * and this happens while the peer still thinks we have a sync going on,
3800          * but we think we are already done with the sync.
3801          * We ignore this to avoid flapping pdsk.
3802          * This should not happen, if the peer is a recent version of drbd. */
3803         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3804             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3805                 real_peer_disk = D_UP_TO_DATE;
3806
3807         if (ns.conn == C_WF_REPORT_PARAMS)
3808                 ns.conn = C_CONNECTED;
3809
3810         if (peer_state.conn == C_AHEAD)
3811                 ns.conn = C_BEHIND;
3812
3813         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3814             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3815                 int cr; /* consider resync */
3816
3817                 /* if we established a new connection */
3818                 cr  = (os.conn < C_CONNECTED);
3819                 /* if we had an established connection
3820                  * and one of the nodes newly attaches a disk */
3821                 cr |= (os.conn == C_CONNECTED &&
3822                        (peer_state.disk == D_NEGOTIATING ||
3823                         os.disk == D_NEGOTIATING));
3824                 /* if we have both been inconsistent, and the peer has been
3825                  * forced to be UpToDate with --overwrite-data */
3826                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3827                 /* if we had been plain connected, and the admin requested to
3828                  * start a sync by "invalidate" or "invalidate-remote" */
3829                 cr |= (os.conn == C_CONNECTED &&
3830                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3831                                  peer_state.conn <= C_WF_BITMAP_T));
3832
3833                 if (cr)
3834                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3835
3836                 put_ldev(mdev);
3837                 if (ns.conn == C_MASK) {
3838                         ns.conn = C_CONNECTED;
3839                         if (mdev->state.disk == D_NEGOTIATING) {
3840                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3841                         } else if (peer_state.disk == D_NEGOTIATING) {
3842                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3843                                 peer_state.disk = D_DISKLESS;
3844                                 real_peer_disk = D_DISKLESS;
3845                         } else {
3846                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3847                                         return -EIO;
3848                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3849                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3850                                 return -EIO;
3851                         }
3852                 }
3853         }
3854
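        /* Re-take the req_lock; if the state changed while we were running
         * without it (e.g. during drbd_sync_handshake() above), start over
         * with the freshly read state. */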
3855         spin_lock_irq(&mdev->tconn->req_lock);
3856         if (os.i != drbd_read_state(mdev).i)
3857                 goto retry;
3858         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3859         ns.peer = peer_state.role;
3860         ns.pdsk = real_peer_disk;
3861         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3862         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3863                 ns.disk = mdev->new_state_tmp.disk;
3864         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3865         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3866             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3867                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3868                    for temporary network outages! */
3869                 spin_unlock_irq(&mdev->tconn->req_lock);
3870                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3871                 tl_clear(mdev->tconn);
3872                 drbd_uuid_new_current(mdev);
3873                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3874                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3875                 return -EIO;
3876         }
3877         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3878         ns = drbd_read_state(mdev);
3879         spin_unlock_irq(&mdev->tconn->req_lock);
3880
3881         if (rv < SS_SUCCESS) {
3882                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3883                 return -EIO;
3884         }
3885
3886         if (os.conn > C_WF_REPORT_PARAMS) {
3887                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3888                     peer_state.disk != D_NEGOTIATING ) {
3889                         /* we want resync, peer has not yet decided to sync... */
3890                         /* Nowadays only used when forcing a node into primary role and
3891                            setting its disk to UpToDate with that */
3892                         drbd_send_uuids(mdev);
3893                         drbd_send_current_state(mdev);
3894                 }
3895         }
3896
3897         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3898
3899         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3900
3901         return 0;
3902 }
3903
3904 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3905 {
3906         struct drbd_conf *mdev;
3907         struct p_rs_uuid *p = pi->data;
3908
3909         mdev = vnr_to_mdev(tconn, pi->vnr);
3910         if (!mdev)
3911                 return -EIO;
3912
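        /* Wait until we have actually reached WFSyncUUID or Behind (or lost
         * the connection or the disk) before acting on the new sync UUID. */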
3913         wait_event(mdev->misc_wait,
3914                    mdev->state.conn == C_WF_SYNC_UUID ||
3915                    mdev->state.conn == C_BEHIND ||
3916                    mdev->state.conn < C_CONNECTED ||
3917                    mdev->state.disk < D_NEGOTIATING);
3918
3919         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3920
3921         /* Here the _drbd_uuid_ functions are right, current should
3922            _not_ be rotated into the history */
3923         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3924                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3925                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3926
3927                 drbd_print_uuids(mdev, "updated sync uuid");
3928                 drbd_start_resync(mdev, C_SYNC_TARGET);
3929
3930                 put_ldev(mdev);
3931         } else
3932                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3933
3934         return 0;
3935 }
3936
3937 /**
3938  * receive_bitmap_plain
3939  *
3940  * Return 0 when done, 1 when another iteration is needed, and a negative error
3941  * code upon failure.
3942  */
3943 static int
3944 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3945                      unsigned long *p, struct bm_xfer_ctx *c)
3946 {
3947         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3948                                  drbd_header_size(mdev->tconn);
3949         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3950                                        c->bm_words - c->word_offset);
3951         unsigned int want = num_words * sizeof(*p);
3952         int err;
3953
3954         if (want != size) {
3955                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3956                 return -EIO;
3957         }
3958         if (want == 0)
3959                 return 0;
3960         err = drbd_recv_all(mdev->tconn, p, want);
3961         if (err)
3962                 return err;
3963
3964         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3965
3966         c->word_offset += num_words;
3967         c->bit_offset = c->word_offset * BITS_PER_LONG;
3968         if (c->bit_offset > c->bm_bits)
3969                 c->bit_offset = c->bm_bits;
3970
3971         return 1;
3972 }
3973
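/* The "encoding" byte of a P_COMPRESSED_BITMAP packet packs three fields:
 * bits 0-3 hold the bitmap encoding (enum drbd_bitmap_code), bits 4-6 hold
 * the number of pad bits at the end of the bit stream, and bit 7 holds the
 * value of the first run in the RLE stream (see dcbp_get_start()). */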
3974 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3975 {
3976         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3977 }
3978
3979 static int dcbp_get_start(struct p_compressed_bm *p)
3980 {
3981         return (p->encoding & 0x80) != 0;
3982 }
3983
3984 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3985 {
3986         return (p->encoding >> 4) & 0x7;
3987 }
3988
3989 /**
3990  * recv_bm_rle_bits
3991  *
3992  * Return 0 when done, 1 when another iteration is needed, and a negative error
3993  * code upon failure.
3994  */
3995 static int
3996 recv_bm_rle_bits(struct drbd_conf *mdev,
3997                 struct p_compressed_bm *p,
3998                  struct bm_xfer_ctx *c,
3999                  unsigned int len)
4000 {
4001         struct bitstream bs;
4002         u64 look_ahead;
4003         u64 rl;
4004         u64 tmp;
4005         unsigned long s = c->bit_offset;
4006         unsigned long e;
4007         int toggle = dcbp_get_start(p);
4008         int have;
4009         int bits;
4010
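        /* The payload is a VLI encoded sequence of run lengths.  Runs
         * alternate between cleared and set bits; "toggle" says which kind
         * the current run is, starting with the value from the packet
         * header.  Only runs of set bits are applied to the bitmap. */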
4011         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4012
4013         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4014         if (bits < 0)
4015                 return -EIO;
4016
4017         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4018                 bits = vli_decode_bits(&rl, look_ahead);
4019                 if (bits <= 0)
4020                         return -EIO;
4021
4022                 if (toggle) {
4023                         e = s + rl -1;
4024                         if (e >= c->bm_bits) {
4025                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4026                                 return -EIO;
4027                         }
4028                         _drbd_bm_set_bits(mdev, s, e);
4029                 }
4030
4031                 if (have < bits) {
4032                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4033                                 have, bits, look_ahead,
4034                                 (unsigned int)(bs.cur.b - p->code),
4035                                 (unsigned int)bs.buf_len);
4036                         return -EIO;
4037                 }
4038                 look_ahead >>= bits;
4039                 have -= bits;
4040
4041                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4042                 if (bits < 0)
4043                         return -EIO;
4044                 look_ahead |= tmp << have;
4045                 have += bits;
4046         }
4047
4048         c->bit_offset = s;
4049         bm_xfer_ctx_bit_to_word_offset(c);
4050
4051         return (s != c->bm_bits);
4052 }
4053
4054 /**
4055  * decode_bitmap_c
4056  *
4057  * Return 0 when done, 1 when another iteration is needed, and a negative error
4058  * code upon failure.
4059  */
4060 static int
4061 decode_bitmap_c(struct drbd_conf *mdev,
4062                 struct p_compressed_bm *p,
4063                 struct bm_xfer_ctx *c,
4064                 unsigned int len)
4065 {
4066         if (dcbp_get_code(p) == RLE_VLI_Bits)
4067                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4068
4069         /* other variants had been implemented for evaluation,
4070          * but have been dropped as this one turned out to be "best"
4071          * during all our tests. */
4072
4073         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4074         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4075         return -EIO;
4076 }
4077
4078 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4079                 const char *direction, struct bm_xfer_ctx *c)
4080 {
4081         /* what would it take to transfer it "plaintext" */
4082         unsigned int header_size = drbd_header_size(mdev->tconn);
4083         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4084         unsigned int plain =
4085                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4086                 c->bm_words * sizeof(unsigned long);
4087         unsigned int total = c->bytes[0] + c->bytes[1];
4088         unsigned int r;
4089
4090         /* total cannot be zero, but just in case: */
4091         if (total == 0)
4092                 return;
4093
4094         /* don't report if not compressed */
4095         if (total >= plain)
4096                 return;
4097
4098         /* total < plain. check for overflow, still */
4099         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4100                                     : (1000 * total / plain);
4101
4102         if (r > 1000)
4103                 r = 1000;
4104
4105         r = 1000 - r;
4106         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4107              "total %u; compression: %u.%u%%\n",
4108                         direction,
4109                         c->bytes[1], c->packets[1],
4110                         c->bytes[0], c->packets[0],
4111                         total, r/10, r % 10);
4112 }
4113
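/*
 * Arithmetic example with made-up numbers: for plain = 1000000 bytes and
 * total = 123456 bytes actually transferred, r = 1000 * 123456 / 1000000
 * = 123, then r = 1000 - 123 = 877, reported above as "compression: 87.7%".
 */
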
4114 /* Since we are processing the bitfield from lower addresses to higher,
4115    it does not matter if we process it in 32 bit chunks or 64 bit
4116    chunks, as long as it is little endian. (Understand it as a byte
4117    stream, beginning with the lowest byte...) If we used big endian,
4118    we would need to process it from the highest address to the lowest,
4119    in order to be agnostic to the 32 vs 64 bit issue.
4120
4121    Returns 0 on success, a negative error code otherwise. */
4122 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4123 {
4124         struct drbd_conf *mdev;
4125         struct bm_xfer_ctx c;
4126         int err;
4127
4128         mdev = vnr_to_mdev(tconn, pi->vnr);
4129         if (!mdev)
4130                 return -EIO;
4131
4132         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4133         /* you are supposed to send additional out-of-sync information
4134          * if you actually set bits during this phase */
4135
4136         c = (struct bm_xfer_ctx) {
4137                 .bm_bits = drbd_bm_bits(mdev),
4138                 .bm_words = drbd_bm_words(mdev),
4139         };
4140
4141         for (;;) {
4142                 if (pi->cmd == P_BITMAP)
4143                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4144                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4145                         /* MAYBE: sanity check that we speak proto >= 90,
4146                          * and the feature is enabled! */
4147                         struct p_compressed_bm *p = pi->data;
4148
4149                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4150                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4151                                 err = -EIO;
4152                                 goto out;
4153                         }
4154                         if (pi->size <= sizeof(*p)) {
4155                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4156                                 err = -EIO;
4157                                 goto out;
4158                         }
4159                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4160                         if (err)
4161                                goto out;
4162                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4163                 } else {
4164                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4165                         err = -EIO;
4166                         goto out;
4167                 }
4168
4169                 c.packets[pi->cmd == P_BITMAP]++;
4170                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4171
4172                 if (err <= 0) {
4173                         if (err < 0)
4174                                 goto out;
4175                         break;
4176                 }
4177                 err = drbd_recv_header(mdev->tconn, pi);
4178                 if (err)
4179                         goto out;
4180         }
4181
4182         INFO_bm_xfer_stats(mdev, "receive", &c);
4183
4184         if (mdev->state.conn == C_WF_BITMAP_T) {
4185                 enum drbd_state_rv rv;
4186
4187                 err = drbd_send_bitmap(mdev);
4188                 if (err)
4189                         goto out;
4190                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4191                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4192                 D_ASSERT(rv == SS_SUCCESS);
4193         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4194                 /* admin may have requested C_DISCONNECTING,
4195                  * other threads may have noticed network errors */
4196                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4197                     drbd_conn_str(mdev->state.conn));
4198         }
4199         err = 0;
4200
4201  out:
4202         drbd_bm_unlock(mdev);
4203         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4204                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4205         return err;
4206 }
4207
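/*
 * To recap the exchange above: the node in C_WF_BITMAP_T (the future sync
 * target) answers with its own bitmap and then requests C_WF_SYNC_UUID,
 * while the node in C_WF_BITMAP_S starts the resync as sync source once the
 * peer's bitmap has been received and merged.
 */
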
4208 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4209 {
4210         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4211                  pi->cmd, pi->size);
4212
4213         return ignore_remaining_packet(tconn, pi);
4214 }
4215
4216 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4217 {
4218         /* Make sure we've acked all the TCP data associated
4219          * with the data requests being unplugged */
4220         drbd_tcp_quickack(tconn->data.socket);
4221
4222         return 0;
4223 }
4224
4225 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4226 {
4227         struct drbd_conf *mdev;
4228         struct p_block_desc *p = pi->data;
4229
4230         mdev = vnr_to_mdev(tconn, pi->vnr);
4231         if (!mdev)
4232                 return -EIO;
4233
4234         switch (mdev->state.conn) {
4235         case C_WF_SYNC_UUID:
4236         case C_WF_BITMAP_T:
4237         case C_BEHIND:
4238                 break;
4239         default:
4240                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4241                                 drbd_conn_str(mdev->state.conn));
4242         }
4243
4244         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4245
4246         return 0;
4247 }
4248
4249 struct data_cmd {
4250         int expect_payload;
4251         size_t pkt_size;
4252         int (*fn)(struct drbd_tconn *, struct packet_info *);
4253 };
4254
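/*
 * How drbdd() below consumes these entries: pkt_size bytes of fixed
 * sub-header are received into pi.data before fn is called, and
 * expect_payload says whether the packet may carry additional payload
 * beyond that fixed part; otherwise a larger pi.size is treated as a
 * protocol error.
 */
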
4255 static struct data_cmd drbd_cmd_handler[] = {
4256         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4257         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4258         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4259         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4260         [P_BITMAP]          = { 1, 0, receive_bitmap },
4261         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4262         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4263         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4264         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4265         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4266         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4267         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4268         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4269         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4270         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4271         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4272         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4273         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4274         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4275         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4276         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4277         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4278         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4279         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4280 };
4281
4282 static void drbdd(struct drbd_tconn *tconn)
4283 {
4284         struct packet_info pi;
4285         size_t shs; /* sub header size */
4286         int err;
4287
4288         while (get_t_state(&tconn->receiver) == RUNNING) {
4289                 struct data_cmd *cmd;
4290
4291                 drbd_thread_current_set_cpu(&tconn->receiver);
4292                 if (drbd_recv_header(tconn, &pi))
4293                         goto err_out;
4294
4295                 cmd = &drbd_cmd_handler[pi.cmd];
4296                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4297                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4298                                  cmdname(pi.cmd), pi.cmd);
4299                         goto err_out;
4300                 }
4301
4302                 shs = cmd->pkt_size;
4303                 if (pi.size > shs && !cmd->expect_payload) {
4304                         conn_err(tconn, "No payload expected %s l:%d\n",
4305                                  cmdname(pi.cmd), pi.size);
4306                         goto err_out;
4307                 }
4308
4309                 if (shs) {
4310                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4311                         if (err)
4312                                 goto err_out;
4313                         pi.size -= shs;
4314                 }
4315
4316                 err = cmd->fn(tconn, &pi);
4317                 if (err) {
4318                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4319                                  cmdname(pi.cmd), err, pi.size);
4320                         goto err_out;
4321                 }
4322         }
4323         return;
4324
4325     err_out:
4326         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4327 }
4328
4329 void conn_flush_workqueue(struct drbd_tconn *tconn)
4330 {
4331         struct drbd_wq_barrier barr;
4332
4333         barr.w.cb = w_prev_work_done;
4334         barr.w.tconn = tconn;
4335         init_completion(&barr.done);
4336         drbd_queue_work(&tconn->data.work, &barr.w);
4337         wait_for_completion(&barr.done);
4338 }
4339
4340 static void conn_disconnect(struct drbd_tconn *tconn)
4341 {
4342         struct drbd_conf *mdev;
4343         enum drbd_conns oc;
4344         int vnr;
4345
4346         if (tconn->cstate == C_STANDALONE)
4347                 return;
4348
4349         /* asender does not clean up anything. it must not interfere, either */
4350         drbd_thread_stop(&tconn->asender);
4351         drbd_free_sock(tconn);
4352
4353         rcu_read_lock();
4354         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4355                 kref_get(&mdev->kref);
4356                 rcu_read_unlock();
4357                 drbd_disconnected(mdev);
4358                 kref_put(&mdev->kref, &drbd_minor_destroy);
4359                 rcu_read_lock();
4360         }
4361         rcu_read_unlock();
4362
4363         if (!list_empty(&tconn->current_epoch->list))
4364                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4365         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4366         atomic_set(&tconn->current_epoch->epoch_size, 0);
4367
4368         conn_info(tconn, "Connection closed\n");
4369
4370         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4371                 conn_try_outdate_peer_async(tconn);
4372
4373         spin_lock_irq(&tconn->req_lock);
4374         oc = tconn->cstate;
4375         if (oc >= C_UNCONNECTED)
4376                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4377
4378         spin_unlock_irq(&tconn->req_lock);
4379
4380         if (oc == C_DISCONNECTING)
4381                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4382 }
4383
4384 static int drbd_disconnected(struct drbd_conf *mdev)
4385 {
4386         unsigned int i;
4387
4388         /* wait for current activity to cease. */
4389         spin_lock_irq(&mdev->tconn->req_lock);
4390         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4391         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4392         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4393         spin_unlock_irq(&mdev->tconn->req_lock);
4394
4395         /* We do not have data structures that would allow us to
4396          * get the rs_pending_cnt down to 0 again.
4397          *  * On C_SYNC_TARGET we do not have any data structures describing
4398          *    the pending RSDataRequest's we have sent.
4399          *  * On C_SYNC_SOURCE there is no data structure that tracks
4400          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4401          *  And no, it is not the sum of the reference counts in the
4402          *  resync_LRU. The resync_LRU tracks the whole operation including
4403          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4404          *  on the fly. */
4405         drbd_rs_cancel_all(mdev);
4406         mdev->rs_total = 0;
4407         mdev->rs_failed = 0;
4408         atomic_set(&mdev->rs_pending_cnt, 0);
4409         wake_up(&mdev->misc_wait);
4410
4411         del_timer_sync(&mdev->resync_timer);
4412         resync_timer_fn((unsigned long)mdev);
4413
4414         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4415          * w_make_resync_request etc. which may still be on the worker queue
4416          * to be "canceled" */
4417         drbd_flush_workqueue(mdev);
4418
4419         drbd_finish_peer_reqs(mdev);
4420
4421         kfree(mdev->p_uuid);
4422         mdev->p_uuid = NULL;
4423
4424         if (!drbd_suspended(mdev))
4425                 tl_clear(mdev->tconn);
4426
4427         drbd_md_sync(mdev);
4428
4429         /* serialize with bitmap writeout triggered by the state change,
4430          * if any. */
4431         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4432
4433         /* tcp_close and release of sendpage pages can be deferred.  I don't
4434          * want to use SO_LINGER, because apparently it can be deferred for
4435          * more than 20 seconds (longest time I checked).
4436          *
4437          * Actually we don't care for exactly when the network stack does its
4438          * put_page(), but release our reference on these pages right here.
4439          */
4440         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4441         if (i)
4442                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4443         i = atomic_read(&mdev->pp_in_use_by_net);
4444         if (i)
4445                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4446         i = atomic_read(&mdev->pp_in_use);
4447         if (i)
4448                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4449
4450         D_ASSERT(list_empty(&mdev->read_ee));
4451         D_ASSERT(list_empty(&mdev->active_ee));
4452         D_ASSERT(list_empty(&mdev->sync_ee));
4453         D_ASSERT(list_empty(&mdev->done_ee));
4454
4455         return 0;
4456 }
4457
4458 /*
4459  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4460  * we can agree on is stored in agreed_pro_version.
4461  *
4462  * feature flags and the reserved array should be enough room for future
4463  * enhancements of the handshake protocol, and possible plugins...
4464  *
4465  * for now, they are expected to be zero, but ignored.
4466  */
4467 static int drbd_send_features(struct drbd_tconn *tconn)
4468 {
4469         struct drbd_socket *sock;
4470         struct p_connection_features *p;
4471
4472         sock = &tconn->data;
4473         p = conn_prepare_command(tconn, sock);
4474         if (!p)
4475                 return -EIO;
4476         memset(p, 0, sizeof(*p));
4477         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4478         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4479         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4480 }
4481
4482 /*
4483  * return values:
4484  *   1 yes, we have a valid connection
4485  *   0 oops, did not work out, please try again
4486  *  -1 peer talks different language,
4487  *     no point in trying again, please go standalone.
4488  */
4489 static int drbd_do_features(struct drbd_tconn *tconn)
4490 {
4491         /* ASSERT current == tconn->receiver ... */
4492         struct p_connection_features *p;
4493         const int expect = sizeof(struct p_connection_features);
4494         struct packet_info pi;
4495         int err;
4496
4497         err = drbd_send_features(tconn);
4498         if (err)
4499                 return 0;
4500
4501         err = drbd_recv_header(tconn, &pi);
4502         if (err)
4503                 return 0;
4504
4505         if (pi.cmd != P_CONNECTION_FEATURES) {
4506                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4507                          cmdname(pi.cmd), pi.cmd);
4508                 return -1;
4509         }
4510
4511         if (pi.size != expect) {
4512                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4513                      expect, pi.size);
4514                 return -1;
4515         }
4516
4517         p = pi.data;
4518         err = drbd_recv_all_warn(tconn, p, expect);
4519         if (err)
4520                 return 0;
4521
4522         p->protocol_min = be32_to_cpu(p->protocol_min);
4523         p->protocol_max = be32_to_cpu(p->protocol_max);
4524         if (p->protocol_max == 0)
4525                 p->protocol_max = p->protocol_min;
4526
4527         if (PRO_VERSION_MAX < p->protocol_min ||
4528             PRO_VERSION_MIN > p->protocol_max)
4529                 goto incompat;
4530
4531         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4532
4533         conn_info(tconn, "Handshake successful: "
4534              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4535
4536         return 1;
4537
4538  incompat:
4539         conn_err(tconn, "incompatible DRBD dialects: "
4540             "I support %d-%d, peer supports %d-%d\n",
4541             PRO_VERSION_MIN, PRO_VERSION_MAX,
4542             p->protocol_min, p->protocol_max);
4543         return -1;
4544 }
4545
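/*
 * Negotiation example with made-up version numbers: if we support protocol
 * versions 90..100 and the peer advertises 86..96, the ranges overlap and
 * agreed_pro_version becomes min(100, 96) = 96.  Had the peer advertised
 * 101..110 instead, PRO_VERSION_MAX < protocol_min and we bail out as
 * incompatible.
 */
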
4546 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4547 static int drbd_do_auth(struct drbd_tconn *tconn)
4548 {
4549         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4550         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4551         return -1;
4552 }
4553 #else
4554 #define CHALLENGE_LEN 64
4555
4556 /* Return value:
4557         1 - auth succeeded,
4558         0 - failed, try again (network error),
4559         -1 - auth failed, don't try again.
4560 */
4561
4562 static int drbd_do_auth(struct drbd_tconn *tconn)
4563 {
4564         struct drbd_socket *sock;
4565         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4566         struct scatterlist sg;
4567         char *response = NULL;
4568         char *right_response = NULL;
4569         char *peers_ch = NULL;
4570         unsigned int key_len;
4571         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4572         unsigned int resp_size;
4573         struct hash_desc desc;
4574         struct packet_info pi;
4575         struct net_conf *nc;
4576         int err, rv;
4577
4578         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4579
4580         rcu_read_lock();
4581         nc = rcu_dereference(tconn->net_conf);
4582         key_len = strlen(nc->shared_secret);
4583         memcpy(secret, nc->shared_secret, key_len);
4584         rcu_read_unlock();
4585
4586         desc.tfm = tconn->cram_hmac_tfm;
4587         desc.flags = 0;
4588
4589         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4590         if (rv) {
4591                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4592                 rv = -1;
4593                 goto fail;
4594         }
4595
4596         get_random_bytes(my_challenge, CHALLENGE_LEN);
4597
4598         sock = &tconn->data;
4599         if (!conn_prepare_command(tconn, sock)) {
4600                 rv = 0;
4601                 goto fail;
4602         }
4603         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4604                                 my_challenge, CHALLENGE_LEN);
4605         if (!rv)
4606                 goto fail;
4607
4608         err = drbd_recv_header(tconn, &pi);
4609         if (err) {
4610                 rv = 0;
4611                 goto fail;
4612         }
4613
4614         if (pi.cmd != P_AUTH_CHALLENGE) {
4615                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4616                          cmdname(pi.cmd), pi.cmd);
4617                 rv = 0;
4618                 goto fail;
4619         }
4620
4621         if (pi.size > CHALLENGE_LEN * 2) {
4622                 conn_err(tconn, "AuthChallenge payload too big.\n");
4623                 rv = -1;
4624                 goto fail;
4625         }
4626
4627         peers_ch = kmalloc(pi.size, GFP_NOIO);
4628         if (peers_ch == NULL) {
4629                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4630                 rv = -1;
4631                 goto fail;
4632         }
4633
4634         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4635         if (err) {
4636                 rv = 0;
4637                 goto fail;
4638         }
4639
4640         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4641         response = kmalloc(resp_size, GFP_NOIO);
4642         if (response == NULL) {
4643                 conn_err(tconn, "kmalloc of response failed\n");
4644                 rv = -1;
4645                 goto fail;
4646         }
4647
4648         sg_init_table(&sg, 1);
4649         sg_set_buf(&sg, peers_ch, pi.size);
4650
4651         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4652         if (rv) {
4653                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4654                 rv = -1;
4655                 goto fail;
4656         }
4657
4658         if (!conn_prepare_command(tconn, sock)) {
4659                 rv = 0;
4660                 goto fail;
4661         }
4662         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4663                                 response, resp_size);
4664         if (!rv)
4665                 goto fail;
4666
4667         err = drbd_recv_header(tconn, &pi);
4668         if (err) {
4669                 rv = 0;
4670                 goto fail;
4671         }
4672
4673         if (pi.cmd != P_AUTH_RESPONSE) {
4674                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4675                          cmdname(pi.cmd), pi.cmd);
4676                 rv = 0;
4677                 goto fail;
4678         }
4679
4680         if (pi.size != resp_size) {
4681                 conn_err(tconn, "AuthResponse payload of wrong size\n");
4682                 rv = 0;
4683                 goto fail;
4684         }
4685
4686         err = drbd_recv_all_warn(tconn, response, resp_size);
4687         if (err) {
4688                 rv = 0;
4689                 goto fail;
4690         }
4691
4692         right_response = kmalloc(resp_size, GFP_NOIO);
4693         if (right_response == NULL) {
4694                 conn_err(tconn, "kmalloc of right_response failed\n");
4695                 rv = -1;
4696                 goto fail;
4697         }
4698
4699         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4700
4701         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4702         if (rv) {
4703                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4704                 rv = -1;
4705                 goto fail;
4706         }
4707
4708         rv = !memcmp(response, right_response, resp_size);
4709
4710         if (rv)
4711                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4712                      resp_size);
4713         else
4714                 rv = -1;
4715
4716  fail:
4717         kfree(peers_ch);
4718         kfree(response);
4719         kfree(right_response);
4720
4721         return rv;
4722 }
4723 #endif
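
/*
 * The exchange implemented in drbd_do_auth() above, schematically (both
 * peers run the same code): send P_AUTH_CHALLENGE carrying CHALLENGE_LEN
 * random bytes, receive the peer's challenge, answer with P_AUTH_RESPONSE
 * containing HMAC(shared_secret, peers_ch), and finally compare the peer's
 * response against HMAC(shared_secret, my_challenge) computed locally.
 */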
4724
4725 int drbdd_init(struct drbd_thread *thi)
4726 {
4727         struct drbd_tconn *tconn = thi->tconn;
4728         int h;
4729
4730         conn_info(tconn, "receiver (re)started\n");
4731
4732         do {
4733                 h = conn_connect(tconn);
4734                 if (h == 0) {
4735                         conn_disconnect(tconn);
4736                         schedule_timeout_interruptible(HZ);
4737                 }
4738                 if (h == -1) {
4739                         conn_warn(tconn, "Discarding network configuration.\n");
4740                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4741                 }
4742         } while (h == 0);
4743
4744         if (h > 0)
4745                 drbdd(tconn);
4746
4747         conn_disconnect(tconn);
4748
4749         conn_info(tconn, "receiver terminated\n");
4750         return 0;
4751 }
4752
4753 /* ********* acknowledge sender ******** */
4754
4755 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4756 {
4757         struct p_req_state_reply *p = pi->data;
4758         int retcode = be32_to_cpu(p->retcode);
4759
4760         if (retcode >= SS_SUCCESS) {
4761                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4762         } else {
4763                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4764                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4765                          drbd_set_st_err_str(retcode), retcode);
4766         }
4767         wake_up(&tconn->ping_wait);
4768
4769         return 0;
4770 }
4771
4772 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4773 {
4774         struct drbd_conf *mdev;
4775         struct p_req_state_reply *p = pi->data;
4776         int retcode = be32_to_cpu(p->retcode);
4777
4778         mdev = vnr_to_mdev(tconn, pi->vnr);
4779         if (!mdev)
4780                 return -EIO;
4781
4782         if (retcode >= SS_SUCCESS) {
4783                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4784         } else {
4785                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4786                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4787                         drbd_set_st_err_str(retcode), retcode);
4788         }
4789         wake_up(&mdev->state_wait);
4790
4791         return 0;
4792 }
4793
4794 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4795 {
4796         return drbd_send_ping_ack(tconn);
4797
4798 }
4799
4800 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4801 {
4802         /* restore idle timeout */
4803         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int * HZ;
4804         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4805                 wake_up(&tconn->ping_wait);
4806
4807         return 0;
4808 }
4809
4810 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4811 {
4812         struct drbd_conf *mdev;
4813         struct p_block_ack *p = pi->data;
4814         sector_t sector = be64_to_cpu(p->sector);
4815         int blksize = be32_to_cpu(p->blksize);
4816
4817         mdev = vnr_to_mdev(tconn, pi->vnr);
4818         if (!mdev)
4819                 return -EIO;
4820
4821         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4822
4823         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4824
4825         if (get_ldev(mdev)) {
4826                 drbd_rs_complete_io(mdev, sector);
4827                 drbd_set_in_sync(mdev, sector, blksize);
4828                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4829                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4830                 put_ldev(mdev);
4831         }
4832         dec_rs_pending(mdev);
4833         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4834
4835         return 0;
4836 }
4837
4838 static int
4839 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4840                               struct rb_root *root, const char *func,
4841                               enum drbd_req_event what, bool missing_ok)
4842 {
4843         struct drbd_request *req;
4844         struct bio_and_error m;
4845
4846         spin_lock_irq(&mdev->tconn->req_lock);
4847         req = find_request(mdev, root, id, sector, missing_ok, func);
4848         if (unlikely(!req)) {
4849                 spin_unlock_irq(&mdev->tconn->req_lock);
4850                 return -EIO;
4851         }
4852         __req_mod(req, what, &m);
4853         spin_unlock_irq(&mdev->tconn->req_lock);
4854
4855         if (m.bio)
4856                 complete_master_bio(mdev, &m);
4857         return 0;
4858 }
4859
4860 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4861 {
4862         struct drbd_conf *mdev;
4863         struct p_block_ack *p = pi->data;
4864         sector_t sector = be64_to_cpu(p->sector);
4865         int blksize = be32_to_cpu(p->blksize);
4866         enum drbd_req_event what;
4867
4868         mdev = vnr_to_mdev(tconn, pi->vnr);
4869         if (!mdev)
4870                 return -EIO;
4871
4872         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4873
4874         if (p->block_id == ID_SYNCER) {
4875                 drbd_set_in_sync(mdev, sector, blksize);
4876                 dec_rs_pending(mdev);
4877                 return 0;
4878         }
4879         switch (pi->cmd) {
4880         case P_RS_WRITE_ACK:
4881                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4882                 break;
4883         case P_WRITE_ACK:
4884                 what = WRITE_ACKED_BY_PEER;
4885                 break;
4886         case P_RECV_ACK:
4887                 what = RECV_ACKED_BY_PEER;
4888                 break;
4889         case P_DISCARD_WRITE:
4890                 what = DISCARD_WRITE;
4891                 break;
4892         case P_RETRY_WRITE:
4893                 what = POSTPONE_WRITE;
4894                 break;
4895         default:
4896                 BUG();
4897         }
4898
4899         return validate_req_change_req_state(mdev, p->block_id, sector,
4900                                              &mdev->write_requests, __func__,
4901                                              what, false);
4902 }
4903
4904 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4905 {
4906         struct drbd_conf *mdev;
4907         struct p_block_ack *p = pi->data;
4908         sector_t sector = be64_to_cpu(p->sector);
4909         int size = be32_to_cpu(p->blksize);
4910         int err;
4911
4912         mdev = vnr_to_mdev(tconn, pi->vnr);
4913         if (!mdev)
4914                 return -EIO;
4915
4916         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4917
4918         if (p->block_id == ID_SYNCER) {
4919                 dec_rs_pending(mdev);
4920                 drbd_rs_failed_io(mdev, sector, size);
4921                 return 0;
4922         }
4923
4924         err = validate_req_change_req_state(mdev, p->block_id, sector,
4925                                             &mdev->write_requests, __func__,
4926                                             NEG_ACKED, true);
4927         if (err) {
4928                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4929                    The master bio might already be completed, therefore the
4930                    request is no longer in the collision hash. */
4931                 /* In Protocol B we might already have got a P_RECV_ACK
4932                    but then get a P_NEG_ACK afterwards. */
4933                 drbd_set_out_of_sync(mdev, sector, size);
4934         }
4935         return 0;
4936 }
4937
4938 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4939 {
4940         struct drbd_conf *mdev;
4941         struct p_block_ack *p = pi->data;
4942         sector_t sector = be64_to_cpu(p->sector);
4943
4944         mdev = vnr_to_mdev(tconn, pi->vnr);
4945         if (!mdev)
4946                 return -EIO;
4947
4948         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4949
4950         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4951             (unsigned long long)sector, be32_to_cpu(p->blksize));
4952
4953         return validate_req_change_req_state(mdev, p->block_id, sector,
4954                                              &mdev->read_requests, __func__,
4955                                              NEG_ACKED, false);
4956 }
4957
4958 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4959 {
4960         struct drbd_conf *mdev;
4961         sector_t sector;
4962         int size;
4963         struct p_block_ack *p = pi->data;
4964
4965         mdev = vnr_to_mdev(tconn, pi->vnr);
4966         if (!mdev)
4967                 return -EIO;
4968
4969         sector = be64_to_cpu(p->sector);
4970         size = be32_to_cpu(p->blksize);
4971
4972         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4973
4974         dec_rs_pending(mdev);
4975
4976         if (get_ldev_if_state(mdev, D_FAILED)) {
4977                 drbd_rs_complete_io(mdev, sector);
4978                 switch (pi->cmd) {
4979                 case P_NEG_RS_DREPLY:
4980                         drbd_rs_failed_io(mdev, sector, size);
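                        /* fall through */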
4981                 case P_RS_CANCEL:
4982                         break;
4983                 default:
4984                         BUG();
4985                 }
4986                 put_ldev(mdev);
4987         }
4988
4989         return 0;
4990 }
4991
4992 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4993 {
4994         struct drbd_conf *mdev;
4995         struct p_barrier_ack *p = pi->data;
4996
4997         mdev = vnr_to_mdev(tconn, pi->vnr);
4998         if (!mdev)
4999                 return -EIO;
5000
5001         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
5002
5003         if (mdev->state.conn == C_AHEAD &&
5004             atomic_read(&mdev->ap_in_flight) == 0 &&
5005             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5006                 mdev->start_resync_timer.expires = jiffies + HZ;
5007                 add_timer(&mdev->start_resync_timer);
5008         }
5009
5010         return 0;
5011 }
5012
5013 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5014 {
5015         struct drbd_conf *mdev;
5016         struct p_block_ack *p = pi->data;
5017         struct drbd_work *w;
5018         sector_t sector;
5019         int size;
5020
5021         mdev = vnr_to_mdev(tconn, pi->vnr);
5022         if (!mdev)
5023                 return -EIO;
5024
5025         sector = be64_to_cpu(p->sector);
5026         size = be32_to_cpu(p->blksize);
5027
5028         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5029
5030         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5031                 drbd_ov_out_of_sync_found(mdev, sector, size);
5032         else
5033                 ov_out_of_sync_print(mdev);
5034
5035         if (!get_ldev(mdev))
5036                 return 0;
5037
5038         drbd_rs_complete_io(mdev, sector);
5039         dec_rs_pending(mdev);
5040
5041         --mdev->ov_left;
5042
5043         /* let's advance progress step marks only for every other megabyte */
5044         if ((mdev->ov_left & 0x200) == 0x200)
5045                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5046
5047         if (mdev->ov_left == 0) {
5048                 w = kmalloc(sizeof(*w), GFP_NOIO);
5049                 if (w) {
5050                         w->cb = w_ov_finished;
5051                         w->mdev = mdev;
5052                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5053                 } else {
5054                         dev_err(DEV, "kmalloc(w) failed.\n");
5055                         ov_out_of_sync_print(mdev);
5056                         drbd_resync_finished(mdev);
5057                 }
5058         }
5059         put_ldev(mdev);
5060         return 0;
5061 }
5062
5063 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5064 {
5065         return 0;
5066 }
5067
5068 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5069 {
5070         struct drbd_conf *mdev;
5071         int vnr, not_empty = 0;
5072
5073         do {
5074                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5075                 flush_signals(current);
5076
5077                 rcu_read_lock();
5078                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5079                         kref_get(&mdev->kref);
5080                         rcu_read_unlock();
5081                         if (drbd_finish_peer_reqs(mdev)) {
5082                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5083                                 return 1;
5084                         }
5085                         kref_put(&mdev->kref, &drbd_minor_destroy);
5086                         rcu_read_lock();
5087                 }
5088                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5089
5090                 spin_lock_irq(&tconn->req_lock);
5091                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5092                         not_empty = !list_empty(&mdev->done_ee);
5093                         if (not_empty)
5094                                 break;
5095                 }
5096                 spin_unlock_irq(&tconn->req_lock);
5097                 rcu_read_unlock();
5098         } while (not_empty);
5099
5100         return 0;
5101 }
5102
5103 struct asender_cmd {
5104         size_t pkt_size;
5105         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5106 };
5107
5108 static struct asender_cmd asender_tbl[] = {
5109         [P_PING]            = { 0, got_Ping },
5110         [P_PING_ACK]        = { 0, got_PingAck },
5111         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5112         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5113         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5114         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5115         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5116         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5117         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5118         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5119         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5120         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5121         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5122         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5123         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5124         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5125         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5126 };
5127
5128 int drbd_asender(struct drbd_thread *thi)
5129 {
5130         struct drbd_tconn *tconn = thi->tconn;
5131         struct asender_cmd *cmd = NULL;
5132         struct packet_info pi;
5133         int rv;
5134         void *buf    = tconn->meta.rbuf;
5135         int received = 0;
5136         unsigned int header_size = drbd_header_size(tconn);
5137         int expect   = header_size;
5138         bool ping_timeout_active = false;
5139         struct net_conf *nc;
5140         int ping_timeo, tcp_cork, ping_int;
5141
5142         current->policy = SCHED_RR;  /* Make this a realtime task! */
5143         current->rt_priority = 2;    /* more important than all other tasks */
5144
5145         while (get_t_state(thi) == RUNNING) {
5146                 drbd_thread_current_set_cpu(thi);
5147
5148                 rcu_read_lock();
5149                 nc = rcu_dereference(tconn->net_conf);
5150                 ping_timeo = nc->ping_timeo;
5151                 tcp_cork = nc->tcp_cork;
5152                 ping_int = nc->ping_int;
5153                 rcu_read_unlock();
5154
5155                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5156                         if (drbd_send_ping(tconn)) {
5157                                 conn_err(tconn, "drbd_send_ping has failed\n");
5158                                 goto reconnect;
5159                         }
5160                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5161                         ping_timeout_active = true;
5162                 }
5163
5164                 /* TODO: conditionally cork; it may hurt latency if we cork without
5165                    much to send */
5166                 if (tcp_cork)
5167                         drbd_tcp_cork(tconn->meta.socket);
5168                 if (tconn_finish_peer_reqs(tconn)) {
5169                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5170                         goto reconnect;
5171                 }
5172                 /* but unconditionally uncork unless disabled */
5173                 if (tcp_cork)
5174                         drbd_tcp_uncork(tconn->meta.socket);
5175
5176                 /* short circuit, recv_msg would return EINTR anyways. */
5177                 if (signal_pending(current))
5178                         continue;
5179
5180                 rv = drbd_recv_short(tconn->meta.socket, buf, expect - received, 0);
5181                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5182
5183                 flush_signals(current);
5184
5185                 /* Note:
5186                  * -EINTR        (on meta) we got a signal
5187                  * -EAGAIN       (on meta) rcvtimeo expired
5188                  * -ECONNRESET   other side closed the connection
5189                  * -ERESTARTSYS  (on data) we got a signal
5190                  * rv <  0       other than above: unexpected error!
5191                  * rv == expected: full header or command
5192                  * rv <  expected: "woken" by signal during receive
5193                  * rv == 0       : "connection shut down by peer"
5194                  */
5195                 if (likely(rv > 0)) {
5196                         received += rv;
5197                         buf      += rv;
5198                 } else if (rv == 0) {
5199                         conn_err(tconn, "meta connection shut down by peer.\n");
5200                         goto reconnect;
5201                 } else if (rv == -EAGAIN) {
5202                         /* If the data socket received something meanwhile,
5203                          * that is good enough: peer is still alive. */
5204                         if (time_after(tconn->last_received,
5205                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5206                                 continue;
5207                         if (ping_timeout_active) {
5208                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5209                                 goto reconnect;
5210                         }
5211                         set_bit(SEND_PING, &tconn->flags);
5212                         continue;
5213                 } else if (rv == -EINTR) {
5214                         continue;
5215                 } else {
5216                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5217                         goto reconnect;
5218                 }
5219
5220                 if (received == expect && cmd == NULL) {
5221                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5222                                 goto reconnect;
5223                         cmd = &asender_tbl[pi.cmd];
5224                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5225                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5226                                          cmdname(pi.cmd), pi.cmd);
5227                                 goto disconnect;
5228                         }
5229                         expect = header_size + cmd->pkt_size;
5230                         if (pi.size != expect - header_size) {
5231                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5232                                         pi.cmd, pi.size);
5233                                 goto reconnect;
5234                         }
5235                 }
5236                 if (received == expect) {
5237                         int err;
5238
5239                         err = cmd->fn(tconn, &pi);
5240                         if (err) {
5241                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5242                                 goto reconnect;
5243                         }
5244
5245                         tconn->last_received = jiffies;
5246
5247                         if (cmd == &asender_tbl[P_PING_ACK]) {
5248                                 /* restore idle timeout */
5249                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5250                                 ping_timeout_active = false;
5251                         }
5252
5253                         buf      = tconn->meta.rbuf;
5254                         received = 0;
5255                         expect   = header_size;
5256                         cmd      = NULL;
5257                 }
5258         }
5259
5260         if (0) {
5261 reconnect:
5262                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5263         }
5264         if (0) {
5265 disconnect:
5266                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5267         }
5268         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5269
5270         conn_info(tconn, "asender terminated\n");
5271
5272         return 0;
5273 }