drbd: Allocation of int_dig_in and int_dig_vv was missing
firefly-linux-kernel-4.4.55.git: drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
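/*
 * For orientation: page_chain_next() and the page_chain_for_each*()
 * iterators used below are expected to come from drbd_int.h; a minimal
 * sketch of what they are assumed to look like, kept out of the build
 * on purpose:
 */
#if 0
static inline struct page *page_chain_next(struct page *page)
{
        /* the "next" pointer is stashed in page->private */
        return (struct page *)page_private(page);
}

#define page_chain_for_each(page) \
        for (; page; page = page_chain_next(page))

/* safe variant: fetch the next pointer before the body may free @page */
#define page_chain_for_each_safe(page, n) \
        for (; page && ({ n = page_chain_next(page); 1; }); page = n)
#endif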
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
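/*
 * Typical call site, as in drbd_alloc_peer_req() further down: one chain
 * sized for the whole peer request, with blocking retries only when the
 * caller's gfp mask allows sleeping:
 *
 *      nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 *      page = drbd_alloc_pages(mdev, nr_pages, gfp_mask & __GFP_WAIT);
 */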
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * also, we want to log exactly which part of it failed */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
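/*
 * This is roughly what a user space caller would get from
 *
 *      setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *      setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *
 * issued before connect()/listen(); setting SOCK_SNDBUF_LOCK and
 * SOCK_RCVBUF_LOCK keeps the kernel's buffer auto-tuning from overriding
 * the configured sizes later on.
 */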
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         connect_int = nc->connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         connect_int = nc->connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called if a connection is established, or if a new minor gets created
813    in a connection */
814 int drbd_connected(struct drbd_conf *mdev)
815 {
816         int err;
817
818         atomic_set(&mdev->packet_seq, 0);
819         mdev->peer_seq = 0;
820
821         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
822                 &mdev->tconn->cstate_mutex :
823                 &mdev->own_state_mutex;
824
825         err = drbd_send_sync_param(mdev);
826         if (!err)
827                 err = drbd_send_sizes(mdev, 0, 0);
828         if (!err)
829                 err = drbd_send_uuids(mdev);
830         if (!err)
831                 err = drbd_send_state(mdev);
832         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
833         clear_bit(RESIZE_PENDING, &mdev->flags);
834         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
835         return err;
836 }
837
838 /*
839  * return values:
840  *   1 yes, we have a valid connection
841  *   0 oops, did not work out, please try again
842  *  -1 peer talks different language,
843  *     no point in trying again, please go standalone.
844  *  -2 We do not have a network config...
845  */
846 static int conn_connect(struct drbd_tconn *tconn)
847 {
848         struct socket *sock, *msock;
849         struct drbd_conf *mdev;
850         struct net_conf *nc;
851         int vnr, timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in conn_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         rcu_read_lock();
1005         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1006                 kref_get(&mdev->kref);
1007                 rcu_read_unlock();
1008                 drbd_connected(mdev);
1009                 kref_put(&mdev->kref, &drbd_minor_destroy);
1010                 rcu_read_lock();
1011         }
1012         rcu_read_unlock();
1013
1014         return h;
1015
1016 out_release_sockets:
1017         if (tconn->data.socket) {
1018                 sock_release(tconn->data.socket);
1019                 tconn->data.socket = NULL;
1020         }
1021         if (tconn->meta.socket) {
1022                 sock_release(tconn->meta.socket);
1023                 tconn->meta.socket = NULL;
1024         }
1025         return -1;
1026 }
1027
1028 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1029 {
1030         unsigned int header_size = drbd_header_size(tconn);
1031
1032         if (header_size == sizeof(struct p_header100) &&
1033             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1034                 struct p_header100 *h = header;
1035                 if (h->pad != 0) {
1036                         conn_err(tconn, "Header padding is not zero\n");
1037                         return -EINVAL;
1038                 }
1039                 pi->vnr = be16_to_cpu(h->volume);
1040                 pi->cmd = be16_to_cpu(h->command);
1041                 pi->size = be32_to_cpu(h->length);
1042         } else if (header_size == sizeof(struct p_header95) &&
1043                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1044                 struct p_header95 *h = header;
1045                 pi->cmd = be16_to_cpu(h->command);
1046                 pi->size = be32_to_cpu(h->length);
1047                 pi->vnr = 0;
1048         } else if (header_size == sizeof(struct p_header80) &&
1049                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1050                 struct p_header80 *h = header;
1051                 pi->cmd = be16_to_cpu(h->command);
1052                 pi->size = be16_to_cpu(h->length);
1053                 pi->vnr = 0;
1054         } else {
1055                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1056                          be32_to_cpu(*(__be32 *)header),
1057                          tconn->agreed_pro_version);
1058                 return -EINVAL;
1059         }
1060         pi->data = header + header_size;
1061         return 0;
1062 }
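/*
 * Wire header summary, as implied by the checks above (all fields big
 * endian; the exact struct layouts live in the driver headers):
 *   protocol >= 100: 32-bit DRBD_MAGIC_100 magic; carries a 16-bit volume
 *                    number, a 16-bit command, a 32-bit length and a pad
 *                    field that must be zero.
 *   protocol 95..99: 16-bit DRBD_MAGIC_BIG magic; 16-bit command, 32-bit
 *                    length, no volume number (vnr reported as 0).
 *   protocol <= 94:  32-bit DRBD_MAGIC magic; 16-bit command and 16-bit
 *                    length, no volume number (vnr reported as 0).
 */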
1063
1064 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1065 {
1066         void *buffer = tconn->data.rbuf;
1067         int err;
1068
1069         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1070         if (err)
1071                 return err;
1072
1073         err = decode_header(tconn, buffer, pi);
1074         tconn->last_received = jiffies;
1075
1076         return err;
1077 }
1078
1079 static void drbd_flush(struct drbd_conf *mdev)
1080 {
1081         int rv;
1082
1083         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1084                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1085                                         NULL);
1086                 if (rv) {
1087                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1088                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1089                          * don't try again for ANY return value != 0
1090                          * if (rv == -EOPNOTSUPP) */
1091                         drbd_bump_write_ordering(mdev, WO_drain_io);
1092                 }
1093                 put_ldev(mdev);
1094         }
1095 }
1096
1097 /**
1098  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1099  * @mdev:       DRBD device.
1100  * @epoch:      Epoch object.
1101  * @ev:         Epoch event.
1102  */
1103 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1104                                                struct drbd_epoch *epoch,
1105                                                enum epoch_event ev)
1106 {
1107         int epoch_size;
1108         struct drbd_epoch *next_epoch;
1109         enum finish_epoch rv = FE_STILL_LIVE;
1110
1111         spin_lock(&mdev->epoch_lock);
1112         do {
1113                 next_epoch = NULL;
1114
1115                 epoch_size = atomic_read(&epoch->epoch_size);
1116
1117                 switch (ev & ~EV_CLEANUP) {
1118                 case EV_PUT:
1119                         atomic_dec(&epoch->active);
1120                         break;
1121                 case EV_GOT_BARRIER_NR:
1122                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1123                         break;
1124                 case EV_BECAME_LAST:
1125                         /* nothing to do */
1126                         break;
1127                 }
1128
1129                 if (epoch_size != 0 &&
1130                     atomic_read(&epoch->active) == 0 &&
1131                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1132                         if (!(ev & EV_CLEANUP)) {
1133                                 spin_unlock(&mdev->epoch_lock);
1134                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1135                                 spin_lock(&mdev->epoch_lock);
1136                         }
1137                         dec_unacked(mdev);
1138
1139                         if (mdev->current_epoch != epoch) {
1140                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1141                                 list_del(&epoch->list);
1142                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1143                                 mdev->epochs--;
1144                                 kfree(epoch);
1145
1146                                 if (rv == FE_STILL_LIVE)
1147                                         rv = FE_DESTROYED;
1148                         } else {
1149                                 epoch->flags = 0;
1150                                 atomic_set(&epoch->epoch_size, 0);
1151                                 /* atomic_set(&epoch->active, 0); is already zero */
1152                                 if (rv == FE_STILL_LIVE)
1153                                         rv = FE_RECYCLED;
1154                                 wake_up(&mdev->ee_wait);
1155                         }
1156                 }
1157
1158                 if (!next_epoch)
1159                         break;
1160
1161                 epoch = next_epoch;
1162         } while (1);
1163
1164         spin_unlock(&mdev->epoch_lock);
1165
1166         return rv;
1167 }
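/*
 * In short: an epoch counts as finished once it has seen at least one
 * request (epoch_size != 0), all of its requests have completed
 * (active == 0), and its barrier number has arrived
 * (DE_HAVE_BARRIER_NUMBER).  Finishing sends P_BARRIER_ACK to the peer
 * (unless this is an EV_CLEANUP pass) and either recycles the epoch object,
 * if it is still mdev->current_epoch, or frees it.
 */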
1168
1169 /**
1170  * drbd_bump_write_ordering() - Fall back to another write ordering method
1171  * @mdev:       DRBD device.
1172  * @wo:         Write ordering method to try.
1173  */
1174 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1175 {
1176         struct disk_conf *dc;
1177         enum write_ordering_e pwo;
1178         static char *write_ordering_str[] = {
1179                 [WO_none] = "none",
1180                 [WO_drain_io] = "drain",
1181                 [WO_bdev_flush] = "flush",
1182         };
1183
1184         pwo = mdev->write_ordering;
1185         wo = min(pwo, wo);
1186         rcu_read_lock();
1187         dc = rcu_dereference(mdev->ldev->disk_conf);
1188
1189         if (wo == WO_bdev_flush && !dc->disk_flushes)
1190                 wo = WO_drain_io;
1191         if (wo == WO_drain_io && !dc->disk_drain)
1192                 wo = WO_none;
1193         rcu_read_unlock();
1194         mdev->write_ordering = wo;
1195         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1196                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1197 }
1198
1199 /**
1200  * drbd_submit_peer_request() - Submit a peer request's pages in one or more bios
1201  * @mdev:       DRBD device.
1202  * @peer_req:   peer request
1203  * @rw:         flag field, see bio->bi_rw
1204  *
1205  * May spread the pages to multiple bios,
1206  * depending on bio_add_page restrictions.
1207  *
1208  * Returns 0 if all bios have been submitted,
1209  * -ENOMEM if we could not allocate enough bios,
1210  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1211  *  single page to an empty bio (which should never happen and likely indicates
1212  *  that the lower level IO stack is in some way broken). This has been observed
1213  *  on certain Xen deployments.
1214  */
1215 /* TODO allocate from our own bio_set. */
1216 int drbd_submit_peer_request(struct drbd_conf *mdev,
1217                              struct drbd_peer_request *peer_req,
1218                              const unsigned rw, const int fault_type)
1219 {
1220         struct bio *bios = NULL;
1221         struct bio *bio;
1222         struct page *page = peer_req->pages;
1223         sector_t sector = peer_req->i.sector;
1224         unsigned ds = peer_req->i.size;
1225         unsigned n_bios = 0;
1226         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1227         int err = -ENOMEM;
1228
1229         /* In most cases, we will only need one bio.  But in case the lower
1230          * level restrictions happen to be different at this offset on this
1231          * side than those of the sending peer, we may need to submit the
1232          * request in more than one bio.
1233          *
1234          * Plain bio_alloc is good enough here, this is no DRBD internally
1235          * generated bio, but a bio allocated on behalf of the peer.
1236          */
1237 next_bio:
1238         bio = bio_alloc(GFP_NOIO, nr_pages);
1239         if (!bio) {
1240                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1241                 goto fail;
1242         }
1243         /* > peer_req->i.sector, unless this is the first bio */
1244         bio->bi_sector = sector;
1245         bio->bi_bdev = mdev->ldev->backing_bdev;
1246         bio->bi_rw = rw;
1247         bio->bi_private = peer_req;
1248         bio->bi_end_io = drbd_peer_request_endio;
1249
1250         bio->bi_next = bios;
1251         bios = bio;
1252         ++n_bios;
1253
1254         page_chain_for_each(page) {
1255                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1256                 if (!bio_add_page(bio, page, len, 0)) {
1257                         /* A single page must always be possible!
1258                          * But in case it fails anyways,
1259                          * we deal with it, and complain (below). */
1260                         if (bio->bi_vcnt == 0) {
1261                                 dev_err(DEV,
1262                                         "bio_add_page failed for len=%u, "
1263                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1264                                         len, (unsigned long long)bio->bi_sector);
1265                                 err = -ENOSPC;
1266                                 goto fail;
1267                         }
1268                         goto next_bio;
1269                 }
1270                 ds -= len;
1271                 sector += len >> 9;
1272                 --nr_pages;
1273         }
1274         D_ASSERT(page == NULL);
1275         D_ASSERT(ds == 0);
1276
1277         atomic_set(&peer_req->pending_bios, n_bios);
1278         do {
1279                 bio = bios;
1280                 bios = bios->bi_next;
1281                 bio->bi_next = NULL;
1282
1283                 drbd_generic_make_request(mdev, fault_type, bio);
1284         } while (bios);
1285         return 0;
1286
1287 fail:
1288         while (bios) {
1289                 bio = bios;
1290                 bios = bios->bi_next;
1291                 bio_put(bio);
1292         }
1293         return err;
1294 }
1295
1296 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1297                                              struct drbd_peer_request *peer_req)
1298 {
1299         struct drbd_interval *i = &peer_req->i;
1300
1301         drbd_remove_interval(&mdev->write_requests, i);
1302         drbd_clear_interval(i);
1303
1304         /* Wake up any processes waiting for this peer request to complete.  */
1305         if (i->waiting)
1306                 wake_up(&mdev->misc_wait);
1307 }
1308
1309 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1310 {
1311         struct drbd_conf *mdev;
1312         int rv;
1313         struct p_barrier *p = pi->data;
1314         struct drbd_epoch *epoch;
1315
1316         mdev = vnr_to_mdev(tconn, pi->vnr);
1317         if (!mdev)
1318                 return -EIO;
1319
1320         inc_unacked(mdev);
1321
1322         mdev->current_epoch->barrier_nr = p->barrier;
1323         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1324
1325         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1326          * the activity log, which means it would not be resynced in case the
1327          * R_PRIMARY crashes now.
1328          * Therefore we must send the barrier_ack after the barrier request was
1329          * completed. */
1330         switch (mdev->write_ordering) {
1331         case WO_none:
1332                 if (rv == FE_RECYCLED)
1333                         return 0;
1334
1335                 /* receiver context, in the writeout path of the other node.
1336                  * avoid potential distributed deadlock */
1337                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1338                 if (epoch)
1339                         break;
1340                 else
1341                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1342                         /* Fall through */
1343
1344         case WO_bdev_flush:
1345         case WO_drain_io:
1346                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1347                 drbd_flush(mdev);
1348
1349                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1350                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1351                         if (epoch)
1352                                 break;
1353                 }
1354
1355                 epoch = mdev->current_epoch;
1356                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1357
1358                 D_ASSERT(atomic_read(&epoch->active) == 0);
1359                 D_ASSERT(epoch->flags == 0);
1360
1361                 return 0;
1362         default:
1363                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1364                 return -EIO;
1365         }
1366
1367         epoch->flags = 0;
1368         atomic_set(&epoch->epoch_size, 0);
1369         atomic_set(&epoch->active, 0);
1370
1371         spin_lock(&mdev->epoch_lock);
1372         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1373                 list_add(&epoch->list, &mdev->current_epoch->list);
1374                 mdev->current_epoch = epoch;
1375                 mdev->epochs++;
1376         } else {
1377                 /* The current_epoch got recycled while we allocated this one... */
1378                 kfree(epoch);
1379         }
1380         spin_unlock(&mdev->epoch_lock);
1381
1382         return 0;
1383 }
1384
1385 /* used from receive_RSDataReply (recv_resync_read)
1386  * and from receive_Data */
1387 static struct drbd_peer_request *
1388 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1389               int data_size) __must_hold(local)
1390 {
1391         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1392         struct drbd_peer_request *peer_req;
1393         struct page *page;
1394         int dgs, ds, err;
1395         void *dig_in = mdev->tconn->int_dig_in;
1396         void *dig_vv = mdev->tconn->int_dig_vv;
1397         unsigned long *data;
1398
1399         dgs = 0;
1400         if (mdev->tconn->peer_integrity_tfm) {
1401                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1402                 /*
1403                  * FIXME: Receive the incoming digest into the receive buffer
1404                  *        here, together with its struct p_data?
1405                  */
1406                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1407                 if (err)
1408                         return NULL;
1409                 data_size -= dgs;
1410         }
1411
1412         if (!expect(data_size != 0))
1413                 return NULL;
1414         if (!expect(IS_ALIGNED(data_size, 512)))
1415                 return NULL;
1416         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1417                 return NULL;
1418
1419         /* even though we trust our peer,
1420          * we sometimes have to double check. */
1421         if (sector + (data_size>>9) > capacity) {
1422                 dev_err(DEV, "request from peer beyond end of local disk: "
1423                         "capacity: %llus < sector: %llus + size: %u\n",
1424                         (unsigned long long)capacity,
1425                         (unsigned long long)sector, data_size);
1426                 return NULL;
1427         }
1428
1429         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1430          * "criss-cross" setup, that might cause write-out on some other DRBD,
1431          * which in turn might block on the other node at this very place.  */
1432         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1433         if (!peer_req)
1434                 return NULL;
1435
1436         ds = data_size;
1437         page = peer_req->pages;
1438         page_chain_for_each(page) {
1439                 unsigned len = min_t(int, ds, PAGE_SIZE);
1440                 data = kmap(page);
1441                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1442                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1443                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1444                         data[0] = data[0] ^ (unsigned long)-1;
1445                 }
1446                 kunmap(page);
1447                 if (err) {
1448                         drbd_free_peer_req(mdev, peer_req);
1449                         return NULL;
1450                 }
1451                 ds -= len;
1452         }
1453
1454         if (dgs) {
1455                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1456                 if (memcmp(dig_in, dig_vv, dgs)) {
1457                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1458                                 (unsigned long long)sector, data_size);
1459                         drbd_free_peer_req(mdev, peer_req);
1460                         return NULL;
1461                 }
1462         }
1463         mdev->recv_cnt += data_size>>9;
1464         return peer_req;
1465 }
1466
1467 /* drbd_drain_block() just takes a data block
1468  * out of the socket input buffer, and discards it.
1469  */
1470 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1471 {
1472         struct page *page;
1473         int err = 0;
1474         void *data;
1475
1476         if (!data_size)
1477                 return 0;
1478
1479         page = drbd_alloc_pages(mdev, 1, 1);
1480
1481         data = kmap(page);
1482         while (data_size) {
1483                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1484
1485                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1486                 if (err)
1487                         break;
1488                 data_size -= len;
1489         }
1490         kunmap(page);
1491         drbd_free_pages(mdev, page, 0);
1492         return err;
1493 }
1494
1495 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1496                            sector_t sector, int data_size)
1497 {
1498         struct bio_vec *bvec;
1499         struct bio *bio;
1500         int dgs, err, i, expect;
1501         void *dig_in = mdev->tconn->int_dig_in;
1502         void *dig_vv = mdev->tconn->int_dig_vv;
1503
1504         dgs = 0;
1505         if (mdev->tconn->peer_integrity_tfm) {
1506                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1507                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1508                 if (err)
1509                         return err;
1510                 data_size -= dgs;
1511         }
1512
1513         /* optimistically update recv_cnt.  if receiving fails below,
1514          * we disconnect anyways, and counters will be reset. */
1515         mdev->recv_cnt += data_size>>9;
1516
1517         bio = req->master_bio;
1518         D_ASSERT(sector == bio->bi_sector);
1519
1520         bio_for_each_segment(bvec, bio, i) {
1521                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1522                 expect = min_t(int, data_size, bvec->bv_len);
1523                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1524                 kunmap(bvec->bv_page);
1525                 if (err)
1526                         return err;
1527                 data_size -= expect;
1528         }
1529
1530         if (dgs) {
1531                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1532                 if (memcmp(dig_in, dig_vv, dgs)) {
1533                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1534                         return -EINVAL;
1535                 }
1536         }
1537
1538         D_ASSERT(data_size == 0);
1539         return 0;
1540 }
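
/*
 * Illustrative summary of the receive paths above (not a comment from the
 * original source): with a configured peer_integrity_tfm the payload of a
 * data packet is laid out as
 *
 *	[ digest, crypto_hash_digestsize() bytes ][ data, pi->size - dgs bytes ]
 *
 * Both read_in_block() and recv_dless_read() first pull the digest into
 * tconn->int_dig_in, receive the data, recompute the checksum into
 * tconn->int_dig_vv and finally compare the two buffers.
 */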
1541
1542 /*
1543  * e_end_resync_block() is called in asender context via
1544  * drbd_finish_peer_reqs().
1545  */
1546 static int e_end_resync_block(struct drbd_work *w, int unused)
1547 {
1548         struct drbd_peer_request *peer_req =
1549                 container_of(w, struct drbd_peer_request, w);
1550         struct drbd_conf *mdev = w->mdev;
1551         sector_t sector = peer_req->i.sector;
1552         int err;
1553
1554         D_ASSERT(drbd_interval_empty(&peer_req->i));
1555
1556         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1557                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1558                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1559         } else {
1560                 /* Record failure to sync */
1561                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1562
1563                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1564         }
1565         dec_unacked(mdev);
1566
1567         return err;
1568 }
1569
1570 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1571 {
1572         struct drbd_peer_request *peer_req;
1573
1574         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1575         if (!peer_req)
1576                 goto fail;
1577
1578         dec_rs_pending(mdev);
1579
1580         inc_unacked(mdev);
1581         /* corresponding dec_unacked() in e_end_resync_block(),
1582          * or in _drbd_clear_done_ee(), respectively */
1583
1584         peer_req->w.cb = e_end_resync_block;
1585
1586         spin_lock_irq(&mdev->tconn->req_lock);
1587         list_add(&peer_req->w.list, &mdev->sync_ee);
1588         spin_unlock_irq(&mdev->tconn->req_lock);
1589
1590         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1591         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1592                 return 0;
1593
1594         /* don't care for the reason here */
1595         dev_err(DEV, "submit failed, triggering re-connect\n");
1596         spin_lock_irq(&mdev->tconn->req_lock);
1597         list_del(&peer_req->w.list);
1598         spin_unlock_irq(&mdev->tconn->req_lock);
1599
1600         drbd_free_peer_req(mdev, peer_req);
1601 fail:
1602         put_ldev(mdev);
1603         return -EIO;
1604 }
1605
1606 static struct drbd_request *
1607 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1608              sector_t sector, bool missing_ok, const char *func)
1609 {
1610         struct drbd_request *req;
1611
1612         /* Request object according to our peer */
1613         req = (struct drbd_request *)(unsigned long)id;
1614         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1615                 return req;
1616         if (!missing_ok) {
1617                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1618                         (unsigned long)id, (unsigned long long)sector);
1619         }
1620         return NULL;
1621 }
1622
1623 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1624 {
1625         struct drbd_conf *mdev;
1626         struct drbd_request *req;
1627         sector_t sector;
1628         int err;
1629         struct p_data *p = pi->data;
1630
1631         mdev = vnr_to_mdev(tconn, pi->vnr);
1632         if (!mdev)
1633                 return -EIO;
1634
1635         sector = be64_to_cpu(p->sector);
1636
1637         spin_lock_irq(&mdev->tconn->req_lock);
1638         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1639         spin_unlock_irq(&mdev->tconn->req_lock);
1640         if (unlikely(!req))
1641                 return -EIO;
1642
1643         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1644          * special casing it there for the various failure cases.
1645          * still no race with drbd_fail_pending_reads */
1646         err = recv_dless_read(mdev, req, sector, pi->size);
1647         if (!err)
1648                 req_mod(req, DATA_RECEIVED);
1649         /* else: nothing. handled from drbd_disconnect...
1650          * I don't think we may complete this just yet
1651          * in case we are "on-disconnect: freeze" */
1652
1653         return err;
1654 }
1655
1656 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1657 {
1658         struct drbd_conf *mdev;
1659         sector_t sector;
1660         int err;
1661         struct p_data *p = pi->data;
1662
1663         mdev = vnr_to_mdev(tconn, pi->vnr);
1664         if (!mdev)
1665                 return -EIO;
1666
1667         sector = be64_to_cpu(p->sector);
1668         D_ASSERT(p->block_id == ID_SYNCER);
1669
1670         if (get_ldev(mdev)) {
1671                 /* data is submitted to disk within recv_resync_read.
1672                  * corresponding put_ldev done below on error,
1673                  * or in drbd_peer_request_endio. */
1674                 err = recv_resync_read(mdev, sector, pi->size);
1675         } else {
1676                 if (__ratelimit(&drbd_ratelimit_state))
1677                         dev_err(DEV, "Can not write resync data to local disk.\n");
1678
1679                 err = drbd_drain_block(mdev, pi->size);
1680
1681                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1682         }
1683
1684         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1685
1686         return err;
1687 }
1688
1689 static int w_restart_write(struct drbd_work *w, int cancel)
1690 {
1691         struct drbd_request *req = container_of(w, struct drbd_request, w);
1692         struct drbd_conf *mdev = w->mdev;
1693         struct bio *bio;
1694         unsigned long start_time;
1695         unsigned long flags;
1696
1697         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1698         if (!expect(req->rq_state & RQ_POSTPONED)) {
1699                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1700                 return -EIO;
1701         }
1702         bio = req->master_bio;
1703         start_time = req->start_time;
1704         /* Postponed requests will not have their master_bio completed!  */
1705         __req_mod(req, DISCARD_WRITE, NULL);
1706         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1707
1708         while (__drbd_make_request(mdev, bio, start_time))
1709                 /* retry */ ;
1710         return 0;
1711 }
1712
1713 static void restart_conflicting_writes(struct drbd_conf *mdev,
1714                                        sector_t sector, int size)
1715 {
1716         struct drbd_interval *i;
1717         struct drbd_request *req;
1718
1719         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1720                 if (!i->local)
1721                         continue;
1722                 req = container_of(i, struct drbd_request, i);
1723                 if (req->rq_state & RQ_LOCAL_PENDING ||
1724                     !(req->rq_state & RQ_POSTPONED))
1725                         continue;
1726                 if (expect(list_empty(&req->w.list))) {
1727                         req->w.mdev = mdev;
1728                         req->w.cb = w_restart_write;
1729                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1730                 }
1731         }
1732 }
1733
1734 /*
1735  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1736  */
1737 static int e_end_block(struct drbd_work *w, int cancel)
1738 {
1739         struct drbd_peer_request *peer_req =
1740                 container_of(w, struct drbd_peer_request, w);
1741         struct drbd_conf *mdev = w->mdev;
1742         sector_t sector = peer_req->i.sector;
1743         int err = 0, pcmd;
1744
1745         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1746                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1747                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1748                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1749                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1750                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1751                         err = drbd_send_ack(mdev, pcmd, peer_req);
1752                         if (pcmd == P_RS_WRITE_ACK)
1753                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1754                 } else {
1755                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1756                         /* we expect it to be marked out of sync anyways...
1757                          * maybe assert this?  */
1758                 }
1759                 dec_unacked(mdev);
1760         }
1761         /* we delete from the conflict detection hash _after_ we sent out the
1762          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1763         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1764                 spin_lock_irq(&mdev->tconn->req_lock);
1765                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1766                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1767                 if (peer_req->flags & EE_RESTART_REQUESTS)
1768                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1769                 spin_unlock_irq(&mdev->tconn->req_lock);
1770         } else
1771                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1772
1773         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1774
1775         return err;
1776 }
1777
1778 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1779 {
1780         struct drbd_conf *mdev = w->mdev;
1781         struct drbd_peer_request *peer_req =
1782                 container_of(w, struct drbd_peer_request, w);
1783         int err;
1784
1785         err = drbd_send_ack(mdev, ack, peer_req);
1786         dec_unacked(mdev);
1787
1788         return err;
1789 }
1790
1791 static int e_send_discard_write(struct drbd_work *w, int unused)
1792 {
1793         return e_send_ack(w, P_DISCARD_WRITE);
1794 }
1795
1796 static int e_send_retry_write(struct drbd_work *w, int unused)
1797 {
1798         struct drbd_tconn *tconn = w->mdev->tconn;
1799
1800         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1801                              P_RETRY_WRITE : P_DISCARD_WRITE);
1802 }
1803
1804 static bool seq_greater(u32 a, u32 b)
1805 {
1806         /*
1807          * We assume 32-bit wrap-around here.
1808          * For 24-bit wrap-around, we would have to shift:
1809          *  a <<= 8; b <<= 8;
1810          */
1811         return (s32)a - (s32)b > 0;
1812 }
1813
1814 static u32 seq_max(u32 a, u32 b)
1815 {
1816         return seq_greater(a, b) ? a : b;
1817 }
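
/*
 * Illustrative sketch, not part of the driver: what the signed
 * reinterpretation in seq_greater() buys us across a 32-bit wrap.
 * The helper name below is made up for this example.
 */
static inline bool seq_greater_example(void)
{
	u32 newer = 3;			/* counter just wrapped around  */
	u32 older = 0xfffffffe;		/* last value before the wrap   */

	/* (s32)(newer - older) == 5 > 0, so "newer" still compares as more
	 * recent, even though it is numerically smaller than "older". */
	return (s32)(newer - older) > 0;
}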
1818
1819 static bool need_peer_seq(struct drbd_conf *mdev)
1820 {
1821         struct drbd_tconn *tconn = mdev->tconn;
1822         int tp;
1823
1824         /*
1825          * We only need to keep track of the last packet_seq number of our peer
1826          * if we are in dual-primary mode and we have the discard flag set; see
1827          * handle_write_conflicts().
1828          */
1829
1830         rcu_read_lock();
1831         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1832         rcu_read_unlock();
1833
1834         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1835 }
1836
1837 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1838 {
1839         unsigned int newest_peer_seq;
1840
1841         if (need_peer_seq(mdev)) {
1842                 spin_lock(&mdev->peer_seq_lock);
1843                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1844                 mdev->peer_seq = newest_peer_seq;
1845                 spin_unlock(&mdev->peer_seq_lock);
1846                 /* wake up only if we actually changed mdev->peer_seq */
1847                 if (peer_seq == newest_peer_seq)
1848                         wake_up(&mdev->seq_wait);
1849         }
1850 }
1851
1852 /* Called from receive_Data.
1853  * Synchronize packets on sock with packets on msock.
1854  *
1855  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1856  * packet traveling on msock, they are still processed in the order they have
1857  * been sent.
1858  *
1859  * Note: we don't care for Ack packets overtaking P_DATA packets.
1860  *
1861  * In case packet_seq is larger than mdev->peer_seq number, there are
1862  * outstanding packets on the msock. We wait for them to arrive.
1863  * In case we are the logically next packet, we update mdev->peer_seq
1864  * ourselves. Correctly handles 32bit wrap around.
1865  *
1866  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1867  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1868  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1869  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1870  *
1871  * returns 0 if we may process the packet,
1872  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1873 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1874 {
1875         DEFINE_WAIT(wait);
1876         long timeout;
1877         int ret;
1878
1879         if (!need_peer_seq(mdev))
1880                 return 0;
1881
1882         spin_lock(&mdev->peer_seq_lock);
1883         for (;;) {
1884                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1885                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1886                         ret = 0;
1887                         break;
1888                 }
1889                 if (signal_pending(current)) {
1890                         ret = -ERESTARTSYS;
1891                         break;
1892                 }
1893                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1894                 spin_unlock(&mdev->peer_seq_lock);
1895                 rcu_read_lock();
1896                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1897                 rcu_read_unlock();
1898                 timeout = schedule_timeout(timeout);
1899                 spin_lock(&mdev->peer_seq_lock);
1900                 if (!timeout) {
1901                         ret = -ETIMEDOUT;
1902                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1903                         break;
1904                 }
1905         }
1906         spin_unlock(&mdev->peer_seq_lock);
1907         finish_wait(&mdev->seq_wait, &wait);
1908         return ret;
1909 }
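
/*
 * Illustrative reading of the wait condition above (not a comment from the
 * original source): receive_Data() may proceed once
 * peer_seq <= mdev->peer_seq + 1 (wrap-around aware), i.e. all packets with
 * smaller sequence numbers, which may still be in flight on the msock, have
 * already been accounted for.
 */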
1910
1911 /* see also bio_flags_to_wire()
1912  * We need to map bio REQ_* flags to the data packet DP_* flags and back
1913  * semantically, because the peer may be running a different kernel version. */
1914 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1915 {
1916         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1917                 (dpf & DP_FUA ? REQ_FUA : 0) |
1918                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1919                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1920 }
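
/*
 * Illustrative usage, not taken from the driver: a peer that set
 * DP_RW_SYNC | DP_FUA on a P_DATA packet ends up submitting the local
 * write as
 *
 *	rw = WRITE | wire_flags_to_bio(mdev, DP_RW_SYNC | DP_FUA);
 *	   == WRITE | REQ_SYNC | REQ_FUA
 *
 * Wire bits not listed above are simply ignored by this mapping.
 */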
1921
1922 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1923                                     unsigned int size)
1924 {
1925         struct drbd_interval *i;
1926
1927     repeat:
1928         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1929                 struct drbd_request *req;
1930                 struct bio_and_error m;
1931
1932                 if (!i->local)
1933                         continue;
1934                 req = container_of(i, struct drbd_request, i);
1935                 if (!(req->rq_state & RQ_POSTPONED))
1936                         continue;
1937                 req->rq_state &= ~RQ_POSTPONED;
1938                 __req_mod(req, NEG_ACKED, &m);
1939                 spin_unlock_irq(&mdev->tconn->req_lock);
1940                 if (m.bio)
1941                         complete_master_bio(mdev, &m);
1942                 spin_lock_irq(&mdev->tconn->req_lock);
1943                 goto repeat;
1944         }
1945 }
1946
1947 static int handle_write_conflicts(struct drbd_conf *mdev,
1948                                   struct drbd_peer_request *peer_req)
1949 {
1950         struct drbd_tconn *tconn = mdev->tconn;
1951         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1952         sector_t sector = peer_req->i.sector;
1953         const unsigned int size = peer_req->i.size;
1954         struct drbd_interval *i;
1955         bool equal;
1956         int err;
1957
1958         /*
1959          * Inserting the peer request into the write_requests tree will prevent
1960          * new conflicting local requests from being added.
1961          */
1962         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1963
1964     repeat:
1965         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1966                 if (i == &peer_req->i)
1967                         continue;
1968
1969                 if (!i->local) {
1970                         /*
1971                          * Our peer has sent a conflicting remote request; this
1972                          * should not happen in a two-node setup.  Wait for the
1973                          * earlier peer request to complete.
1974                          */
1975                         err = drbd_wait_misc(mdev, i);
1976                         if (err)
1977                                 goto out;
1978                         goto repeat;
1979                 }
1980
1981                 equal = i->sector == sector && i->size == size;
1982                 if (resolve_conflicts) {
1983                         /*
1984                          * If the peer request is fully contained within the
1985                          * overlapping request, it can be discarded; otherwise,
1986                          * it will be retried once all overlapping requests
1987                          * have completed.
1988                          */
1989                         bool discard = i->sector <= sector && i->sector +
1990                                        (i->size >> 9) >= sector + (size >> 9);
1991
1992                         if (!equal)
1993                                 dev_alert(DEV, "Concurrent writes detected: "
1994                                                "local=%llus +%u, remote=%llus +%u, "
1995                                                "assuming %s came first\n",
1996                                           (unsigned long long)i->sector, i->size,
1997                                           (unsigned long long)sector, size,
1998                                           discard ? "local" : "remote");
1999
2000                         inc_unacked(mdev);
2001                         peer_req->w.cb = discard ? e_send_discard_write :
2002                                                    e_send_retry_write;
2003                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2004                         wake_asender(mdev->tconn);
2005
2006                         err = -ENOENT;
2007                         goto out;
2008                 } else {
2009                         struct drbd_request *req =
2010                                 container_of(i, struct drbd_request, i);
2011
2012                         if (!equal)
2013                                 dev_alert(DEV, "Concurrent writes detected: "
2014                                                "local=%llus +%u, remote=%llus +%u\n",
2015                                           (unsigned long long)i->sector, i->size,
2016                                           (unsigned long long)sector, size);
2017
2018                         if (req->rq_state & RQ_LOCAL_PENDING ||
2019                             !(req->rq_state & RQ_POSTPONED)) {
2020                                 /*
2021                                  * Wait for the node with the discard flag to
2022                                  * decide if this request will be discarded or
2023                                  * retried.  Requests that are discarded will
2024                                  * disappear from the write_requests tree.
2025                                  *
2026                                  * In addition, wait for the conflicting
2027                                  * request to finish locally before submitting
2028                                  * the conflicting peer request.
2029                                  */
2030                                 err = drbd_wait_misc(mdev, &req->i);
2031                                 if (err) {
2032                                         _conn_request_state(mdev->tconn,
2033                                                             NS(conn, C_TIMEOUT),
2034                                                             CS_HARD);
2035                                         fail_postponed_requests(mdev, sector, size);
2036                                         goto out;
2037                                 }
2038                                 goto repeat;
2039                         }
2040                         /*
2041                          * Remember to restart the conflicting requests after
2042                          * the new peer request has completed.
2043                          */
2044                         peer_req->flags |= EE_RESTART_REQUESTS;
2045                 }
2046         }
2047         err = 0;
2048
2049     out:
2050         if (err)
2051                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2052         return err;
2053 }
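
/*
 * Illustrative sketch, not used by the driver: the containment test from
 * handle_write_conflicts() above as a stand-alone helper.  A conflicting
 * peer write may only be discarded if the local request fully covers it;
 * sizes are in bytes, sectors are 512 bytes.
 */
static inline bool local_fully_covers_peer_example(sector_t l_sector, unsigned int l_size,
						   sector_t p_sector, unsigned int p_size)
{
	return l_sector <= p_sector &&
	       l_sector + (l_size >> 9) >= p_sector + (p_size >> 9);
}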
2054
2055 /* mirrored write */
2056 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2057 {
2058         struct drbd_conf *mdev;
2059         sector_t sector;
2060         struct drbd_peer_request *peer_req;
2061         struct p_data *p = pi->data;
2062         u32 peer_seq = be32_to_cpu(p->seq_num);
2063         int rw = WRITE;
2064         u32 dp_flags;
2065         int err, tp;
2066
2067         mdev = vnr_to_mdev(tconn, pi->vnr);
2068         if (!mdev)
2069                 return -EIO;
2070
2071         if (!get_ldev(mdev)) {
2072                 int err2;
2073
2074                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2075                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2076                 atomic_inc(&mdev->current_epoch->epoch_size);
2077                 err2 = drbd_drain_block(mdev, pi->size);
2078                 if (!err)
2079                         err = err2;
2080                 return err;
2081         }
2082
2083         /*
2084          * Corresponding put_ldev done either below (on various errors), or in
2085          * drbd_peer_request_endio, if we successfully submit the data at the
2086          * end of this function.
2087          */
2088
2089         sector = be64_to_cpu(p->sector);
2090         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2091         if (!peer_req) {
2092                 put_ldev(mdev);
2093                 return -EIO;
2094         }
2095
2096         peer_req->w.cb = e_end_block;
2097
2098         dp_flags = be32_to_cpu(p->dp_flags);
2099         rw |= wire_flags_to_bio(mdev, dp_flags);
2100
2101         if (dp_flags & DP_MAY_SET_IN_SYNC)
2102                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2103
2104         spin_lock(&mdev->epoch_lock);
2105         peer_req->epoch = mdev->current_epoch;
2106         atomic_inc(&peer_req->epoch->epoch_size);
2107         atomic_inc(&peer_req->epoch->active);
2108         spin_unlock(&mdev->epoch_lock);
2109
2110         rcu_read_lock();
2111         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2112         rcu_read_unlock();
2113         if (tp) {
2114                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2115                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2116                 if (err)
2117                         goto out_interrupted;
2118                 spin_lock_irq(&mdev->tconn->req_lock);
2119                 err = handle_write_conflicts(mdev, peer_req);
2120                 if (err) {
2121                         spin_unlock_irq(&mdev->tconn->req_lock);
2122                         if (err == -ENOENT) {
2123                                 put_ldev(mdev);
2124                                 return 0;
2125                         }
2126                         goto out_interrupted;
2127                 }
2128         } else
2129                 spin_lock_irq(&mdev->tconn->req_lock);
2130         list_add(&peer_req->w.list, &mdev->active_ee);
2131         spin_unlock_irq(&mdev->tconn->req_lock);
2132
2133         if (mdev->tconn->agreed_pro_version < 100) {
2134                 rcu_read_lock();
2135                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2136                 case DRBD_PROT_C:
2137                         dp_flags |= DP_SEND_WRITE_ACK;
2138                         break;
2139                 case DRBD_PROT_B:
2140                         dp_flags |= DP_SEND_RECEIVE_ACK;
2141                         break;
2142                 }
2143                 rcu_read_unlock();
2144         }
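        /*
         * Net effect (illustrative summary): a protocol C peer gets a
         * P_WRITE_ACK (or P_RS_WRITE_ACK) from e_end_block() once the local
         * write has completed, a protocol B peer gets an immediate
         * P_RECV_ACK, and a protocol A peer gets no per-request ack at all.
         */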
2145
2146         if (dp_flags & DP_SEND_WRITE_ACK) {
2147                 peer_req->flags |= EE_SEND_WRITE_ACK;
2148                 inc_unacked(mdev);
2149                 /* corresponding dec_unacked() in e_end_block(),
2150                  * or in _drbd_clear_done_ee(), respectively */
2151         }
2152
2153         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2154                 /* I really don't like it that the receiver thread
2155                  * sends on the msock, but anyways */
2156                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2157         }
2158
2159         if (mdev->state.pdsk < D_INCONSISTENT) {
2160                 /* In case we have the only disk of the cluster: mark this range out of sync and cover it by the activity log. */
2161                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2162                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2163                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2164                 drbd_al_begin_io(mdev, &peer_req->i);
2165         }
2166
2167         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2168         if (!err)
2169                 return 0;
2170
2171         /* don't care for the reason here */
2172         dev_err(DEV, "submit failed, triggering re-connect\n");
2173         spin_lock_irq(&mdev->tconn->req_lock);
2174         list_del(&peer_req->w.list);
2175         drbd_remove_epoch_entry_interval(mdev, peer_req);
2176         spin_unlock_irq(&mdev->tconn->req_lock);
2177         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2178                 drbd_al_complete_io(mdev, &peer_req->i);
2179
2180 out_interrupted:
2181         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2182         put_ldev(mdev);
2183         drbd_free_peer_req(mdev, peer_req);
2184         return err;
2185 }
2186
2187 /* We may throttle resync, if the lower device seems to be busy,
2188  * and current sync rate is above c_min_rate.
2189  *
2190  * To decide whether or not the lower device is busy, we use a scheme similar
2191  * to MD RAID is_mddev_idle(): if the partition stats reveal a "significant"
2192  * amount (more than 64 sectors) of activity that we cannot account for with
2193  * our own resync activity, the device obviously is "busy".
2194  *
2195  * The current sync rate used here uses only the most recent two step marks,
2196  * to have a short time average so we can react faster.
2197  */
2198 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2199 {
2200         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2201         unsigned long db, dt, dbdt;
2202         struct lc_element *tmp;
2203         int curr_events;
2204         int throttle = 0;
2205         unsigned int c_min_rate;
2206
2207         rcu_read_lock();
2208         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2209         rcu_read_unlock();
2210
2211         /* feature disabled? */
2212         if (c_min_rate == 0)
2213                 return 0;
2214
2215         spin_lock_irq(&mdev->al_lock);
2216         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2217         if (tmp) {
2218                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2219                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2220                         spin_unlock_irq(&mdev->al_lock);
2221                         return 0;
2222                 }
2223                 /* Do not slow down if app IO is already waiting for this extent */
2224         }
2225         spin_unlock_irq(&mdev->al_lock);
2226
2227         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2228                       (int)part_stat_read(&disk->part0, sectors[1]) -
2229                         atomic_read(&mdev->rs_sect_ev);
2230
2231         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2232                 unsigned long rs_left;
2233                 int i;
2234
2235                 mdev->rs_last_events = curr_events;
2236
2237                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2238                  * approx. */
2239                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2240
2241                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2242                         rs_left = mdev->ov_left;
2243                 else
2244                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2245
2246                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2247                 if (!dt)
2248                         dt++;
2249                 db = mdev->rs_mark_left[i] - rs_left;
2250                 dbdt = Bit2KB(db/dt);
2251
2252                 if (dbdt > c_min_rate)
2253                         throttle = 1;
2254         }
2255         return throttle;
2256 }
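
/*
 * Illustrative sketch, not used by the driver: the short-term rate estimate
 * above in isolation.  One bitmap bit covers a 4 kiB block, so shifting the
 * number of bits cleared by 2 gives kiB, which is what Bit2KB() is assumed
 * to do for the default bitmap granularity.
 */
static inline unsigned long sync_rate_kb_example(unsigned long bits_cleared,
						 unsigned long seconds)
{
	if (!seconds)
		seconds = 1;	/* the code above also guards against dt == 0 */
	return (bits_cleared << 2) / seconds;
}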
2257
2258
2259 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2260 {
2261         struct drbd_conf *mdev;
2262         sector_t sector;
2263         sector_t capacity;
2264         struct drbd_peer_request *peer_req;
2265         struct digest_info *di = NULL;
2266         int size, verb;
2267         unsigned int fault_type;
2268         struct p_block_req *p = pi->data;
2269
2270         mdev = vnr_to_mdev(tconn, pi->vnr);
2271         if (!mdev)
2272                 return -EIO;
2273         capacity = drbd_get_capacity(mdev->this_bdev);
2274
2275         sector = be64_to_cpu(p->sector);
2276         size   = be32_to_cpu(p->blksize);
2277
2278         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2279                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2280                                 (unsigned long long)sector, size);
2281                 return -EINVAL;
2282         }
2283         if (sector + (size>>9) > capacity) {
2284                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2285                                 (unsigned long long)sector, size);
2286                 return -EINVAL;
2287         }
2288
2289         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2290                 verb = 1;
2291                 switch (pi->cmd) {
2292                 case P_DATA_REQUEST:
2293                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2294                         break;
2295                 case P_RS_DATA_REQUEST:
2296                 case P_CSUM_RS_REQUEST:
2297                 case P_OV_REQUEST:
2298                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2299                         break;
2300                 case P_OV_REPLY:
2301                         verb = 0;
2302                         dec_rs_pending(mdev);
2303                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2304                         break;
2305                 default:
2306                         BUG();
2307                 }
2308                 if (verb && __ratelimit(&drbd_ratelimit_state))
2309                         dev_err(DEV, "Can not satisfy peer's read request, "
2310                             "no local data.\n");
2311
2312                 /* drain the possibly present payload */
2313                 return drbd_drain_block(mdev, pi->size);
2314         }
2315
2316         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2317          * "criss-cross" setup, that might cause write-out on some other DRBD,
2318          * which in turn might block on the other node at this very place.  */
2319         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2320         if (!peer_req) {
2321                 put_ldev(mdev);
2322                 return -ENOMEM;
2323         }
2324
2325         switch (pi->cmd) {
2326         case P_DATA_REQUEST:
2327                 peer_req->w.cb = w_e_end_data_req;
2328                 fault_type = DRBD_FAULT_DT_RD;
2329                 /* application IO, don't drbd_rs_begin_io */
2330                 goto submit;
2331
2332         case P_RS_DATA_REQUEST:
2333                 peer_req->w.cb = w_e_end_rsdata_req;
2334                 fault_type = DRBD_FAULT_RS_RD;
2335                 /* used in the sector offset progress display */
2336                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2337                 break;
2338
2339         case P_OV_REPLY:
2340         case P_CSUM_RS_REQUEST:
2341                 fault_type = DRBD_FAULT_RS_RD;
2342                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2343                 if (!di)
2344                         goto out_free_e;
2345
2346                 di->digest_size = pi->size;
2347                 di->digest = (((char *)di)+sizeof(struct digest_info));
2348
2349                 peer_req->digest = di;
2350                 peer_req->flags |= EE_HAS_DIGEST;
2351
2352                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2353                         goto out_free_e;
2354
2355                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2356                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2357                         peer_req->w.cb = w_e_end_csum_rs_req;
2358                         /* used in the sector offset progress display */
2359                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2360                 } else if (pi->cmd == P_OV_REPLY) {
2361                         /* track progress, we may need to throttle */
2362                         atomic_add(size >> 9, &mdev->rs_sect_in);
2363                         peer_req->w.cb = w_e_end_ov_reply;
2364                         dec_rs_pending(mdev);
2365                         /* drbd_rs_begin_io done when we sent this request,
2366                          * but accounting still needs to be done. */
2367                         goto submit_for_resync;
2368                 }
2369                 break;
2370
2371         case P_OV_REQUEST:
2372                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2373                     mdev->tconn->agreed_pro_version >= 90) {
2374                         unsigned long now = jiffies;
2375                         int i;
2376                         mdev->ov_start_sector = sector;
2377                         mdev->ov_position = sector;
2378                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2379                         mdev->rs_total = mdev->ov_left;
2380                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2381                                 mdev->rs_mark_left[i] = mdev->ov_left;
2382                                 mdev->rs_mark_time[i] = now;
2383                         }
2384                         dev_info(DEV, "Online Verify start sector: %llu\n",
2385                                         (unsigned long long)sector);
2386                 }
2387                 peer_req->w.cb = w_e_end_ov_req;
2388                 fault_type = DRBD_FAULT_RS_RD;
2389                 break;
2390
2391         default:
2392                 BUG();
2393         }
2394
2395         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2396          * wrt the receiver, but it is not as straightforward as it may seem.
2397          * Various places in the resync start and stop logic assume resync
2398  * requests are processed in order; requeuing this on the worker thread
2399  * would introduce a bunch of new code for synchronization between threads.
2400          *
2401          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2402          * "forever", throttling after drbd_rs_begin_io will lock that extent
2403          * for application writes for the same time.  For now, just throttle
2404          * here, where the rest of the code expects the receiver to sleep for
2405          * a while, anyways.
2406          */
2407
2408         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2409          * this defers syncer requests for some time, before letting at least
2410          * one request through.  The resync controller on the receiving side
2411          * will adapt to the incoming rate accordingly.
2412          *
2413          * We cannot throttle here if remote is Primary/SyncTarget:
2414          * we would also throttle its application reads.
2415          * In that case, throttling is done on the SyncTarget only.
2416          */
2417         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2418                 schedule_timeout_uninterruptible(HZ/10);
2419         if (drbd_rs_begin_io(mdev, sector))
2420                 goto out_free_e;
2421
2422 submit_for_resync:
2423         atomic_add(size >> 9, &mdev->rs_sect_ev);
2424
2425 submit:
2426         inc_unacked(mdev);
2427         spin_lock_irq(&mdev->tconn->req_lock);
2428         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2429         spin_unlock_irq(&mdev->tconn->req_lock);
2430
2431         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2432                 return 0;
2433
2434         /* don't care for the reason here */
2435         dev_err(DEV, "submit failed, triggering re-connect\n");
2436         spin_lock_irq(&mdev->tconn->req_lock);
2437         list_del(&peer_req->w.list);
2438         spin_unlock_irq(&mdev->tconn->req_lock);
2439         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2440
2441 out_free_e:
2442         put_ldev(mdev);
2443         drbd_free_peer_req(mdev, peer_req);
2444         return -EIO;
2445 }
2446
2447 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2448 {
2449         int self, peer, rv = -100;
2450         unsigned long ch_self, ch_peer;
2451         enum drbd_after_sb_p after_sb_0p;
2452
2453         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2454         peer = mdev->p_uuid[UI_BITMAP] & 1;
2455
2456         ch_peer = mdev->p_uuid[UI_SIZE];
2457         ch_self = mdev->comm_bm_set;
2458
2459         rcu_read_lock();
2460         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2461         rcu_read_unlock();
2462         switch (after_sb_0p) {
2463         case ASB_CONSENSUS:
2464         case ASB_DISCARD_SECONDARY:
2465         case ASB_CALL_HELPER:
2466         case ASB_VIOLENTLY:
2467                 dev_err(DEV, "Configuration error.\n");
2468                 break;
2469         case ASB_DISCONNECT:
2470                 break;
2471         case ASB_DISCARD_YOUNGER_PRI:
2472                 if (self == 0 && peer == 1) {
2473                         rv = -1;
2474                         break;
2475                 }
2476                 if (self == 1 && peer == 0) {
2477                         rv =  1;
2478                         break;
2479                 }
2480                 /* Else fall through to one of the other strategies... */
2481         case ASB_DISCARD_OLDER_PRI:
2482                 if (self == 0 && peer == 1) {
2483                         rv = 1;
2484                         break;
2485                 }
2486                 if (self == 1 && peer == 0) {
2487                         rv = -1;
2488                         break;
2489                 }
2490                 /* Else fall through to one of the other strategies... */
2491                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2492                      "Using discard-least-changes instead\n");
2493         case ASB_DISCARD_ZERO_CHG:
2494                 if (ch_peer == 0 && ch_self == 0) {
2495                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2496                                 ? -1 : 1;
2497                         break;
2498                 } else {
2499                         if (ch_peer == 0) { rv =  1; break; }
2500                         if (ch_self == 0) { rv = -1; break; }
2501                 }
2502                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2503                         break;
2504         case ASB_DISCARD_LEAST_CHG:
2505                 if      (ch_self < ch_peer)
2506                         rv = -1;
2507                 else if (ch_self > ch_peer)
2508                         rv =  1;
2509                 else /* ( ch_self == ch_peer ) */
2510                      /* Well, then use something else. */
2511                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2512                                 ? -1 : 1;
2513                 break;
2514         case ASB_DISCARD_LOCAL:
2515                 rv = -1;
2516                 break;
2517         case ASB_DISCARD_REMOTE:
2518                 rv =  1;
2519         }
2520
2521         return rv;
2522 }
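
/*
 * Illustrative summary of the return convention used by the recover
 * functions (not a comment from the original source): a positive value
 * means the local node keeps its data and becomes sync source, a negative
 * value means it becomes sync target, and -100 means the configured policy
 * could not reach a decision.
 */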
2523
2524 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2525 {
2526         int hg, rv = -100;
2527         enum drbd_after_sb_p after_sb_1p;
2528
2529         rcu_read_lock();
2530         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2531         rcu_read_unlock();
2532         switch (after_sb_1p) {
2533         case ASB_DISCARD_YOUNGER_PRI:
2534         case ASB_DISCARD_OLDER_PRI:
2535         case ASB_DISCARD_LEAST_CHG:
2536         case ASB_DISCARD_LOCAL:
2537         case ASB_DISCARD_REMOTE:
2538         case ASB_DISCARD_ZERO_CHG:
2539                 dev_err(DEV, "Configuration error.\n");
2540                 break;
2541         case ASB_DISCONNECT:
2542                 break;
2543         case ASB_CONSENSUS:
2544                 hg = drbd_asb_recover_0p(mdev);
2545                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2546                         rv = hg;
2547                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2548                         rv = hg;
2549                 break;
2550         case ASB_VIOLENTLY:
2551                 rv = drbd_asb_recover_0p(mdev);
2552                 break;
2553         case ASB_DISCARD_SECONDARY:
2554                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2555         case ASB_CALL_HELPER:
2556                 hg = drbd_asb_recover_0p(mdev);
2557                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2558                         enum drbd_state_rv rv2;
2559
2560                         drbd_set_role(mdev, R_SECONDARY, 0);
2561                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2562                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2563                           * we do not need to wait for the after state change work either. */
2564                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2565                         if (rv2 != SS_SUCCESS) {
2566                                 drbd_khelper(mdev, "pri-lost-after-sb");
2567                         } else {
2568                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2569                                 rv = hg;
2570                         }
2571                 } else
2572                         rv = hg;
2573         }
2574
2575         return rv;
2576 }
2577
2578 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2579 {
2580         int hg, rv = -100;
2581         enum drbd_after_sb_p after_sb_2p;
2582
2583         rcu_read_lock();
2584         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2585         rcu_read_unlock();
2586         switch (after_sb_2p) {
2587         case ASB_DISCARD_YOUNGER_PRI:
2588         case ASB_DISCARD_OLDER_PRI:
2589         case ASB_DISCARD_LEAST_CHG:
2590         case ASB_DISCARD_LOCAL:
2591         case ASB_DISCARD_REMOTE:
2592         case ASB_CONSENSUS:
2593         case ASB_DISCARD_SECONDARY:
2594         case ASB_DISCARD_ZERO_CHG:
2595                 dev_err(DEV, "Configuration error.\n");
2596                 break;
2597         case ASB_VIOLENTLY:
2598                 rv = drbd_asb_recover_0p(mdev);
2599                 break;
2600         case ASB_DISCONNECT:
2601                 break;
2602         case ASB_CALL_HELPER:
2603                 hg = drbd_asb_recover_0p(mdev);
2604                 if (hg == -1) {
2605                         enum drbd_state_rv rv2;
2606
2607                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2608                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2609                           * we do not need to wait for the after state change work either. */
2610                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2611                         if (rv2 != SS_SUCCESS) {
2612                                 drbd_khelper(mdev, "pri-lost-after-sb");
2613                         } else {
2614                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2615                                 rv = hg;
2616                         }
2617                 } else
2618                         rv = hg;
2619         }
2620
2621         return rv;
2622 }
2623
2624 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2625                            u64 bits, u64 flags)
2626 {
2627         if (!uuid) {
2628                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2629                 return;
2630         }
2631         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2632              text,
2633              (unsigned long long)uuid[UI_CURRENT],
2634              (unsigned long long)uuid[UI_BITMAP],
2635              (unsigned long long)uuid[UI_HISTORY_START],
2636              (unsigned long long)uuid[UI_HISTORY_END],
2637              (unsigned long long)bits,
2638              (unsigned long long)flags);
2639 }
2640
2641 /*
2642   100   after split brain try auto recover
2643     2   C_SYNC_SOURCE set BitMap
2644     1   C_SYNC_SOURCE use BitMap
2645     0   no Sync
2646    -1   C_SYNC_TARGET use BitMap
2647    -2   C_SYNC_TARGET set BitMap
2648  -100   after split brain, disconnect
2649 -1000   unrelated data
2650 -1091   requires proto 91
2651 -1096   requires proto 96
2652  */
2653 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2654 {
2655         u64 self, peer;
2656         int i, j;
2657
2658         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2659         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2660
2661         *rule_nr = 10;
2662         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2663                 return 0;
2664
2665         *rule_nr = 20;
2666         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2667              peer != UUID_JUST_CREATED)
2668                 return -2;
2669
2670         *rule_nr = 30;
2671         if (self != UUID_JUST_CREATED &&
2672             (peer == UUID_JUST_CREATED || peer == (u64)0))
2673                 return 2;
2674
2675         if (self == peer) {
2676                 int rct, dc; /* roles at crash time */
2677
2678                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2679
2680                         if (mdev->tconn->agreed_pro_version < 91)
2681                                 return -1091;
2682
2683                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2684                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2685                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2686                                 drbd_uuid_set_bm(mdev, 0UL);
2687
2688                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2689                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2690                                 *rule_nr = 34;
2691                         } else {
2692                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2693                                 *rule_nr = 36;
2694                         }
2695
2696                         return 1;
2697                 }
2698
2699                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2700
2701                         if (mdev->tconn->agreed_pro_version < 91)
2702                                 return -1091;
2703
2704                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2705                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2706                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2707
2708                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2709                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2710                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2711
2712                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2713                                 *rule_nr = 35;
2714                         } else {
2715                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2716                                 *rule_nr = 37;
2717                         }
2718
2719                         return -1;
2720                 }
2721
2722                 /* Common power [off|failure] */
2723                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2724                         (mdev->p_uuid[UI_FLAGS] & 2);
2725                 /* lowest bit is set when we were primary,
2726                  * next bit (weight 2) is set when peer was primary */
2727                 *rule_nr = 40;
2728
2729                 switch (rct) {
2730                 case 0: /* !self_pri && !peer_pri */ return 0;
2731                 case 1: /*  self_pri && !peer_pri */ return 1;
2732                 case 2: /* !self_pri &&  peer_pri */ return -1;
2733                 case 3: /*  self_pri &&  peer_pri */
2734                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2735                         return dc ? -1 : 1;
2736                 }
2737         }
2738
2739         *rule_nr = 50;
2740         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2741         if (self == peer)
2742                 return -1;
2743
2744         *rule_nr = 51;
2745         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2746         if (self == peer) {
2747                 if (mdev->tconn->agreed_pro_version < 96 ?
2748                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2749                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2750                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2751                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2752                            the peer made to its UUIDs when it last started a resync as sync source. */
2753
2754                         if (mdev->tconn->agreed_pro_version < 91)
2755                                 return -1091;
2756
2757                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2758                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2759
2760                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2761                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2762
2763                         return -1;
2764                 }
2765         }
2766
2767         *rule_nr = 60;
2768         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2769         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2770                 peer = mdev->p_uuid[i] & ~((u64)1);
2771                 if (self == peer)
2772                         return -2;
2773         }
2774
2775         *rule_nr = 70;
2776         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2777         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2778         if (self == peer)
2779                 return 1;
2780
2781         *rule_nr = 71;
2782         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2783         if (self == peer) {
2784                 if (mdev->tconn->agreed_pro_version < 96 ?
2785                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2786                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2787                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2788                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2789                            we made to our UUIDs when we last started a resync as sync source. */
2790
2791                         if (mdev->tconn->agreed_pro_version < 91)
2792                                 return -1091;
2793
2794                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2795                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2796
2797                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2798                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2799                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2800
2801                         return 1;
2802                 }
2803         }
2804
2805
2806         *rule_nr = 80;
2807         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2808         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810                 if (self == peer)
2811                         return 2;
2812         }
2813
2814         *rule_nr = 90;
2815         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2816         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2817         if (self == peer && self != ((u64)0))
2818                 return 100;
2819
2820         *rule_nr = 100;
2821         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2822                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2823                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2824                         peer = mdev->p_uuid[j] & ~((u64)1);
2825                         if (self == peer)
2826                                 return -100;
2827                 }
2828         }
2829
2830         return -1000;
2831 }
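
/*
 * Illustrative sketch, not part of the driver: one way a reader may decode
 * the return value of drbd_uuid_compare().  The sign picks the resync
 * direction, a magnitude of 2 or more requests a full sync, +/-100 flags
 * split brain, -1000 means unrelated data, and values below -1000 encode
 * the minimum protocol version both sides would need (-1000 - version).
 * The helper name below is hypothetical.
 */
#if 0
static const char *uuid_compare_verdict(int hg)
{
        if (hg < -1000)
                return "need a newer protocol on both sides"; /* e.g. -1091 -> proto 91 */
        if (hg == -1000)
                return "unrelated data, refuse to connect";
        if (hg == -100 || hg == 100)
                return "split brain, apply recovery policy";
        if (hg <= -2)
                return "become full sync target";
        if (hg == -1)
                return "become (bitmap based) sync target";
        if (hg >= 2)
                return "become full sync source";
        if (hg == 1)
                return "become (bitmap based) sync source";
        return "data is in sync, no resync needed";
}
#endif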
2832
2833 /* drbd_sync_handshake() returns the new conn state on success, or
2834    C_MASK (-1) on failure.
2835  */
2836 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2837                                            enum drbd_disk_state peer_disk) __must_hold(local)
2838 {
2839         enum drbd_conns rv = C_MASK;
2840         enum drbd_disk_state mydisk;
2841         struct net_conf *nc;
2842         int hg, rule_nr, rr_conflict, dry_run;
2843
2844         mydisk = mdev->state.disk;
2845         if (mydisk == D_NEGOTIATING)
2846                 mydisk = mdev->new_state_tmp.disk;
2847
2848         dev_info(DEV, "drbd_sync_handshake:\n");
2849         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2850         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2851                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2852
2853         hg = drbd_uuid_compare(mdev, &rule_nr);
2854
2855         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2856
2857         if (hg == -1000) {
2858                 dev_alert(DEV, "Unrelated data, aborting!\n");
2859                 return C_MASK;
2860         }
2861         if (hg < -1000) {
2862                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2863                 return C_MASK;
2864         }
2865
2866         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2867             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2868                 int f = (hg == -100) || abs(hg) == 2;
2869                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2870                 if (f)
2871                         hg = hg*2;
2872                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2873                      hg > 0 ? "source" : "target");
2874         }
2875
2876         if (abs(hg) == 100)
2877                 drbd_khelper(mdev, "initial-split-brain");
2878
2879         rcu_read_lock();
2880         nc = rcu_dereference(mdev->tconn->net_conf);
2881
2882         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2883                 int pcount = (mdev->state.role == R_PRIMARY)
2884                            + (peer_role == R_PRIMARY);
2885                 int forced = (hg == -100);
2886
2887                 switch (pcount) {
2888                 case 0:
2889                         hg = drbd_asb_recover_0p(mdev);
2890                         break;
2891                 case 1:
2892                         hg = drbd_asb_recover_1p(mdev);
2893                         break;
2894                 case 2:
2895                         hg = drbd_asb_recover_2p(mdev);
2896                         break;
2897                 }
2898                 if (abs(hg) < 100) {
2899                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2900                              "automatically resolved. Sync from %s node\n",
2901                              pcount, (hg < 0) ? "peer" : "this");
2902                         if (forced) {
2903                                 dev_warn(DEV, "Doing a full sync, since"
2904                                      " UUIDs were ambiguous.\n");
2905                                 hg = hg*2;
2906                         }
2907                 }
2908         }
2909
2910         if (hg == -100) {
2911                 if (nc->discard_my_data && !(mdev->p_uuid[UI_FLAGS]&1))
2912                         hg = -1;
2913                 if (!nc->discard_my_data && (mdev->p_uuid[UI_FLAGS]&1))
2914                         hg = 1;
2915
2916                 if (abs(hg) < 100)
2917                         dev_warn(DEV, "Split-Brain detected, manually resolved. "
2918                              "Sync from %s node\n",
2919                              (hg < 0) ? "peer" : "this");
2920         }
2921         rr_conflict = nc->rr_conflict;
2922         dry_run = nc->dry_run;
2923         rcu_read_unlock();
2924
2925         if (hg == -100) {
2926                 /* FIXME this log message is not correct if we end up here
2927                  * after an attempted attach on a diskless node.
2928                  * We just refuse to attach -- well, we drop the "connection"
2929                  * to that disk, in a way... */
2930                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2931                 drbd_khelper(mdev, "split-brain");
2932                 return C_MASK;
2933         }
2934
2935         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2936                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2937                 return C_MASK;
2938         }
2939
2940         if (hg < 0 && /* by intention we do not use mydisk here. */
2941             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2942                 switch (rr_conflict) {
2943                 case ASB_CALL_HELPER:
2944                         drbd_khelper(mdev, "pri-lost");
2945                         /* fall through */
2946                 case ASB_DISCONNECT:
2947                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2948                         return C_MASK;
2949                 case ASB_VIOLENTLY:
2950                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2951                              " assumption\n");
2952                 }
2953         }
2954
2955         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2956                 if (hg == 0)
2957                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2958                 else
2959                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2960                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2961                                  abs(hg) >= 2 ? "full" : "bit-map based");
2962                 return C_MASK;
2963         }
2964
2965         if (abs(hg) >= 2) {
2966                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2967                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2968                                         BM_LOCKED_SET_ALLOWED))
2969                         return C_MASK;
2970         }
2971
2972         if (hg > 0) { /* become sync source. */
2973                 rv = C_WF_BITMAP_S;
2974         } else if (hg < 0) { /* become sync target */
2975                 rv = C_WF_BITMAP_T;
2976         } else {
2977                 rv = C_CONNECTED;
2978                 if (drbd_bm_total_weight(mdev)) {
2979                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2980                              drbd_bm_total_weight(mdev));
2981                 }
2982         }
2983
2984         return rv;
2985 }
2986
2987 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
2988 {
2989         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2990         if (peer == ASB_DISCARD_REMOTE)
2991                 return ASB_DISCARD_LOCAL;
2992
2993         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2994         if (peer == ASB_DISCARD_LOCAL)
2995                 return ASB_DISCARD_REMOTE;
2996
2997         /* everything else is valid if they are equal on both sides. */
2998         return peer;
2999 }
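
/*
 * Illustrative sketch, not part of the driver: receive_protocol() below only
 * accepts the peer's after-split-brain policies after mapping them through
 * convert_after_sb(), i.e. the two "discard" variants must mirror each other
 * while every other value must match exactly.  The helper name is
 * hypothetical.
 */
#if 0
static int after_sb_settings_compatible(enum drbd_after_sb_p mine,
                                        enum drbd_after_sb_p peers)
{
        /* If we are set to ASB_DISCARD_LOCAL, a sane peer sends
         * ASB_DISCARD_REMOTE (it discards *us*), which converts back
         * to ASB_DISCARD_LOCAL and therefore matches. */
        return convert_after_sb(peers) == mine;
}
#endif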
3000
3001 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3002 {
3003         struct p_protocol *p = pi->data;
3004         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3005         int p_discard_my_data, p_two_primaries, cf;
3006         struct net_conf *nc;
3007         void *int_dig_in = NULL, *int_dig_vv = NULL;
3008
3009         p_proto         = be32_to_cpu(p->protocol);
3010         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3011         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3012         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3013         p_two_primaries = be32_to_cpu(p->two_primaries);
3014         cf              = be32_to_cpu(p->conn_flags);
3015         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3016
3017         if (tconn->agreed_pro_version >= 87) {
3018                 char integrity_alg[SHARED_SECRET_MAX];
3019                 struct crypto_hash *tfm = NULL;
3020                 int err;
3021
3022                 if (pi->size > sizeof(integrity_alg))
3023                         return -EIO;
3024                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3025                 if (err)
3026                         return err;
3027                 integrity_alg[SHARED_SECRET_MAX-1] = 0;
3028
3029                 if (integrity_alg[0]) {
3030                         int hash_size;
3031
3032                         tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3033                         if (IS_ERR(tfm)) {
3034                                 conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3035                                          integrity_alg);
3036                                 goto disconnect;
3037                         }
3038                         conn_info(tconn, "peer data-integrity-alg: %s\n", integrity_alg);
3039
3040                         hash_size = crypto_hash_digestsize(tfm);
3041                         int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3042                         int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3043                         if (!(int_dig_in && int_dig_vv)) {
3044                                 crypto_free_hash(tfm);
3045                                 goto disconnect;
3046                         }
3047                 }
3048
3049                 if (tconn->peer_integrity_tfm)
3050                         crypto_free_hash(tconn->peer_integrity_tfm);
3051                 tconn->peer_integrity_tfm = tfm;
3052                 kfree(tconn->int_dig_in);
3053                 kfree(tconn->int_dig_vv);
3054                 tconn->int_dig_in = int_dig_in;
3055                 tconn->int_dig_vv = int_dig_vv;
3056         }
3057
3058         clear_bit(CONN_DRY_RUN, &tconn->flags);
3059
3060         if (cf & CF_DRY_RUN)
3061                 set_bit(CONN_DRY_RUN, &tconn->flags);
3062
3063         rcu_read_lock();
3064         nc = rcu_dereference(tconn->net_conf);
3065
3066         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3067                 conn_err(tconn, "incompatible communication protocols\n");
3068                 goto disconnect_rcu_unlock;
3069         }
3070
3071         if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3072                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3073                 goto disconnect_rcu_unlock;
3074         }
3075
3076         if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3077                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3078                 goto disconnect_rcu_unlock;
3079         }
3080
3081         if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3082                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3083                 goto disconnect_rcu_unlock;
3084         }
3085
3086         if (p_discard_my_data && nc->discard_my_data) {
3087                 conn_err(tconn, "both sides have the 'discard_my_data' flag set\n");
3088                 goto disconnect_rcu_unlock;
3089         }
3090
3091         if (p_two_primaries != nc->two_primaries) {
3092                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3093                 goto disconnect_rcu_unlock;
3094         }
3095
3096         rcu_read_unlock();
3097
3098         return 0;
3099
3100 disconnect_rcu_unlock:
3101         rcu_read_unlock();
3102 disconnect:
3103         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3104         return -EIO;
3105 }
3106
3107 /* helper function
3108  * input: alg name, feature name
3109  * return: NULL (alg name was "")
3110  *         ERR_PTR(error) if something goes wrong
3111  *         or the crypto hash ptr, if it worked out ok. */
3112 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3113                 const char *alg, const char *name)
3114 {
3115         struct crypto_hash *tfm;
3116
3117         if (!alg[0])
3118                 return NULL;
3119
3120         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3121         if (IS_ERR(tfm)) {
3122                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3123                         alg, name, PTR_ERR(tfm));
3124                 return tfm;
3125         }
3126         return tfm;
3127 }
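
/*
 * Illustrative sketch, not part of the driver: how a caller is expected to
 * handle the three-way return value of drbd_crypto_alloc_digest_safe();
 * receive_SyncParam() below follows this pattern for verify-alg and
 * csums-alg.  The function name and the use of verify_tfm here are only an
 * example.
 */
#if 0
static int example_pick_verify_alg(struct drbd_conf *mdev, const char *alg)
{
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "verify-alg");
        if (tfm == NULL)
                return 0;               /* alg was "": feature stays disabled */
        if (IS_ERR(tfm))
                return PTR_ERR(tfm);    /* already logged by the helper */

        /* valid transform: release the old one before installing the new */
        crypto_free_hash(mdev->tconn->verify_tfm);
        mdev->tconn->verify_tfm = tfm;
        return 0;
}
#endif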
3128
3129 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3130 {
3131         void *buffer = tconn->data.rbuf;
3132         int size = pi->size;
3133
3134         while (size) {
3135                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3136                 s = drbd_recv(tconn, buffer, s);
3137                 if (s <= 0) {
3138                         if (s < 0)
3139                                 return s;
3140                         break;
3141                 }
3142                 size -= s;
3143         }
3144         if (size)
3145                 return -EIO;
3146         return 0;
3147 }
3148
3149 /*
3150  * config_unknown_volume  -  device configuration command for unknown volume
3151  *
3152  * When a device is added to an existing connection, the node on which the
3153  * device is added first will send configuration commands to its peer but the
3154  * peer will not know about the device yet.  It will warn and ignore these
3155  * commands.  Once the device is added on the second node, the second node will
3156  * send the same device configuration commands, but in the other direction.
3157  *
3158  * (We can also end up here if drbd is misconfigured.)
3159  */
3160 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3161 {
3162         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3163                   pi->vnr, cmdname(pi->cmd));
3164         return ignore_remaining_packet(tconn, pi);
3165 }
3166
3167 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3168 {
3169         struct drbd_conf *mdev;
3170         struct p_rs_param_95 *p;
3171         unsigned int header_size, data_size, exp_max_sz;
3172         struct crypto_hash *verify_tfm = NULL;
3173         struct crypto_hash *csums_tfm = NULL;
3174         struct net_conf *old_net_conf, *new_net_conf = NULL;
3175         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3176         const int apv = tconn->agreed_pro_version;
3177         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3178         int fifo_size = 0;
3179         int err;
3180
3181         mdev = vnr_to_mdev(tconn, pi->vnr);
3182         if (!mdev)
3183                 return config_unknown_volume(tconn, pi);
3184
3185         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3186                     : apv == 88 ? sizeof(struct p_rs_param)
3187                                         + SHARED_SECRET_MAX
3188                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3189                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3190
3191         if (pi->size > exp_max_sz) {
3192                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3193                     pi->size, exp_max_sz);
3194                 return -EIO;
3195         }
3196
3197         if (apv <= 88) {
3198                 header_size = sizeof(struct p_rs_param);
3199                 data_size = pi->size - header_size;
3200         } else if (apv <= 94) {
3201                 header_size = sizeof(struct p_rs_param_89);
3202                 data_size = pi->size - header_size;
3203                 D_ASSERT(data_size == 0);
3204         } else {
3205                 header_size = sizeof(struct p_rs_param_95);
3206                 data_size = pi->size - header_size;
3207                 D_ASSERT(data_size == 0);
3208         }
3209
3210         /* initialize verify_alg and csums_alg */
3211         p = pi->data;
3212         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3213
3214         err = drbd_recv_all(mdev->tconn, p, header_size);
3215         if (err)
3216                 return err;
3217
3218         mutex_lock(&mdev->tconn->conf_update);
3219         old_net_conf = mdev->tconn->net_conf;
3220         if (get_ldev(mdev)) {
3221                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3222                 if (!new_disk_conf) {
3223                         put_ldev(mdev);
3224                         mutex_unlock(&mdev->tconn->conf_update);
3225                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3226                         return -ENOMEM;
3227                 }
3228
3229                 old_disk_conf = mdev->ldev->disk_conf;
3230                 *new_disk_conf = *old_disk_conf;
3231
3232                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3233         }
3234
3235         if (apv >= 88) {
3236                 if (apv == 88) {
3237                         if (data_size > SHARED_SECRET_MAX) {
3238                                 dev_err(DEV, "verify-alg too long, "
3239                                     "peer wants %u, accepting only %u bytes\n",
3240                                                 data_size, SHARED_SECRET_MAX);
3241                                 err = -EIO;
3242                                 goto reconnect;
3243                         }
3244
3245                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3246                         if (err)
3247                                 goto reconnect;
3248                         /* we expect NUL terminated string */
3249                         /* but just in case someone tries to be evil */
3250                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3251                         p->verify_alg[data_size-1] = 0;
3252
3253                 } else /* apv >= 89 */ {
3254                         /* we still expect NUL terminated strings */
3255                         /* but just in case someone tries to be evil */
3256                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3257                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3258                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3259                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3260                 }
3261
3262                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3263                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3264                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3265                                     old_net_conf->verify_alg, p->verify_alg);
3266                                 goto disconnect;
3267                         }
3268                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3269                                         p->verify_alg, "verify-alg");
3270                         if (IS_ERR(verify_tfm)) {
3271                                 verify_tfm = NULL;
3272                                 goto disconnect;
3273                         }
3274                 }
3275
3276                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3277                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3278                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3279                                     old_net_conf->csums_alg, p->csums_alg);
3280                                 goto disconnect;
3281                         }
3282                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3283                                         p->csums_alg, "csums-alg");
3284                         if (IS_ERR(csums_tfm)) {
3285                                 csums_tfm = NULL;
3286                                 goto disconnect;
3287                         }
3288                 }
3289
3290                 if (apv > 94 && new_disk_conf) {
3291                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3292                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3293                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3294                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3295
3296                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3297                         if (fifo_size != mdev->rs_plan_s->size) {
3298                                 new_plan = fifo_alloc(fifo_size);
3299                                 if (!new_plan) {
3300                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3301                                         put_ldev(mdev);
3302                                         goto disconnect;
3303                                 }
3304                         }
3305                 }
3306
3307                 if (verify_tfm || csums_tfm) {
3308                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3309                         if (!new_net_conf) {
3310                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3311                                 goto disconnect;
3312                         }
3313
3314                         *new_net_conf = *old_net_conf;
3315
3316                         if (verify_tfm) {
3317                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3318                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3319                                 crypto_free_hash(mdev->tconn->verify_tfm);
3320                                 mdev->tconn->verify_tfm = verify_tfm;
3321                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3322                         }
3323                         if (csums_tfm) {
3324                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3325                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3326                                 crypto_free_hash(mdev->tconn->csums_tfm);
3327                                 mdev->tconn->csums_tfm = csums_tfm;
3328                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3329                         }
3330                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3331                 }
3332         }
3333
3334         if (new_disk_conf) {
3335                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3336                 put_ldev(mdev);
3337         }
3338
3339         if (new_plan) {
3340                 old_plan = mdev->rs_plan_s;
3341                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3342         }
3343
3344         mutex_unlock(&mdev->tconn->conf_update);
3345         synchronize_rcu();
3346         if (new_net_conf)
3347                 kfree(old_net_conf);
3348         kfree(old_disk_conf);
3349         kfree(old_plan);
3350
3351         return 0;
3352
3353 reconnect:
3354         if (new_disk_conf) {
3355                 put_ldev(mdev);
3356                 kfree(new_disk_conf);
3357         }
3358         mutex_unlock(&mdev->tconn->conf_update);
3359         return -EIO;
3360
3361 disconnect:
3362         kfree(new_plan);
3363         if (new_disk_conf) {
3364                 put_ldev(mdev);
3365                 kfree(new_disk_conf);
3366         }
3367         mutex_unlock(&mdev->tconn->conf_update);
3368         /* just for completeness: actually not needed,
3369          * as this is not reached if csums_tfm was ok. */
3370         crypto_free_hash(csums_tfm);
3371         /* but free the verify_tfm again, if csums_tfm did not work out */
3372         crypto_free_hash(verify_tfm);
3373         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3374         return -EIO;
3375 }
3376
3377 /* warn if the arguments differ by more than 12.5% */
3378 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3379         const char *s, sector_t a, sector_t b)
3380 {
3381         sector_t d;
3382         if (a == 0 || b == 0)
3383                 return;
3384         d = (a > b) ? (a - b) : (b - a);
3385         if (d > (a>>3) || d > (b>>3))
3386                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3387                      (unsigned long long)a, (unsigned long long)b);
3388 }
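
/*
 * Worked example, not part of the driver: with a = 1000 and b = 900 sectors,
 * d = 100 while a>>3 = 125 and b>>3 = 112, so the difference stays below
 * 12.5% of either value and nothing is logged.  With b = 800, d = 200
 * exceeds both thresholds and the sizes are reported as considerably
 * different.
 */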
3389
3390 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3391 {
3392         struct drbd_conf *mdev;
3393         struct p_sizes *p = pi->data;
3394         enum determine_dev_size dd = unchanged;
3395         sector_t p_size, p_usize, my_usize;
3396         int ldsc = 0; /* local disk size changed */
3397         enum dds_flags ddsf;
3398
3399         mdev = vnr_to_mdev(tconn, pi->vnr);
3400         if (!mdev)
3401                 return config_unknown_volume(tconn, pi);
3402
3403         p_size = be64_to_cpu(p->d_size);
3404         p_usize = be64_to_cpu(p->u_size);
3405
3406         /* just store the peer's disk size for now.
3407          * we still need to figure out whether we accept that. */
3408         mdev->p_size = p_size;
3409
3410         if (get_ldev(mdev)) {
3411                 rcu_read_lock();
3412                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3413                 rcu_read_unlock();
3414
3415                 warn_if_differ_considerably(mdev, "lower level device sizes",
3416                            p_size, drbd_get_max_capacity(mdev->ldev));
3417                 warn_if_differ_considerably(mdev, "user requested size",
3418                                             p_usize, my_usize);
3419
3420                 /* if this is the first connect, or an otherwise expected
3421                  * param exchange, choose the minimum */
3422                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3423                         p_usize = min_not_zero(my_usize, p_usize);
3424
3425                 /* Never shrink a device with usable data during connect.
3426                    But allow online shrinking if we are connected. */
3427                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3428                     drbd_get_capacity(mdev->this_bdev) &&
3429                     mdev->state.disk >= D_OUTDATED &&
3430                     mdev->state.conn < C_CONNECTED) {
3431                         dev_err(DEV, "The peer's disk size is too small!\n");
3432                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3433                         put_ldev(mdev);
3434                         return -EIO;
3435                 }
3436
3437                 if (my_usize != p_usize) {
3438                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3439
3440                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3441                         if (!new_disk_conf) {
3442                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3443                                 put_ldev(mdev);
3444                                 return -ENOMEM;
3445                         }
3446
3447                         mutex_lock(&mdev->tconn->conf_update);
3448                         old_disk_conf = mdev->ldev->disk_conf;
3449                         *new_disk_conf = *old_disk_conf;
3450                         new_disk_conf->disk_size = p_usize;
3451
3452                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3453                         mutex_unlock(&mdev->tconn->conf_update);
3454                         synchronize_rcu();
3455                         kfree(old_disk_conf);
3456
3457                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3458                                  (unsigned long)p_usize);
3459                 }
3460
3461                 put_ldev(mdev);
3462         }
3463
3464         ddsf = be16_to_cpu(p->dds_flags);
3465         if (get_ldev(mdev)) {
3466                 dd = drbd_determine_dev_size(mdev, ddsf);
3467                 put_ldev(mdev);
3468                 if (dd == dev_size_error)
3469                         return -EIO;
3470                 drbd_md_sync(mdev);
3471         } else {
3472                 /* I am diskless, need to accept the peer's size. */
3473                 drbd_set_my_capacity(mdev, p_size);
3474         }
3475
3476         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3477         drbd_reconsider_max_bio_size(mdev);
3478
3479         if (get_ldev(mdev)) {
3480                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3481                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3482                         ldsc = 1;
3483                 }
3484
3485                 put_ldev(mdev);
3486         }
3487
3488         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3489                 if (be64_to_cpu(p->c_size) !=
3490                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3491                         /* we have different sizes, probably peer
3492                          * needs to know my new size... */
3493                         drbd_send_sizes(mdev, 0, ddsf);
3494                 }
3495                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3496                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3497                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3498                             mdev->state.disk >= D_INCONSISTENT) {
3499                                 if (ddsf & DDSF_NO_RESYNC)
3500                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3501                                 else
3502                                         resync_after_online_grow(mdev);
3503                         } else
3504                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3505                 }
3506         }
3507
3508         return 0;
3509 }
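
/*
 * Worked example, not part of the driver, with made-up numbers: during the
 * initial parameter exchange above, a requested user size of 0 means "no
 * explicit limit" and defers to the other side.  With my_usize = 0 and a
 * peer value of 1000000 sectors, min_not_zero() yields 1000000; with
 * my_usize = 2000000 and a peer value of 1000000, the smaller limit of
 * 1000000 sectors wins.
 */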
3510
3511 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3512 {
3513         struct drbd_conf *mdev;
3514         struct p_uuids *p = pi->data;
3515         u64 *p_uuid;
3516         int i, updated_uuids = 0;
3517
3518         mdev = vnr_to_mdev(tconn, pi->vnr);
3519         if (!mdev)
3520                 return config_unknown_volume(tconn, pi);
3521
3522         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return -ENOMEM;
        }
3523
3524         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3525                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3526
3527         kfree(mdev->p_uuid);
3528         mdev->p_uuid = p_uuid;
3529
3530         if (mdev->state.conn < C_CONNECTED &&
3531             mdev->state.disk < D_INCONSISTENT &&
3532             mdev->state.role == R_PRIMARY &&
3533             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3534                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3535                     (unsigned long long)mdev->ed_uuid);
3536                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3537                 return -EIO;
3538         }
3539
3540         if (get_ldev(mdev)) {
3541                 int skip_initial_sync =
3542                         mdev->state.conn == C_CONNECTED &&
3543                         mdev->tconn->agreed_pro_version >= 90 &&
3544                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3545                         (p_uuid[UI_FLAGS] & 8);
3546                 if (skip_initial_sync) {
3547                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3548                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3549                                         "clear_n_write from receive_uuids",
3550                                         BM_LOCKED_TEST_ALLOWED);
3551                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3552                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3553                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3554                                         CS_VERBOSE, NULL);
3555                         drbd_md_sync(mdev);
3556                         updated_uuids = 1;
3557                 }
3558                 put_ldev(mdev);
3559         } else if (mdev->state.disk < D_INCONSISTENT &&
3560                    mdev->state.role == R_PRIMARY) {
3561                 /* I am a diskless primary, the peer just created a new current UUID
3562                    for me. */
3563                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3564         }
3565
3566         /* Before we test the disk state, we should wait until a possibly
3567            ongoing cluster-wide state change has finished. That is important if
3568            we are primary and are detaching from our disk. We need to see the
3569            new disk state... */
3570         mutex_lock(mdev->state_mutex);
3571         mutex_unlock(mdev->state_mutex);
3572         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3573                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3574
3575         if (updated_uuids)
3576                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3577
3578         return 0;
3579 }
3580
3581 /**
3582  * convert_state() - Converts the peer's view of the cluster state to our point of view
3583  * @ps:         The state as seen by the peer.
3584  */
3585 static union drbd_state convert_state(union drbd_state ps)
3586 {
3587         union drbd_state ms;
3588
3589         static enum drbd_conns c_tab[] = {
3590                 [C_CONNECTED] = C_CONNECTED,
3591
3592                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3593                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3594                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3595                 [C_VERIFY_S]       = C_VERIFY_T,
3596                 [C_MASK]   = C_MASK,
3597         };
3598
3599         ms.i = ps.i;
3600
3601         ms.conn = c_tab[ps.conn];
3602         ms.peer = ps.role;
3603         ms.role = ps.peer;
3604         ms.pdsk = ps.disk;
3605         ms.disk = ps.pdsk;
3606         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3607
3608         return ms;
3609 }
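
/*
 * Worked example, not part of the driver: if the peer reports
 * role=Primary, peer=Secondary, disk=UpToDate, pdsk=Inconsistent, then from
 * our point of view that is role=Secondary, peer=Primary, disk=Inconsistent,
 * pdsk=UpToDate; directional connection states are mirrored the same way,
 * e.g. StartingSyncS on the peer is StartingSyncT for us.
 */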
3610
3611 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3612 {
3613         struct drbd_conf *mdev;
3614         struct p_req_state *p = pi->data;
3615         union drbd_state mask, val;
3616         enum drbd_state_rv rv;
3617
3618         mdev = vnr_to_mdev(tconn, pi->vnr);
3619         if (!mdev)
3620                 return -EIO;
3621
3622         mask.i = be32_to_cpu(p->mask);
3623         val.i = be32_to_cpu(p->val);
3624
3625         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3626             mutex_is_locked(mdev->state_mutex)) {
3627                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3628                 return 0;
3629         }
3630
3631         mask = convert_state(mask);
3632         val = convert_state(val);
3633
3634         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3635         drbd_send_sr_reply(mdev, rv);
3636
3637         drbd_md_sync(mdev);
3638
3639         return 0;
3640 }
3641
3642 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3643 {
3644         struct p_req_state *p = pi->data;
3645         union drbd_state mask, val;
3646         enum drbd_state_rv rv;
3647
3648         mask.i = be32_to_cpu(p->mask);
3649         val.i = be32_to_cpu(p->val);
3650
3651         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3652             mutex_is_locked(&tconn->cstate_mutex)) {
3653                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3654                 return 0;
3655         }
3656
3657         mask = convert_state(mask);
3658         val = convert_state(val);
3659
3660         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3661         conn_send_sr_reply(tconn, rv);
3662
3663         return 0;
3664 }
3665
3666 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3667 {
3668         struct drbd_conf *mdev;
3669         struct p_state *p = pi->data;
3670         union drbd_state os, ns, peer_state;
3671         enum drbd_disk_state real_peer_disk;
3672         enum chg_state_flags cs_flags;
3673         int rv;
3674
3675         mdev = vnr_to_mdev(tconn, pi->vnr);
3676         if (!mdev)
3677                 return config_unknown_volume(tconn, pi);
3678
3679         peer_state.i = be32_to_cpu(p->state);
3680
3681         real_peer_disk = peer_state.disk;
3682         if (peer_state.disk == D_NEGOTIATING) {
3683                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3684                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3685         }
3686
3687         spin_lock_irq(&mdev->tconn->req_lock);
3688  retry:
3689         os = ns = drbd_read_state(mdev);
3690         spin_unlock_irq(&mdev->tconn->req_lock);
3691
3692         /* peer says his disk is uptodate, while we think it is inconsistent,
3693          * and this happens while we think we have a sync going on. */
3694         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3695             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3696                 /* If we are (becoming) SyncSource, but peer is still in sync
3697                  * preparation, ignore its uptodate-ness to avoid flapping, it
3698                  * will change to inconsistent once the peer reaches active
3699                  * syncing states.
3700                  * It may have changed syncer-paused flags, however, so we
3701                  * cannot ignore this completely. */
3702                 if (peer_state.conn > C_CONNECTED &&
3703                     peer_state.conn < C_SYNC_SOURCE)
3704                         real_peer_disk = D_INCONSISTENT;
3705
3706                 /* if peer_state changes to connected at the same time,
3707                  * it explicitly notifies us that it finished resync.
3708                  * Maybe we should finish it up, too? */
3709                 else if (os.conn >= C_SYNC_SOURCE &&
3710                          peer_state.conn == C_CONNECTED) {
3711                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3712                                 drbd_resync_finished(mdev);
3713                         return 0;
3714                 }
3715         }
3716
3717         /* peer says his disk is inconsistent, while we think it is uptodate,
3718          * and this happens while the peer still thinks we have a sync going on,
3719          * but we think we are already done with the sync.
3720          * We ignore this to avoid flapping pdsk.
3721          * This should not happen, if the peer is a recent version of drbd. */
3722         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3723             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3724                 real_peer_disk = D_UP_TO_DATE;
3725
3726         if (ns.conn == C_WF_REPORT_PARAMS)
3727                 ns.conn = C_CONNECTED;
3728
3729         if (peer_state.conn == C_AHEAD)
3730                 ns.conn = C_BEHIND;
3731
3732         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3733             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3734                 int cr; /* consider resync */
3735
3736                 /* if we established a new connection */
3737                 cr  = (os.conn < C_CONNECTED);
3738                 /* if we had an established connection
3739                  * and one of the nodes newly attaches a disk */
3740                 cr |= (os.conn == C_CONNECTED &&
3741                        (peer_state.disk == D_NEGOTIATING ||
3742                         os.disk == D_NEGOTIATING));
3743                 /* if we have both been inconsistent, and the peer has been
3744                  * forced to be UpToDate with --overwrite-data */
3745                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3746                 /* if we had been plain connected, and the admin requested to
3747                  * start a sync by "invalidate" or "invalidate-remote" */
3748                 cr |= (os.conn == C_CONNECTED &&
3749                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3750                                  peer_state.conn <= C_WF_BITMAP_T));
3751
3752                 if (cr)
3753                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3754
3755                 put_ldev(mdev);
3756                 if (ns.conn == C_MASK) {
3757                         ns.conn = C_CONNECTED;
3758                         if (mdev->state.disk == D_NEGOTIATING) {
3759                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3760                         } else if (peer_state.disk == D_NEGOTIATING) {
3761                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3762                                 peer_state.disk = D_DISKLESS;
3763                                 real_peer_disk = D_DISKLESS;
3764                         } else {
3765                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3766                                         return -EIO;
3767                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3768                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3769                                 return -EIO;
3770                         }
3771                 }
3772         }
3773
3774         spin_lock_irq(&mdev->tconn->req_lock);
3775         if (os.i != drbd_read_state(mdev).i)
3776                 goto retry;
3777         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3778         ns.peer = peer_state.role;
3779         ns.pdsk = real_peer_disk;
3780         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3781         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3782                 ns.disk = mdev->new_state_tmp.disk;
3783         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3784         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3785             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3786                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3787                    for temporary network outages! */
3788                 spin_unlock_irq(&mdev->tconn->req_lock);
3789                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3790                 tl_clear(mdev->tconn);
3791                 drbd_uuid_new_current(mdev);
3792                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3793                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3794                 return -EIO;
3795         }
3796         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3797         ns = drbd_read_state(mdev);
3798         spin_unlock_irq(&mdev->tconn->req_lock);
3799
3800         if (rv < SS_SUCCESS) {
3801                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3802                 return -EIO;
3803         }
3804
3805         if (os.conn > C_WF_REPORT_PARAMS) {
3806                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3807                     peer_state.disk != D_NEGOTIATING ) {
3808                         /* we want resync, peer has not yet decided to sync... */
3809                         /* Nowadays only used when forcing a node into primary role and
3810                            setting its disk to UpToDate with that */
3811                         drbd_send_uuids(mdev);
3812                         drbd_send_state(mdev);
3813                 }
3814         }
3815
3816         mutex_lock(&mdev->tconn->conf_update);
3817         mdev->tconn->net_conf->discard_my_data = 0; /* without copy; single bit op is atomic */
3818         mutex_unlock(&mdev->tconn->conf_update);
3819
3820         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3821
3822         return 0;
3823 }
3824
3825 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3826 {
3827         struct drbd_conf *mdev;
3828         struct p_rs_uuid *p = pi->data;
3829
3830         mdev = vnr_to_mdev(tconn, pi->vnr);
3831         if (!mdev)
3832                 return -EIO;
3833
3834         wait_event(mdev->misc_wait,
3835                    mdev->state.conn == C_WF_SYNC_UUID ||
3836                    mdev->state.conn == C_BEHIND ||
3837                    mdev->state.conn < C_CONNECTED ||
3838                    mdev->state.disk < D_NEGOTIATING);
3839
3840         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3841
3842         /* Here the _drbd_uuid_ functions are right, current should
3843            _not_ be rotated into the history */
3844         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3845                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3846                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3847
3848                 drbd_print_uuids(mdev, "updated sync uuid");
3849                 drbd_start_resync(mdev, C_SYNC_TARGET);
3850
3851                 put_ldev(mdev);
3852         } else
3853                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3854
3855         return 0;
3856 }
3857
3858 /**
3859  * receive_bitmap_plain
3860  *
3861  * Return 0 when done, 1 when another iteration is needed, and a negative error
3862  * code upon failure.
3863  */
3864 static int
3865 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3866                      unsigned long *p, struct bm_xfer_ctx *c)
3867 {
3868         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3869                                  drbd_header_size(mdev->tconn);
3870         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3871                                        c->bm_words - c->word_offset);
3872         unsigned int want = num_words * sizeof(*p);
3873         int err;
3874
3875         if (want != size) {
3876                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3877                 return -EIO;
3878         }
3879         if (want == 0)
3880                 return 0;
3881         err = drbd_recv_all(mdev->tconn, p, want);
3882         if (err)
3883                 return err;
3884
3885         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3886
3887         c->word_offset += num_words;
3888         c->bit_offset = c->word_offset * BITS_PER_LONG;
3889         if (c->bit_offset > c->bm_bits)
3890                 c->bit_offset = c->bm_bits;
3891
3892         return 1;
3893 }
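
/*
 * Worked example, not part of the driver, with made-up numbers: assuming
 * data_size works out to 4080 bytes, each P_BITMAP packet carries 510
 * 64-bit words.  A bitmap of 2^22 words (4194304) then needs
 * DIV_ROUND_UP(4194304, 510) = 8225 plain packets; receive_bitmap_plain()
 * returns 1 after each full packet and 0 once word_offset has reached
 * bm_words.
 */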
3894
3895 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3896 {
3897         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3898 }
3899
3900 static int dcbp_get_start(struct p_compressed_bm *p)
3901 {
3902         return (p->encoding & 0x80) != 0;
3903 }
3904
3905 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3906 {
3907         return (p->encoding >> 4) & 0x7;
3908 }
3909
3910 /**
3911  * recv_bm_rle_bits
3912  *
3913  * Return 0 when done, 1 when another iteration is needed, and a negative error
3914  * code upon failure.
3915  */
3916 static int
3917 recv_bm_rle_bits(struct drbd_conf *mdev,
3918                 struct p_compressed_bm *p,
3919                  struct bm_xfer_ctx *c,
3920                  unsigned int len)
3921 {
3922         struct bitstream bs;
3923         u64 look_ahead;
3924         u64 rl;
3925         u64 tmp;
3926         unsigned long s = c->bit_offset;
3927         unsigned long e;
3928         int toggle = dcbp_get_start(p);
3929         int have;
3930         int bits;
3931
3932         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3933
3934         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3935         if (bits < 0)
3936                 return -EIO;
3937
3938         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3939                 bits = vli_decode_bits(&rl, look_ahead);
3940                 if (bits <= 0)
3941                         return -EIO;
3942
3943                 if (toggle) {
3944                         e = s + rl -1;
3945                         if (e >= c->bm_bits) {
3946                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3947                                 return -EIO;
3948                         }
3949                         _drbd_bm_set_bits(mdev, s, e);
3950                 }
3951
3952                 if (have < bits) {
3953                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3954                                 have, bits, look_ahead,
3955                                 (unsigned int)(bs.cur.b - p->code),
3956                                 (unsigned int)bs.buf_len);
3957                         return -EIO;
3958                 }
3959                 look_ahead >>= bits;
3960                 have -= bits;
3961
3962                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3963                 if (bits < 0)
3964                         return -EIO;
3965                 look_ahead |= tmp << have;
3966                 have += bits;
3967         }
3968
3969         c->bit_offset = s;
3970         bm_xfer_ctx_bit_to_word_offset(c);
3971
3972         return (s != c->bm_bits);
3973 }
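
/*
 * Worked example, not part of the driver, with made-up numbers: suppose the
 * VLI-decoded run lengths are 5, 3 and 2 and dcbp_get_start() returned 0.
 * The first run keeps toggle == 0, so bits 0..4 stay clear; the second run
 * flips toggle to 1, so bits 5..7 are set via _drbd_bm_set_bits(); the third
 * run clears toggle again and leaves bits 8..9 untouched.  Only the run
 * lengths travel over the wire, which is what makes the compressed bitmap
 * packets small for mostly-clean (or mostly-dirty) bitmaps.
 */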
3974
3975 /**
3976  * decode_bitmap_c
3977  *
3978  * Return 0 when done, 1 when another iteration is needed, and a negative error
3979  * code upon failure.
3980  */
3981 static int
3982 decode_bitmap_c(struct drbd_conf *mdev,
3983                 struct p_compressed_bm *p,
3984                 struct bm_xfer_ctx *c,
3985                 unsigned int len)
3986 {
3987         if (dcbp_get_code(p) == RLE_VLI_Bits)
3988                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3989
3990         /* other variants had been implemented for evaluation,
3991          * but have been dropped as this one turned out to be "best"
3992          * during all our tests. */
3993
3994         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3995         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3996         return -EIO;
3997 }
3998
3999 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4000                 const char *direction, struct bm_xfer_ctx *c)
4001 {
4002         /* what would it take to transfer it "plaintext" */
4003         unsigned int header_size = drbd_header_size(mdev->tconn);
4004         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4005         unsigned int plain =
4006                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4007                 c->bm_words * sizeof(unsigned long);
4008         unsigned int total = c->bytes[0] + c->bytes[1];
4009         unsigned int r;
4010
4011         /* total cannot be zero. But just in case: */
4012         if (total == 0)
4013                 return;
4014
4015         /* don't report if not compressed */
4016         if (total >= plain)
4017                 return;
4018
4019         /* total < plain. check for overflow, still */
4020         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4021                                     : (1000 * total / plain);
4022
4023         if (r > 1000)
4024                 r = 1000;
4025
4026         r = 1000 - r;
4027         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4028              "total %u; compression: %u.%u%%\n",
4029                         direction,
4030                         c->bytes[1], c->packets[1],
4031                         c->bytes[0], c->packets[0],
4032                         total, r/10, r % 10);
4033 }
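
/*
 * Worked example, not part of the driver, with made-up numbers: if a plain
 * transfer would have taken plain = 100000 bytes but only total = 2500 bytes
 * of RLE data were exchanged, then r = 1000 * 2500 / 100000 = 25, and after
 * r = 1000 - r the function logs "compression: 97.5%" (975/10 and 975 % 10).
 */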
4034
4035 /* Since we are processing the bitfield from lower addresses to higher,
4036    it does not matter whether we process it in 32 bit chunks or 64 bit
4037    chunks as long as it is little endian. (Understand it as a byte stream,
4038    beginning with the lowest byte...) If we used big endian
4039    we would need to process it from the highest address to the lowest,
4040    in order to be agnostic to the 32 vs 64 bits issue.
4041
4042    Returns 0 on success, a negative error code otherwise. */
4043 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4044 {
4045         struct drbd_conf *mdev;
4046         struct bm_xfer_ctx c;
4047         int err;
4048
4049         mdev = vnr_to_mdev(tconn, pi->vnr);
4050         if (!mdev)
4051                 return -EIO;
4052
4053         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4054         /* you are supposed to send additional out-of-sync information
4055          * if you actually set bits during this phase */
4056
4057         c = (struct bm_xfer_ctx) {
4058                 .bm_bits = drbd_bm_bits(mdev),
4059                 .bm_words = drbd_bm_words(mdev),
4060         };
4061
4062         for(;;) {
4063                 if (pi->cmd == P_BITMAP)
4064                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4065                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4066                         /* MAYBE: sanity check that we speak proto >= 90,
4067                          * and the feature is enabled! */
4068                         struct p_compressed_bm *p = pi->data;
4069
4070                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4071                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4072                                 err = -EIO;
4073                                 goto out;
4074                         }
4075                         if (pi->size <= sizeof(*p)) {
4076                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4077                                 err = -EIO;
4078                                 goto out;
4079                         }
4080                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4081                         if (err)
4082                                goto out;
4083                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4084                 } else {
4085                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4086                         err = -EIO;
4087                         goto out;
4088                 }
4089
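                     /* Index 1 counts plain P_BITMAP packets, index 0 the
                      * compressed ones; INFO_bm_xfer_stats() reports them as
                      * "plain" and "RLE" respectively. */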
4090                 c.packets[pi->cmd == P_BITMAP]++;
4091                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4092
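                     /* The receive helpers return a positive value while more
                      * bitmap packets are expected, 0 once the transfer is
                      * complete, and a negative error code on failure. */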
4093                 if (err <= 0) {
4094                         if (err < 0)
4095                                 goto out;
4096                         break;
4097                 }
4098                 err = drbd_recv_header(mdev->tconn, pi);
4099                 if (err)
4100                         goto out;
4101         }
4102
4103         INFO_bm_xfer_stats(mdev, "receive", &c);
4104
4105         if (mdev->state.conn == C_WF_BITMAP_T) {
4106                 enum drbd_state_rv rv;
4107
4108                 err = drbd_send_bitmap(mdev);
4109                 if (err)
4110                         goto out;
4111                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4112                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4113                 D_ASSERT(rv == SS_SUCCESS);
4114         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4115                 /* admin may have requested C_DISCONNECTING,
4116                  * other threads may have noticed network errors */
4117                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4118                     drbd_conn_str(mdev->state.conn));
4119         }
4120         err = 0;
4121
4122  out:
4123         drbd_bm_unlock(mdev);
4124         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4125                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4126         return err;
4127 }
4128
4129 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4130 {
4131         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4132                  pi->cmd, pi->size);
4133
4134         return ignore_remaining_packet(tconn, pi);
4135 }
4136
4137 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4138 {
4139         /* Make sure we've acked all the TCP data associated
4140          * with the data requests being unplugged */
4141         drbd_tcp_quickack(tconn->data.socket);
4142
4143         return 0;
4144 }
4145
4146 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4147 {
4148         struct drbd_conf *mdev;
4149         struct p_block_desc *p = pi->data;
4150
4151         mdev = vnr_to_mdev(tconn, pi->vnr);
4152         if (!mdev)
4153                 return -EIO;
4154
4155         switch (mdev->state.conn) {
4156         case C_WF_SYNC_UUID:
4157         case C_WF_BITMAP_T:
4158         case C_BEHIND:
4159                 break;
4160         default:
4161                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4162                                 drbd_conn_str(mdev->state.conn));
4163         }
4164
4165         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4166
4167         return 0;
4168 }
4169
4170 struct data_cmd {
4171         int expect_payload;
4172         size_t pkt_size;
4173         int (*fn)(struct drbd_tconn *, struct packet_info *);
4174 };
4175
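     /* Receiver dispatch table, indexed by packet type: pkt_size is the size
      * of the fixed sub-header that drbdd() reads before calling fn(), and
      * expect_payload says whether additional payload beyond that fixed part
      * is allowed. */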
4176 static struct data_cmd drbd_cmd_handler[] = {
4177         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4178         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4179         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4180         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4181         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4182         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4183         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4184         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4185         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4186         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4187         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4188         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4189         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4190         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4191         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4192         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4193         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4194         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4195         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4196         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4197         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4198         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4199         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4200 };
4201
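     /* Main loop of the receiver thread: read a packet header, look up the
      * handler in drbd_cmd_handler[], read the fixed-size sub-header if any,
      * and let the handler consume whatever payload remains.  Any error takes
      * the connection to C_PROTOCOL_ERROR. */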
4202 static void drbdd(struct drbd_tconn *tconn)
4203 {
4204         struct packet_info pi;
4205         size_t shs; /* sub header size */
4206         int err;
4207
4208         while (get_t_state(&tconn->receiver) == RUNNING) {
4209                 struct data_cmd *cmd;
4210
4211                 drbd_thread_current_set_cpu(&tconn->receiver);
4212                 if (drbd_recv_header(tconn, &pi))
4213                         goto err_out;
4214
4215                 cmd = &drbd_cmd_handler[pi.cmd];
4216                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4217                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4218                         goto err_out;
4219                 }
4220
4221                 shs = cmd->pkt_size;
4222                 if (pi.size > shs && !cmd->expect_payload) {
4223                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4224                         goto err_out;
4225                 }
4226
4227                 if (shs) {
4228                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4229                         if (err)
4230                                 goto err_out;
4231                         pi.size -= shs;
4232                 }
4233
4234                 err = cmd->fn(tconn, &pi);
4235                 if (err) {
4236                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4237                                  cmdname(pi.cmd), err, pi.size);
4238                         goto err_out;
4239                 }
4240         }
4241         return;
4242
4243     err_out:
4244         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4245 }
4246
4247 void conn_flush_workqueue(struct drbd_tconn *tconn)
4248 {
4249         struct drbd_wq_barrier barr;
4250
4251         barr.w.cb = w_prev_work_done;
4252         barr.w.tconn = tconn;
4253         init_completion(&barr.done);
4254         drbd_queue_work(&tconn->data.work, &barr.w);
4255         wait_for_completion(&barr.done);
4256 }
4257
4258 static void conn_disconnect(struct drbd_tconn *tconn)
4259 {
4260         struct drbd_conf *mdev;
4261         enum drbd_conns oc;
4262         int vnr, rv = SS_UNKNOWN_ERROR;
4263
4264         if (tconn->cstate == C_STANDALONE)
4265                 return;
4266
4267         /* asender does not clean up anything. it must not interfere, either */
4268         drbd_thread_stop(&tconn->asender);
4269         drbd_free_sock(tconn);
4270
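             /* drbd_disconnected() may sleep, so hold a reference on each
              * volume and drop the RCU read lock around the call. */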
4271         rcu_read_lock();
4272         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4273                 kref_get(&mdev->kref);
4274                 rcu_read_unlock();
4275                 drbd_disconnected(mdev);
4276                 kref_put(&mdev->kref, &drbd_minor_destroy);
4277                 rcu_read_lock();
4278         }
4279         rcu_read_unlock();
4280
4281         conn_info(tconn, "Connection closed\n");
4282
4283         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4284                 conn_try_outdate_peer_async(tconn);
4285
4286         spin_lock_irq(&tconn->req_lock);
4287         oc = tconn->cstate;
4288         if (oc >= C_UNCONNECTED)
4289                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4290
4291         spin_unlock_irq(&tconn->req_lock);
4292
4293         if (oc == C_DISCONNECTING)
4294                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4295 }
4296
4297 static int drbd_disconnected(struct drbd_conf *mdev)
4298 {
4299         enum drbd_fencing_p fp;
4300         unsigned int i;
4301
4302         /* wait for current activity to cease. */
4303         spin_lock_irq(&mdev->tconn->req_lock);
4304         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4305         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4306         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4307         spin_unlock_irq(&mdev->tconn->req_lock);
4308
4309         /* We do not have data structures that would allow us to
4310          * get the rs_pending_cnt down to 0 again.
4311          *  * On C_SYNC_TARGET we do not have any data structures describing
4312          *    the pending RSDataRequest's we have sent.
4313          *  * On C_SYNC_SOURCE there is no data structure that tracks
4314          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4315          *  And no, it is not the sum of the reference counts in the
4316          *  resync_LRU. The resync_LRU tracks the whole operation including
4317          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4318          *  on the fly. */
4319         drbd_rs_cancel_all(mdev);
4320         mdev->rs_total = 0;
4321         mdev->rs_failed = 0;
4322         atomic_set(&mdev->rs_pending_cnt, 0);
4323         wake_up(&mdev->misc_wait);
4324
4325         del_timer(&mdev->request_timer);
4326
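             /* Stop a pending resync timer and run its function once by hand,
              * so that the resync work it would have queued is not lost. */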
4327         del_timer_sync(&mdev->resync_timer);
4328         resync_timer_fn((unsigned long)mdev);
4329
4330         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4331          * w_make_resync_request etc. which may still be on the worker queue
4332          * to be "canceled" */
4333         drbd_flush_workqueue(mdev);
4334
4335         drbd_finish_peer_reqs(mdev);
4336
4337         kfree(mdev->p_uuid);
4338         mdev->p_uuid = NULL;
4339
4340         if (!drbd_suspended(mdev))
4341                 tl_clear(mdev->tconn);
4342
4343         drbd_md_sync(mdev);
4344
4345         fp = FP_DONT_CARE;
4346         if (get_ldev(mdev)) {
4347                 rcu_read_lock();
4348                 fp = rcu_dereference(mdev->ldev->disk_conf)->fencing;
4349                 rcu_read_unlock();
4350                 put_ldev(mdev);
4351         }
4352
4353         /* serialize with bitmap writeout triggered by the state change,
4354          * if any. */
4355         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4356
4357         /* tcp_close and release of sendpage pages can be deferred.  I don't
4358          * want to use SO_LINGER, because apparently it can be deferred for
4359          * more than 20 seconds (longest time I checked).
4360          *
4361          * Actually we don't care for exactly when the network stack does its
4362          * put_page(), but release our reference on these pages right here.
4363          */
4364         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4365         if (i)
4366                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4367         i = atomic_read(&mdev->pp_in_use_by_net);
4368         if (i)
4369                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4370         i = atomic_read(&mdev->pp_in_use);
4371         if (i)
4372                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4373
4374         D_ASSERT(list_empty(&mdev->read_ee));
4375         D_ASSERT(list_empty(&mdev->active_ee));
4376         D_ASSERT(list_empty(&mdev->sync_ee));
4377         D_ASSERT(list_empty(&mdev->done_ee));
4378
4379         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4380         atomic_set(&mdev->current_epoch->epoch_size, 0);
4381         D_ASSERT(list_empty(&mdev->current_epoch->list));
4382
4383         return 0;
4384 }
4385
4386 /*
4387  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4388  * we can agree on is stored in agreed_pro_version.
4389  *
4390  * feature flags and the reserved array should be enough room for future
4391  * enhancements of the handshake protocol, and possible plugins...
4392  *
4393  * for now, they are expected to be zero, but ignored.
4394  */
4395 static int drbd_send_features(struct drbd_tconn *tconn)
4396 {
4397         struct drbd_socket *sock;
4398         struct p_connection_features *p;
4399
4400         sock = &tconn->data;
4401         p = conn_prepare_command(tconn, sock);
4402         if (!p)
4403                 return -EIO;
4404         memset(p, 0, sizeof(*p));
4405         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4406         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4407         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4408 }
4409
4410 /*
4411  * return values:
4412  *   1 yes, we have a valid connection
4413  *   0 oops, did not work out, please try again
4414  *  -1 peer talks different language,
4415  *     no point in trying again, please go standalone.
4416  */
4417 static int drbd_do_features(struct drbd_tconn *tconn)
4418 {
4419         /* ASSERT current == tconn->receiver ... */
4420         struct p_connection_features *p;
4421         const int expect = sizeof(struct p_connection_features);
4422         struct packet_info pi;
4423         int err;
4424
4425         err = drbd_send_features(tconn);
4426         if (err)
4427                 return 0;
4428
4429         err = drbd_recv_header(tconn, &pi);
4430         if (err)
4431                 return 0;
4432
4433         if (pi.cmd != P_CONNECTION_FEATURES) {
4434                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4435                      cmdname(pi.cmd), pi.cmd);
4436                 return -1;
4437         }
4438
4439         if (pi.size != expect) {
4440                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4441                      expect, pi.size);
4442                 return -1;
4443         }
4444
4445         p = pi.data;
4446         err = drbd_recv_all_warn(tconn, p, expect);
4447         if (err)
4448                 return 0;
4449
4450         p->protocol_min = be32_to_cpu(p->protocol_min);
4451         p->protocol_max = be32_to_cpu(p->protocol_max);
4452         if (p->protocol_max == 0)
4453                 p->protocol_max = p->protocol_min;
4454
4455         if (PRO_VERSION_MAX < p->protocol_min ||
4456             PRO_VERSION_MIN > p->protocol_max)
4457                 goto incompat;
4458
4459         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4460
4461         conn_info(tconn, "Handshake successful: "
4462              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4463
4464         return 1;
4465
4466  incompat:
4467         conn_err(tconn, "incompatible DRBD dialects: "
4468             "I support %d-%d, peer supports %d-%d\n",
4469             PRO_VERSION_MIN, PRO_VERSION_MAX,
4470             p->protocol_min, p->protocol_max);
4471         return -1;
4472 }
4473
4474 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4475 static int drbd_do_auth(struct drbd_tconn *tconn)
4476 {
4477         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4478         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4479         return -1;
4480 }
4481 #else
4482 #define CHALLENGE_LEN 64
4483
4484 /* Return value:
4485         1 - auth succeeded,
4486         0 - failed, try again (network error),
4487         -1 - auth failed, don't try again.
4488 */
4489
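     /* Challenge-response handshake over the data socket: send our random
      * challenge, receive the peer's challenge, HMAC the peer's challenge
      * with the shared secret and send the result back, then receive the
      * peer's response and compare it against the HMAC of our own challenge.
      * Both sides run this symmetrically. */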
4490 static int drbd_do_auth(struct drbd_tconn *tconn)
4491 {
4492         struct drbd_socket *sock;
4493         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4494         struct scatterlist sg;
4495         char *response = NULL;
4496         char *right_response = NULL;
4497         char *peers_ch = NULL;
4498         unsigned int key_len;
4499         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4500         unsigned int resp_size;
4501         struct hash_desc desc;
4502         struct packet_info pi;
4503         struct net_conf *nc;
4504         int err, rv;
4505
4506         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4507
4508         rcu_read_lock();
4509         nc = rcu_dereference(tconn->net_conf);
4510         key_len = strlen(nc->shared_secret);
4511         memcpy(secret, nc->shared_secret, key_len);
4512         rcu_read_unlock();
4513
4514         desc.tfm = tconn->cram_hmac_tfm;
4515         desc.flags = 0;
4516
4517         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4518         if (rv) {
4519                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4520                 rv = -1;
4521                 goto fail;
4522         }
4523
4524         get_random_bytes(my_challenge, CHALLENGE_LEN);
4525
4526         sock = &tconn->data;
4527         if (!conn_prepare_command(tconn, sock)) {
4528                 rv = 0;
4529                 goto fail;
4530         }
4531         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4532                                 my_challenge, CHALLENGE_LEN);
4533         if (!rv)
4534                 goto fail;
4535
4536         err = drbd_recv_header(tconn, &pi);
4537         if (err) {
4538                 rv = 0;
4539                 goto fail;
4540         }
4541
4542         if (pi.cmd != P_AUTH_CHALLENGE) {
4543                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4544                     cmdname(pi.cmd), pi.cmd);
4545                 rv = 0;
4546                 goto fail;
4547         }
4548
4549         if (pi.size > CHALLENGE_LEN * 2) {
4550                 conn_err(tconn, "AuthChallenge payload too big.\n");
4551                 rv = -1;
4552                 goto fail;
4553         }
4554
4555         peers_ch = kmalloc(pi.size, GFP_NOIO);
4556         if (peers_ch == NULL) {
4557                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4558                 rv = -1;
4559                 goto fail;
4560         }
4561
4562         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4563         if (err) {
4564                 rv = 0;
4565                 goto fail;
4566         }
4567
4568         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4569         response = kmalloc(resp_size, GFP_NOIO);
4570         if (response == NULL) {
4571                 conn_err(tconn, "kmalloc of response failed\n");
4572                 rv = -1;
4573                 goto fail;
4574         }
4575
4576         sg_init_table(&sg, 1);
4577         sg_set_buf(&sg, peers_ch, pi.size);
4578
4579         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4580         if (rv) {
4581                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4582                 rv = -1;
4583                 goto fail;
4584         }
4585
4586         if (!conn_prepare_command(tconn, sock)) {
4587                 rv = 0;
4588                 goto fail;
4589         }
4590         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4591                                 response, resp_size);
4592         if (!rv)
4593                 goto fail;
4594
4595         err = drbd_recv_header(tconn, &pi);
4596         if (err) {
4597                 rv = 0;
4598                 goto fail;
4599         }
4600
4601         if (pi.cmd != P_AUTH_RESPONSE) {
4602                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4603                         cmdname(pi.cmd), pi.cmd);
4604                 rv = 0;
4605                 goto fail;
4606         }
4607
4608         if (pi.size != resp_size) {
4609                 conn_err(tconn, "AuthResponse payload has unexpected size\n");
4610                 rv = 0;
4611                 goto fail;
4612         }
4613
4614         err = drbd_recv_all_warn(tconn, response, resp_size);
4615         if (err) {
4616                 rv = 0;
4617                 goto fail;
4618         }
4619
4620         right_response = kmalloc(resp_size, GFP_NOIO);
4621         if (right_response == NULL) {
4622                 conn_err(tconn, "kmalloc of right_response failed\n");
4623                 rv = -1;
4624                 goto fail;
4625         }
4626
4627         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4628
4629         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4630         if (rv) {
4631                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4632                 rv = -1;
4633                 goto fail;
4634         }
4635
4636         rv = !memcmp(response, right_response, resp_size);
4637
4638         if (rv)
4639                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4640                      resp_size);
4641         else
4642                 rv = -1;
4643
4644  fail:
4645         kfree(peers_ch);
4646         kfree(response);
4647         kfree(right_response);
4648
4649         return rv;
4650 }
4651 #endif
4652
4653 int drbdd_init(struct drbd_thread *thi)
4654 {
4655         struct drbd_tconn *tconn = thi->tconn;
4656         int h;
4657
4658         conn_info(tconn, "receiver (re)started\n");
4659
4660         do {
4661                 h = conn_connect(tconn);
4662                 if (h == 0) {
4663                         conn_disconnect(tconn);
4664                         schedule_timeout_interruptible(HZ);
4665                 }
4666                 if (h == -1) {
4667                         conn_warn(tconn, "Discarding network configuration.\n");
4668                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4669                 }
4670         } while (h == 0);
4671
4672         if (h > 0)
4673                 drbdd(tconn);
4674
4675         conn_disconnect(tconn);
4676
4677         conn_info(tconn, "receiver terminated\n");
4678         return 0;
4679 }
4680
4681 /* ********* acknowledge sender ******** */
4682
4683 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4684 {
4685         struct p_req_state_reply *p = pi->data;
4686         int retcode = be32_to_cpu(p->retcode);
4687
4688         if (retcode >= SS_SUCCESS) {
4689                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4690         } else {
4691                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4692                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4693                          drbd_set_st_err_str(retcode), retcode);
4694         }
4695         wake_up(&tconn->ping_wait);
4696
4697         return 0;
4698 }
4699
4700 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4701 {
4702         struct drbd_conf *mdev;
4703         struct p_req_state_reply *p = pi->data;
4704         int retcode = be32_to_cpu(p->retcode);
4705
4706         mdev = vnr_to_mdev(tconn, pi->vnr);
4707         if (!mdev)
4708                 return -EIO;
4709
4710         if (retcode >= SS_SUCCESS) {
4711                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4712         } else {
4713                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4714                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4715                         drbd_set_st_err_str(retcode), retcode);
4716         }
4717         wake_up(&mdev->state_wait);
4718
4719         return 0;
4720 }
4721
4722 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4723 {
4724         return drbd_send_ping_ack(tconn);
4725
4726 }
4727
4728 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4729 {
4730         /* restore idle timeout */
4731         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4732         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4733                 wake_up(&tconn->ping_wait);
4734
4735         return 0;
4736 }
4737
4738 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4739 {
4740         struct drbd_conf *mdev;
4741         struct p_block_ack *p = pi->data;
4742         sector_t sector = be64_to_cpu(p->sector);
4743         int blksize = be32_to_cpu(p->blksize);
4744
4745         mdev = vnr_to_mdev(tconn, pi->vnr);
4746         if (!mdev)
4747                 return -EIO;
4748
4749         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4750
4751         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4752
4753         if (get_ldev(mdev)) {
4754                 drbd_rs_complete_io(mdev, sector);
4755                 drbd_set_in_sync(mdev, sector, blksize);
4756                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4757                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4758                 put_ldev(mdev);
4759         }
4760         dec_rs_pending(mdev);
4761         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4762
4763         return 0;
4764 }
4765
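     /* Look up the request identified by id/sector in the given tree under
      * req_lock, feed the event 'what' into the request state machine, and
      * complete the master bio if that transition finished it. */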
4766 static int
4767 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4768                               struct rb_root *root, const char *func,
4769                               enum drbd_req_event what, bool missing_ok)
4770 {
4771         struct drbd_request *req;
4772         struct bio_and_error m;
4773
4774         spin_lock_irq(&mdev->tconn->req_lock);
4775         req = find_request(mdev, root, id, sector, missing_ok, func);
4776         if (unlikely(!req)) {
4777                 spin_unlock_irq(&mdev->tconn->req_lock);
4778                 return -EIO;
4779         }
4780         __req_mod(req, what, &m);
4781         spin_unlock_irq(&mdev->tconn->req_lock);
4782
4783         if (m.bio)
4784                 complete_master_bio(mdev, &m);
4785         return 0;
4786 }
4787
4788 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4789 {
4790         struct drbd_conf *mdev;
4791         struct p_block_ack *p = pi->data;
4792         sector_t sector = be64_to_cpu(p->sector);
4793         int blksize = be32_to_cpu(p->blksize);
4794         enum drbd_req_event what;
4795
4796         mdev = vnr_to_mdev(tconn, pi->vnr);
4797         if (!mdev)
4798                 return -EIO;
4799
4800         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4801
4802         if (p->block_id == ID_SYNCER) {
4803                 drbd_set_in_sync(mdev, sector, blksize);
4804                 dec_rs_pending(mdev);
4805                 return 0;
4806         }
4807         switch (pi->cmd) {
4808         case P_RS_WRITE_ACK:
4809                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4810                 break;
4811         case P_WRITE_ACK:
4812                 what = WRITE_ACKED_BY_PEER;
4813                 break;
4814         case P_RECV_ACK:
4815                 what = RECV_ACKED_BY_PEER;
4816                 break;
4817         case P_DISCARD_WRITE:
4818                 what = DISCARD_WRITE;
4819                 break;
4820         case P_RETRY_WRITE:
4821                 what = POSTPONE_WRITE;
4822                 break;
4823         default:
4824                 BUG();
4825         }
4826
4827         return validate_req_change_req_state(mdev, p->block_id, sector,
4828                                              &mdev->write_requests, __func__,
4829                                              what, false);
4830 }
4831
4832 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4833 {
4834         struct drbd_conf *mdev;
4835         struct p_block_ack *p = pi->data;
4836         sector_t sector = be64_to_cpu(p->sector);
4837         int size = be32_to_cpu(p->blksize);
4838         int err;
4839
4840         mdev = vnr_to_mdev(tconn, pi->vnr);
4841         if (!mdev)
4842                 return -EIO;
4843
4844         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4845
4846         if (p->block_id == ID_SYNCER) {
4847                 dec_rs_pending(mdev);
4848                 drbd_rs_failed_io(mdev, sector, size);
4849                 return 0;
4850         }
4851
4852         err = validate_req_change_req_state(mdev, p->block_id, sector,
4853                                             &mdev->write_requests, __func__,
4854                                             NEG_ACKED, true);
4855         if (err) {
4856                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4857                    The master bio might already be completed, therefore the
4858                    request is no longer in the collision hash. */
4859                 /* In Protocol B we might already have got a P_RECV_ACK
4860                    but then get a P_NEG_ACK afterwards. */
4861                 drbd_set_out_of_sync(mdev, sector, size);
4862         }
4863         return 0;
4864 }
4865
4866 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4867 {
4868         struct drbd_conf *mdev;
4869         struct p_block_ack *p = pi->data;
4870         sector_t sector = be64_to_cpu(p->sector);
4871
4872         mdev = vnr_to_mdev(tconn, pi->vnr);
4873         if (!mdev)
4874                 return -EIO;
4875
4876         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4877
4878         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4879             (unsigned long long)sector, be32_to_cpu(p->blksize));
4880
4881         return validate_req_change_req_state(mdev, p->block_id, sector,
4882                                              &mdev->read_requests, __func__,
4883                                              NEG_ACKED, false);
4884 }
4885
4886 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4887 {
4888         struct drbd_conf *mdev;
4889         sector_t sector;
4890         int size;
4891         struct p_block_ack *p = pi->data;
4892
4893         mdev = vnr_to_mdev(tconn, pi->vnr);
4894         if (!mdev)
4895                 return -EIO;
4896
4897         sector = be64_to_cpu(p->sector);
4898         size = be32_to_cpu(p->blksize);
4899
4900         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4901
4902         dec_rs_pending(mdev);
4903
4904         if (get_ldev_if_state(mdev, D_FAILED)) {
4905                 drbd_rs_complete_io(mdev, sector);
4906                 switch (pi->cmd) {
4907                 case P_NEG_RS_DREPLY:
4908                         drbd_rs_failed_io(mdev, sector, size);
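                             /* fall through */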
4909                 case P_RS_CANCEL:
4910                         break;
4911                 default:
4912                         BUG();
4913                 }
4914                 put_ldev(mdev);
4915         }
4916
4917         return 0;
4918 }
4919
4920 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4921 {
4922         struct drbd_conf *mdev;
4923         struct p_barrier_ack *p = pi->data;
4924
4925         mdev = vnr_to_mdev(tconn, pi->vnr);
4926         if (!mdev)
4927                 return -EIO;
4928
4929         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4930
4931         if (mdev->state.conn == C_AHEAD &&
4932             atomic_read(&mdev->ap_in_flight) == 0 &&
4933             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4934                 mdev->start_resync_timer.expires = jiffies + HZ;
4935                 add_timer(&mdev->start_resync_timer);
4936         }
4937
4938         return 0;
4939 }
4940
4941 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4942 {
4943         struct drbd_conf *mdev;
4944         struct p_block_ack *p = pi->data;
4945         struct drbd_work *w;
4946         sector_t sector;
4947         int size;
4948
4949         mdev = vnr_to_mdev(tconn, pi->vnr);
4950         if (!mdev)
4951                 return -EIO;
4952
4953         sector = be64_to_cpu(p->sector);
4954         size = be32_to_cpu(p->blksize);
4955
4956         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4957
4958         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4959                 drbd_ov_out_of_sync_found(mdev, sector, size);
4960         else
4961                 ov_out_of_sync_print(mdev);
4962
4963         if (!get_ldev(mdev))
4964                 return 0;
4965
4966         drbd_rs_complete_io(mdev, sector);
4967         dec_rs_pending(mdev);
4968
4969         --mdev->ov_left;
4970
4971         /* let's advance progress step marks only for every other megabyte */
4972         if ((mdev->ov_left & 0x200) == 0x200)
4973                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4974
4975         if (mdev->ov_left == 0) {
4976                 w = kmalloc(sizeof(*w), GFP_NOIO);
4977                 if (w) {
4978                         w->cb = w_ov_finished;
4979                         w->mdev = mdev;
4980                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4981                 } else {
4982                         dev_err(DEV, "kmalloc(w) failed.\n");
4983                         ov_out_of_sync_print(mdev);
4984                         drbd_resync_finished(mdev);
4985                 }
4986         }
4987         put_ldev(mdev);
4988         return 0;
4989 }
4990
4991 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4992 {
4993         return 0;
4994 }
4995
4996 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4997 {
4998         struct drbd_conf *mdev;
4999         int vnr, not_empty = 0;
5000
5001         do {
5002                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5003                 flush_signals(current);
5004
5005                 rcu_read_lock();
5006                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5007                         kref_get(&mdev->kref);
5008                         rcu_read_unlock();
5009                         if (drbd_finish_peer_reqs(mdev)) {
5010                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5011                                 return 1;
5012                         }
5013                         kref_put(&mdev->kref, &drbd_minor_destroy);
5014                         rcu_read_lock();
5015                 }
5016                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5017
5018                 spin_lock_irq(&tconn->req_lock);
5019                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5020                         not_empty = !list_empty(&mdev->done_ee);
5021                         if (not_empty)
5022                                 break;
5023                 }
5024                 spin_unlock_irq(&tconn->req_lock);
5025                 rcu_read_unlock();
5026         } while (not_empty);
5027
5028         return 0;
5029 }
5030
5031 struct asender_cmd {
5032         size_t pkt_size;
5033         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5034 };
5035
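     /* Asender dispatch table, indexed by packet type: pkt_size is the
      * payload size expected after the header; drbd_asender() reads header
      * plus payload into the meta receive buffer before calling fn(). */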
5036 static struct asender_cmd asender_tbl[] = {
5037         [P_PING]            = { 0, got_Ping },
5038         [P_PING_ACK]        = { 0, got_PingAck },
5039         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5040         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5041         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5042         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5043         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5044         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5045         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5046         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5047         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5048         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5049         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5050         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5051         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5052         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5053         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5054 };
5055
5056 int drbd_asender(struct drbd_thread *thi)
5057 {
5058         struct drbd_tconn *tconn = thi->tconn;
5059         struct asender_cmd *cmd = NULL;
5060         struct packet_info pi;
5061         int rv;
5062         void *buf    = tconn->meta.rbuf;
5063         int received = 0;
5064         unsigned int header_size = drbd_header_size(tconn);
5065         int expect   = header_size;
5066         bool ping_timeout_active = false;
5067         struct net_conf *nc;
5068         int ping_timeo, tcp_cork, ping_int;
5069
5070         current->policy = SCHED_RR;  /* Make this a realtime task! */
5071         current->rt_priority = 2;    /* more important than all other tasks */
5072
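             /* Accumulate 'expect' bytes into the meta receive buffer: first
              * a packet header, then, once it is decoded, the fixed payload of
              * that command.  When a full packet has arrived, dispatch it via
              * asender_tbl[] and reset the counters for the next one. */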
5073         while (get_t_state(thi) == RUNNING) {
5074                 drbd_thread_current_set_cpu(thi);
5075
5076                 rcu_read_lock();
5077                 nc = rcu_dereference(tconn->net_conf);
5078                 ping_timeo = nc->ping_timeo;
5079                 tcp_cork = nc->tcp_cork;
5080                 ping_int = nc->ping_int;
5081                 rcu_read_unlock();
5082
5083                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5084                         if (drbd_send_ping(tconn)) {
5085                                 conn_err(tconn, "drbd_send_ping has failed\n");
5086                                 goto reconnect;
5087                         }
5088                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5089                         ping_timeout_active = true;
5090                 }
5091
5092                 /* TODO: conditionally cork; it may hurt latency if we cork without
5093                    much to send */
5094                 if (tcp_cork)
5095                         drbd_tcp_cork(tconn->meta.socket);
5096                 if (tconn_finish_peer_reqs(tconn)) {
5097                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5098                         goto reconnect;
5099                 }
5100                 /* but unconditionally uncork unless disabled */
5101                 if (tcp_cork)
5102                         drbd_tcp_uncork(tconn->meta.socket);
5103
5104                 /* short circuit, recv_msg would return EINTR anyways. */
5105                 if (signal_pending(current))
5106                         continue;
5107
5108                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5109                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5110
5111                 flush_signals(current);
5112
5113                 /* Note:
5114                  * -EINTR        (on meta) we got a signal
5115                  * -EAGAIN       (on meta) rcvtimeo expired
5116                  * -ECONNRESET   other side closed the connection
5117                  * -ERESTARTSYS  (on data) we got a signal
5118                  * rv <  0       other than above: unexpected error!
5119                  * rv == expected: full header or command
5120                  * rv <  expected: "woken" by signal during receive
5121                  * rv == 0       : "connection shut down by peer"
5122                  */
5123                 if (likely(rv > 0)) {
5124                         received += rv;
5125                         buf      += rv;
5126                 } else if (rv == 0) {
5127                         conn_err(tconn, "meta connection shut down by peer.\n");
5128                         goto reconnect;
5129                 } else if (rv == -EAGAIN) {
5130                         /* If the data socket received something meanwhile,
5131                          * that is good enough: peer is still alive. */
5132                         if (time_after(tconn->last_received,
5133                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5134                                 continue;
5135                         if (ping_timeout_active) {
5136                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5137                                 goto reconnect;
5138                         }
5139                         set_bit(SEND_PING, &tconn->flags);
5140                         continue;
5141                 } else if (rv == -EINTR) {
5142                         continue;
5143                 } else {
5144                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5145                         goto reconnect;
5146                 }
5147
5148                 if (received == expect && cmd == NULL) {
5149                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5150                                 goto reconnect;
5151                         cmd = &asender_tbl[pi.cmd];
5152                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5153                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5154                                         pi.cmd, pi.size);
5155                                 goto disconnect;
5156                         }
5157                         expect = header_size + cmd->pkt_size;
5158                         if (pi.size != expect - header_size) {
5159                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5160                                         pi.cmd, pi.size);
5161                                 goto reconnect;
5162                         }
5163                 }
5164                 if (received == expect) {
5165                         int err;
5166
5167                         err = cmd->fn(tconn, &pi);
5168                         if (err) {
5169                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5170                                 goto reconnect;
5171                         }
5172
5173                         tconn->last_received = jiffies;
5174
5175                         if (cmd == &asender_tbl[P_PING_ACK]) {
5176                                 /* restore idle timeout */
5177                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5178                                 ping_timeout_active = false;
5179                         }
5180
5181                         buf      = tconn->meta.rbuf;
5182                         received = 0;
5183                         expect   = header_size;
5184                         cmd      = NULL;
5185                 }
5186         }
5187
5188         if (0) {
5189 reconnect:
5190                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5191         }
5192         if (0) {
5193 disconnect:
5194                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5195         }
5196         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5197
5198         conn_info(tconn, "asender terminated\n");
5199
5200         return 0;
5201 }