drbd: Remove drbd_accept() and use kernel_accept() instead
[firefly-linux-kernel-4.4.55.git] drivers/block/drbd/drbd_receiver.c
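
As the title says, this version of drbd_receiver.c drops the driver-private drbd_accept() helper and calls the kernel's generic kernel_accept() directly from drbd_wait_for_connect() (see line 719 below). A minimal sketch of that accept path, condensed from drbd_wait_for_connect() with error handling, timeouts and buffer tuning omitted; the names addr, addr_len, listener and established are illustrative only:

    struct socket *listener = NULL, *established = NULL;
    int err;

    /* create an in-kernel TCP socket, then bind and listen on the configured address */
    err = sock_create_kern(((struct sockaddr *)addr)->sa_family,
                           SOCK_STREAM, IPPROTO_TCP, &listener);
    if (err < 0)
        return NULL;
    err = listener->ops->bind(listener, (struct sockaddr *)addr, addr_len);
    if (err < 0)
        goto out;
    err = listener->ops->listen(listener, 5);
    if (err < 0)
        goto out;

    /* kernel_accept() takes the place of the removed drbd_accept() wrapper */
    err = kernel_accept(listener, &established, 0);
out:
    sock_release(listener);
    return established;
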
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(struct drbd_conf *mdev);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with single linked page lists,
76  * page->private being our "next" pointer.
77  */
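/*
 * For example, a chain of three pages linked this way looks like
 *
 *     head --> A --> B --> C --> (nothing)
 *
 * where each arrow is page_private() read as a struct page *, and the
 * last page carries page_private() == 0 as the end-of-list marker.
 */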
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry if not enough pages are available right now
236  *
237  * Tries to allocate @number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, and e_end_resync_block, e_send_discard_write.
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
465 {
466         mm_segment_t oldfs;
467         struct kvec iov = {
468                 .iov_base = buf,
469                 .iov_len = size,
470         };
471         struct msghdr msg = {
472                 .msg_iovlen = 1,
473                 .msg_iov = (struct iovec *)&iov,
474                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
475         };
476         int rv;
477
478         oldfs = get_fs();
479         set_fs(KERNEL_DS);
480         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
481         set_fs(oldfs);
482
483         return rv;
484 }
485
486 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
487 {
488         mm_segment_t oldfs;
489         struct kvec iov = {
490                 .iov_base = buf,
491                 .iov_len = size,
492         };
493         struct msghdr msg = {
494                 .msg_iovlen = 1,
495                 .msg_iov = (struct iovec *)&iov,
496                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
497         };
498         int rv;
499
500         oldfs = get_fs();
501         set_fs(KERNEL_DS);
502
503         for (;;) {
504                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
505                 if (rv == size)
506                         break;
507
508                 /* Note:
509                  * ECONNRESET   other side closed the connection
510                  * ERESTARTSYS  (on  sock) we got a signal
511                  */
512
513                 if (rv < 0) {
514                         if (rv == -ECONNRESET)
515                                 conn_info(tconn, "sock was reset by peer\n");
516                         else if (rv != -ERESTARTSYS)
517                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
518                         break;
519                 } else if (rv == 0) {
520                         conn_info(tconn, "sock was shut down by peer\n");
521                         break;
522                 } else  {
523                         /* signal came in, or peer/link went down,
524                          * after we read a partial message
525                          */
526                         /* D_ASSERT(signal_pending(current)); */
527                         break;
528                 }
529         }
530
531         set_fs(oldfs);
532
533         if (rv != size)
534                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
535
536         return rv;
537 }
538
539 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
540 {
541         int err;
542
543         err = drbd_recv(tconn, buf, size);
544         if (err != size) {
545                 if (err >= 0)
546                         err = -EIO;
547         } else
548                 err = 0;
549         return err;
550 }
551
552 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
553 {
554         int err;
555
556         err = drbd_recv_all(tconn, buf, size);
557         if (err && !signal_pending(current))
558                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
559         return err;
560 }
561
562 /* quoting tcp(7):
563  *   On individual connections, the socket buffer size must be set prior to the
564  *   listen(2) or connect(2) calls in order to have it take effect.
565  * This is our wrapper to do so.
566  */
567 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
568                 unsigned int rcv)
569 {
570         /* open coded SO_SNDBUF, SO_RCVBUF */
571         if (snd) {
572                 sock->sk->sk_sndbuf = snd;
573                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
574         }
575         if (rcv) {
576                 sock->sk->sk_rcvbuf = rcv;
577                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
578         }
579 }
580
581 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
582 {
583         const char *what;
584         struct socket *sock;
585         struct sockaddr_in6 src_in6;
586         struct sockaddr_in6 peer_in6;
587         struct net_conf *nc;
588         int err, peer_addr_len, my_addr_len;
589         int sndbuf_size, rcvbuf_size, connect_int;
590         int disconnect_on_error = 1;
591
592         rcu_read_lock();
593         nc = rcu_dereference(tconn->net_conf);
594         if (!nc) {
595                 rcu_read_unlock();
596                 return NULL;
597         }
598         sndbuf_size = nc->sndbuf_size;
599         rcvbuf_size = nc->rcvbuf_size;
600         connect_int = nc->connect_int;
601         rcu_read_unlock();
602
603         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
604         memcpy(&src_in6, &tconn->my_addr, my_addr_len);
605
606         if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
607                 src_in6.sin6_port = 0;
608         else
609                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
610
611         peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
612         memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
613
614         what = "sock_create_kern";
615         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
616                                SOCK_STREAM, IPPROTO_TCP, &sock);
617         if (err < 0) {
618                 sock = NULL;
619                 goto out;
620         }
621
622         sock->sk->sk_rcvtimeo =
623         sock->sk->sk_sndtimeo = connect_int * HZ;
624         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
625
626        /* explicitly bind to the configured IP as source IP
627         *  for the outgoing connections.
628         *  This is needed for multihomed hosts and to be
629         *  able to use lo: interfaces for drbd.
630         * Make sure to use 0 as port number, so linux selects
631         *  a free one dynamically.
632         */
633         what = "bind before connect";
634         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
635         if (err < 0)
636                 goto out;
637
638         /* connect may fail, peer not yet available.
639          * stay C_WF_CONNECTION, don't go Disconnecting! */
640         disconnect_on_error = 0;
641         what = "connect";
642         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
643
644 out:
645         if (err < 0) {
646                 if (sock) {
647                         sock_release(sock);
648                         sock = NULL;
649                 }
650                 switch (-err) {
651                         /* timeout, busy, signal pending */
652                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
653                 case EINTR: case ERESTARTSYS:
654                         /* peer not (yet) available, network problem */
655                 case ECONNREFUSED: case ENETUNREACH:
656                 case EHOSTDOWN:    case EHOSTUNREACH:
657                         disconnect_on_error = 0;
658                         break;
659                 default:
660                         conn_err(tconn, "%s failed, err = %d\n", what, err);
661                 }
662                 if (disconnect_on_error)
663                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
664         }
665
666         return sock;
667 }
668
669 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
670 {
671         int timeo, err, my_addr_len;
672         int sndbuf_size, rcvbuf_size, connect_int;
673         struct socket *s_estab = NULL, *s_listen;
674         struct sockaddr_in6 my_addr;
675         struct net_conf *nc;
676         const char *what;
677
678         rcu_read_lock();
679         nc = rcu_dereference(tconn->net_conf);
680         if (!nc) {
681                 rcu_read_unlock();
682                 return NULL;
683         }
684         sndbuf_size = nc->sndbuf_size;
685         rcvbuf_size = nc->rcvbuf_size;
686         connect_int = nc->connect_int;
687         rcu_read_unlock();
688
689         my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
690         memcpy(&my_addr, &tconn->my_addr, my_addr_len);
691
692         what = "sock_create_kern";
693         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
694                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
695         if (err) {
696                 s_listen = NULL;
697                 goto out;
698         }
699
700         timeo = connect_int * HZ;
701         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
702
703         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
704         s_listen->sk->sk_rcvtimeo = timeo;
705         s_listen->sk->sk_sndtimeo = timeo;
706         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
707
708         what = "bind before listen";
709         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
710         if (err < 0)
711                 goto out;
712
713         what = "listen";
714         err = s_listen->ops->listen(s_listen, 5);
715         if (err < 0)
716                 goto out;
717
718         what = "accept";
719         err = kernel_accept(s_listen, &s_estab, 0);
720
721 out:
722         if (s_listen)
723                 sock_release(s_listen);
724         if (err < 0) {
725                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
726                         conn_err(tconn, "%s failed, err = %d\n", what, err);
727                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
728                 }
729         }
730
731         return s_estab;
732 }
733
734 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
735
736 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
737                              enum drbd_packet cmd)
738 {
739         if (!conn_prepare_command(tconn, sock))
740                 return -EIO;
741         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
742 }
743
744 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
745 {
746         unsigned int header_size = drbd_header_size(tconn);
747         struct packet_info pi;
748         int err;
749
750         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
751         if (err != header_size) {
752                 if (err >= 0)
753                         err = -EIO;
754                 return err;
755         }
756         err = decode_header(tconn, tconn->data.rbuf, &pi);
757         if (err)
758                 return err;
759         return pi.cmd;
760 }
761
762 /**
763  * drbd_socket_okay() - Free the socket if its connection is not okay
764  * @sock:       pointer to the pointer to the socket.
765  */
766 static int drbd_socket_okay(struct socket **sock)
767 {
768         int rr;
769         char tb[4];
770
771         if (!*sock)
772                 return false;
773
774         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
775
776         if (rr > 0 || rr == -EAGAIN) {
777                 return true;
778         } else {
779                 sock_release(*sock);
780                 *sock = NULL;
781                 return false;
782         }
783 }
784 /* Gets called if a connection is established, or if a new minor gets created
785    in a connection */
786 int drbd_connected(struct drbd_conf *mdev)
787 {
788         int err;
789
790         atomic_set(&mdev->packet_seq, 0);
791         mdev->peer_seq = 0;
792
793         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
794                 &mdev->tconn->cstate_mutex :
795                 &mdev->own_state_mutex;
796
797         err = drbd_send_sync_param(mdev);
798         if (!err)
799                 err = drbd_send_sizes(mdev, 0, 0);
800         if (!err)
801                 err = drbd_send_uuids(mdev);
802         if (!err)
803                 err = drbd_send_current_state(mdev);
804         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
805         clear_bit(RESIZE_PENDING, &mdev->flags);
806         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
807         return err;
808 }
809
810 /*
811  * return values:
812  *   1 yes, we have a valid connection
813  *   0 oops, did not work out, please try again
814  *  -1 peer talks different language,
815  *     no point in trying again, please go standalone.
816  *  -2 We do not have a network config...
817  */
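/*
 * Overview of what follows: conn_connect() sets up two TCP connections per
 * peer, a data socket (announced with P_INITIAL_DATA) and a meta socket
 * (P_INITIAL_META).  Both nodes connect and listen at the same time;
 * whichever initial packet arrives on an accepted socket decides its role,
 * duplicates from crossed connection attempts are released again, and the
 * side whose listener received P_INITIAL_META sets DISCARD_CONCURRENT.
 */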
818 static int conn_connect(struct drbd_tconn *tconn)
819 {
820         struct drbd_socket sock, msock;
821         struct drbd_conf *mdev;
822         struct net_conf *nc;
823         int vnr, timeout, try, h, ok;
824         bool discard_my_data;
825         enum drbd_state_rv rv;
826
827         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
828                 return -2;
829
830         mutex_init(&sock.mutex);
831         sock.sbuf = tconn->data.sbuf;
832         sock.rbuf = tconn->data.rbuf;
833         sock.socket = NULL;
834         mutex_init(&msock.mutex);
835         msock.sbuf = tconn->meta.sbuf;
836         msock.rbuf = tconn->meta.rbuf;
837         msock.socket = NULL;
838
839         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
840
841         /* Assume that the peer only understands protocol 80 until we know better.  */
842         tconn->agreed_pro_version = 80;
843
844         do {
845                 struct socket *s;
846
847                 for (try = 0;;) {
848                         /* 3 tries, this should take less than a second! */
849                         s = drbd_try_connect(tconn);
850                         if (s || ++try >= 3)
851                                 break;
852                         /* give the other side time to call bind() & listen() */
853                         schedule_timeout_interruptible(HZ / 10);
854                 }
855
856                 if (s) {
857                         if (!sock.socket) {
858                                 sock.socket = s;
859                                 send_first_packet(tconn, &sock, P_INITIAL_DATA);
860                         } else if (!msock.socket) {
861                                 msock.socket = s;
862                                 send_first_packet(tconn, &msock, P_INITIAL_META);
863                         } else {
864                                 conn_err(tconn, "Logic error in conn_connect()\n");
865                                 goto out_release_sockets;
866                         }
867                 }
868
869                 if (sock.socket && msock.socket) {
870                         rcu_read_lock();
871                         nc = rcu_dereference(tconn->net_conf);
872                         timeout = nc->ping_timeo * HZ / 10;
873                         rcu_read_unlock();
874                         schedule_timeout_interruptible(timeout);
875                         ok = drbd_socket_okay(&sock.socket);
876                         ok = drbd_socket_okay(&msock.socket) && ok;
877                         if (ok)
878                                 break;
879                 }
880
881 retry:
882                 s = drbd_wait_for_connect(tconn);
883                 if (s) {
884                         try = receive_first_packet(tconn, s);
885                         drbd_socket_okay(&sock.socket);
886                         drbd_socket_okay(&msock.socket);
887                         switch (try) {
888                         case P_INITIAL_DATA:
889                                 if (sock.socket) {
890                                         conn_warn(tconn, "initial packet S crossed\n");
891                                         sock_release(sock.socket);
892                                 }
893                                 sock.socket = s;
894                                 break;
895                         case P_INITIAL_META:
896                                 if (msock.socket) {
897                                         conn_warn(tconn, "initial packet M crossed\n");
898                                         sock_release(msock.socket);
899                                 }
900                                 msock.socket = s;
901                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
902                                 break;
903                         default:
904                                 conn_warn(tconn, "Error receiving initial packet\n");
905                                 sock_release(s);
906                                 if (random32() & 1)
907                                         goto retry;
908                         }
909                 }
910
911                 if (tconn->cstate <= C_DISCONNECTING)
912                         goto out_release_sockets;
913                 if (signal_pending(current)) {
914                         flush_signals(current);
915                         smp_rmb();
916                         if (get_t_state(&tconn->receiver) == EXITING)
917                                 goto out_release_sockets;
918                 }
919
920                 if (sock.socket && msock.socket) {
921                         ok = drbd_socket_okay(&sock.socket);
922                         ok = drbd_socket_okay(&msock.socket) && ok;
923                         if (ok)
924                                 break;
925                 }
926         } while (1);
927
928         sock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
929         msock.socket->sk->sk_reuse = 1; /* SO_REUSEADDR */
930
931         sock.socket->sk->sk_allocation = GFP_NOIO;
932         msock.socket->sk->sk_allocation = GFP_NOIO;
933
934         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
935         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
936
937         /* NOT YET ...
938          * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
939          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
940          * first set it to the P_CONNECTION_FEATURES timeout,
941          * which we set to 4x the configured ping_timeout. */
942         rcu_read_lock();
943         nc = rcu_dereference(tconn->net_conf);
944
945         sock.socket->sk->sk_sndtimeo =
946         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
947
948         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
949         timeout = nc->timeout * HZ / 10;
950         discard_my_data = nc->discard_my_data;
951         rcu_read_unlock();
952
953         msock.socket->sk->sk_sndtimeo = timeout;
954
955         /* we don't want delays.
956          * we use TCP_CORK where appropriate, though */
957         drbd_tcp_nodelay(sock.socket);
958         drbd_tcp_nodelay(msock.socket);
959
960         tconn->data.socket = sock.socket;
961         tconn->meta.socket = msock.socket;
962         tconn->last_received = jiffies;
963
964         h = drbd_do_features(tconn);
965         if (h <= 0)
966                 return h;
967
968         if (tconn->cram_hmac_tfm) {
969                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
970                 switch (drbd_do_auth(tconn)) {
971                 case -1:
972                         conn_err(tconn, "Authentication of peer failed\n");
973                         return -1;
974                 case 0:
975                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
976                         return 0;
977                 }
978         }
979
980         tconn->data.socket->sk->sk_sndtimeo = timeout;
981         tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
982
983         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
984                 return -1;
985
986         set_bit(STATE_SENT, &tconn->flags);
987
988         rcu_read_lock();
989         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
990                 kref_get(&mdev->kref);
991                 rcu_read_unlock();
992
993                 if (discard_my_data)
994                         set_bit(DISCARD_MY_DATA, &mdev->flags);
995                 else
996                         clear_bit(DISCARD_MY_DATA, &mdev->flags);
997
998                 drbd_connected(mdev);
999                 kref_put(&mdev->kref, &drbd_minor_destroy);
1000                 rcu_read_lock();
1001         }
1002         rcu_read_unlock();
1003
1004         rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1005         if (rv < SS_SUCCESS) {
1006                 clear_bit(STATE_SENT, &tconn->flags);
1007                 return 0;
1008         }
1009
1010         drbd_thread_start(&tconn->asender);
1011
1012         mutex_lock(&tconn->conf_update);
1013         /* The discard_my_data flag is a single-shot modifier to the next
1014          * connection attempt, the handshake of which is now well underway.
1015          * No need for rcu style copying of the whole struct
1016          * just to clear a single value. */
1017         tconn->net_conf->discard_my_data = 0;
1018         mutex_unlock(&tconn->conf_update);
1019
1020         return h;
1021
1022 out_release_sockets:
1023         if (sock.socket)
1024                 sock_release(sock.socket);
1025         if (msock.socket)
1026                 sock_release(msock.socket);
1027         return -1;
1028 }
1029
1030 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1031 {
1032         unsigned int header_size = drbd_header_size(tconn);
1033
1034         if (header_size == sizeof(struct p_header100) &&
1035             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1036                 struct p_header100 *h = header;
1037                 if (h->pad != 0) {
1038                         conn_err(tconn, "Header padding is not zero\n");
1039                         return -EINVAL;
1040                 }
1041                 pi->vnr = be16_to_cpu(h->volume);
1042                 pi->cmd = be16_to_cpu(h->command);
1043                 pi->size = be32_to_cpu(h->length);
1044         } else if (header_size == sizeof(struct p_header95) &&
1045                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1046                 struct p_header95 *h = header;
1047                 pi->cmd = be16_to_cpu(h->command);
1048                 pi->size = be32_to_cpu(h->length);
1049                 pi->vnr = 0;
1050         } else if (header_size == sizeof(struct p_header80) &&
1051                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1052                 struct p_header80 *h = header;
1053                 pi->cmd = be16_to_cpu(h->command);
1054                 pi->size = be16_to_cpu(h->length);
1055                 pi->vnr = 0;
1056         } else {
1057                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1058                          be32_to_cpu(*(__be32 *)header),
1059                          tconn->agreed_pro_version);
1060                 return -EINVAL;
1061         }
1062         pi->data = header + header_size;
1063         return 0;
1064 }
1065
1066 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1067 {
1068         void *buffer = tconn->data.rbuf;
1069         int err;
1070
1071         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1072         if (err)
1073                 return err;
1074
1075         err = decode_header(tconn, buffer, pi);
1076         tconn->last_received = jiffies;
1077
1078         return err;
1079 }
1080
1081 static void drbd_flush(struct drbd_tconn *tconn)
1082 {
1083         int rv;
1084         struct drbd_conf *mdev;
1085         int vnr;
1086
1087         if (tconn->write_ordering >= WO_bdev_flush) {
1088                 rcu_read_lock();
1089                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1090                         if (!get_ldev(mdev))
1091                                 continue;
1092                         kref_get(&mdev->kref);
1093                         rcu_read_unlock();
1094
1095                         rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1096                                         GFP_NOIO, NULL);
1097                         if (rv) {
1098                                 dev_info(DEV, "local disk flush failed with status %d\n", rv);
1099                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1100                                  * don't try again for ANY return value != 0
1101                                  * if (rv == -EOPNOTSUPP) */
1102                                 drbd_bump_write_ordering(tconn, WO_drain_io);
1103                         }
1104                         put_ldev(mdev);
1105                         kref_put(&mdev->kref, &drbd_minor_destroy);
1106
1107                         rcu_read_lock();
1108                         if (rv)
1109                                 break;
1110                 }
1111                 rcu_read_unlock();
1112         }
1113 }
1114
1115 /**
1116  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1117  * @mdev:       DRBD device.
1118  * @epoch:      Epoch object.
1119  * @ev:         Epoch event.
1120  */
1121 static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1122                                                struct drbd_epoch *epoch,
1123                                                enum epoch_event ev)
1124 {
1125         int epoch_size;
1126         struct drbd_epoch *next_epoch;
1127         enum finish_epoch rv = FE_STILL_LIVE;
1128
1129         spin_lock(&tconn->epoch_lock);
1130         do {
1131                 next_epoch = NULL;
1132
1133                 epoch_size = atomic_read(&epoch->epoch_size);
1134
1135                 switch (ev & ~EV_CLEANUP) {
1136                 case EV_PUT:
1137                         atomic_dec(&epoch->active);
1138                         break;
1139                 case EV_GOT_BARRIER_NR:
1140                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1141                         break;
1142                 case EV_BECAME_LAST:
1143                         /* nothing to do*/
1144                         /* nothing to do */
1145                 }
1146
1147                 if (epoch_size != 0 &&
1148                     atomic_read(&epoch->active) == 0 &&
1149                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1150                         if (!(ev & EV_CLEANUP)) {
1151                                 spin_unlock(&tconn->epoch_lock);
1152                                 drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1153                                 spin_lock(&tconn->epoch_lock);
1154                         }
1155 #if 0
1156                         /* FIXME: dec unacked on connection, once we have
1157                          * something to count pending connection packets in. */
1158                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1159                                 dec_unacked(epoch->tconn);
1160 #endif
1161
1162                         if (tconn->current_epoch != epoch) {
1163                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1164                                 list_del(&epoch->list);
1165                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1166                                 tconn->epochs--;
1167                                 kfree(epoch);
1168
1169                                 if (rv == FE_STILL_LIVE)
1170                                         rv = FE_DESTROYED;
1171                         } else {
1172                                 epoch->flags = 0;
1173                                 atomic_set(&epoch->epoch_size, 0);
1174                                 /* atomic_set(&epoch->active, 0); is already zero */
1175                                 if (rv == FE_STILL_LIVE)
1176                                         rv = FE_RECYCLED;
1177                         }
1178                 }
1179
1180                 if (!next_epoch)
1181                         break;
1182
1183                 epoch = next_epoch;
1184         } while (1);
1185
1186         spin_unlock(&tconn->epoch_lock);
1187
1188         return rv;
1189 }
1190
1191 /**
1192  * drbd_bump_write_ordering() - Fall back to another write ordering method
1193  * @tconn:      DRBD connection.
1194  * @wo:         Write ordering method to try.
1195  */
1196 void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1197 {
1198         struct disk_conf *dc;
1199         struct drbd_conf *mdev;
1200         enum write_ordering_e pwo;
1201         int vnr;
1202         static char *write_ordering_str[] = {
1203                 [WO_none] = "none",
1204                 [WO_drain_io] = "drain",
1205                 [WO_bdev_flush] = "flush",
1206         };
1207
1208         pwo = tconn->write_ordering;
1209         wo = min(pwo, wo);
1210         rcu_read_lock();
1211         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1212                 if (!get_ldev_if_state(mdev, D_ATTACHING))
1213                         continue;
1214                 dc = rcu_dereference(mdev->ldev->disk_conf);
1215
1216                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1217                         wo = WO_drain_io;
1218                 if (wo == WO_drain_io && !dc->disk_drain)
1219                         wo = WO_none;
1220                 put_ldev(mdev);
1221         }
1222         rcu_read_unlock();
1223         tconn->write_ordering = wo;
1224         if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1225                 conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1226 }
1227
1228 /**
1229  * drbd_submit_peer_request()
1230  * @mdev:       DRBD device.
1231  * @peer_req:   peer request
1232  * @rw:         flag field, see bio->bi_rw
1233  *
1234  * May spread the pages to multiple bios,
1235  * depending on bio_add_page restrictions.
1236  *
1237  * Returns 0 if all bios have been submitted,
1238  * -ENOMEM if we could not allocate enough bios,
1239  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1240  *  single page to an empty bio (which should never happen and likely indicates
1241  *  that the lower level IO stack is in some way broken). This has been observed
1242  *  on certain Xen deployments.
1243  */
1244 /* TODO allocate from our own bio_set. */
1245 int drbd_submit_peer_request(struct drbd_conf *mdev,
1246                              struct drbd_peer_request *peer_req,
1247                              const unsigned rw, const int fault_type)
1248 {
1249         struct bio *bios = NULL;
1250         struct bio *bio;
1251         struct page *page = peer_req->pages;
1252         sector_t sector = peer_req->i.sector;
1253         unsigned ds = peer_req->i.size;
1254         unsigned n_bios = 0;
1255         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1256         int err = -ENOMEM;
1257
1258         /* In most cases, we will only need one bio.  But in case the lower
1259          * level restrictions happen to be different at this offset on this
1260          * side than those of the sending peer, we may need to submit the
1261          * request in more than one bio.
1262          *
1263          * Plain bio_alloc is good enough here, this is no DRBD internally
1264          * generated bio, but a bio allocated on behalf of the peer.
1265          */
1266 next_bio:
1267         bio = bio_alloc(GFP_NOIO, nr_pages);
1268         if (!bio) {
1269                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1270                 goto fail;
1271         }
1272         /* > peer_req->i.sector, unless this is the first bio */
1273         bio->bi_sector = sector;
1274         bio->bi_bdev = mdev->ldev->backing_bdev;
1275         bio->bi_rw = rw;
1276         bio->bi_private = peer_req;
1277         bio->bi_end_io = drbd_peer_request_endio;
1278
1279         bio->bi_next = bios;
1280         bios = bio;
1281         ++n_bios;
1282
1283         page_chain_for_each(page) {
1284                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1285                 if (!bio_add_page(bio, page, len, 0)) {
1286                         /* A single page must always be possible!
1287                          * But in case it fails anyways,
1288                          * we deal with it, and complain (below). */
1289                         if (bio->bi_vcnt == 0) {
1290                                 dev_err(DEV,
1291                                         "bio_add_page failed for len=%u, "
1292                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1293                                         len, (unsigned long long)bio->bi_sector);
1294                                 err = -ENOSPC;
1295                                 goto fail;
1296                         }
1297                         goto next_bio;
1298                 }
1299                 ds -= len;
1300                 sector += len >> 9;
1301                 --nr_pages;
1302         }
1303         D_ASSERT(page == NULL);
1304         D_ASSERT(ds == 0);
1305
1306         atomic_set(&peer_req->pending_bios, n_bios);
1307         do {
1308                 bio = bios;
1309                 bios = bios->bi_next;
1310                 bio->bi_next = NULL;
1311
1312                 drbd_generic_make_request(mdev, fault_type, bio);
1313         } while (bios);
1314         return 0;
1315
1316 fail:
1317         while (bios) {
1318                 bio = bios;
1319                 bios = bios->bi_next;
1320                 bio_put(bio);
1321         }
1322         return err;
1323 }
1324
1325 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1326                                              struct drbd_peer_request *peer_req)
1327 {
1328         struct drbd_interval *i = &peer_req->i;
1329
1330         drbd_remove_interval(&mdev->write_requests, i);
1331         drbd_clear_interval(i);
1332
1333         /* Wake up any processes waiting for this peer request to complete.  */
1334         if (i->waiting)
1335                 wake_up(&mdev->misc_wait);
1336 }
1337
1338 void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1339 {
1340         struct drbd_conf *mdev;
1341         int vnr;
1342
1343         rcu_read_lock();
1344         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1345                 kref_get(&mdev->kref);
1346                 rcu_read_unlock();
1347                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1348                 kref_put(&mdev->kref, &drbd_minor_destroy);
1349                 rcu_read_lock();
1350         }
1351         rcu_read_unlock();
1352 }
1353
1354 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1355 {
1356         int rv;
1357         struct p_barrier *p = pi->data;
1358         struct drbd_epoch *epoch;
1359
1360         /* FIXME these are unacked on connection,
1361          * not a specific (peer)device.
1362          */
1363         tconn->current_epoch->barrier_nr = p->barrier;
1364         tconn->current_epoch->tconn = tconn;
1365         rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1366
1367         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1368          * the activity log, which means it would not be resynced in case the
1369          * R_PRIMARY crashes now.
1370          * Therefore we must send the barrier_ack after the barrier request was
1371          * completed. */
1372         switch (tconn->write_ordering) {
1373         case WO_none:
1374                 if (rv == FE_RECYCLED)
1375                         return 0;
1376
1377                 /* receiver context, in the writeout path of the other node.
1378                  * avoid potential distributed deadlock */
1379                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1380                 if (epoch)
1381                         break;
1382                 else
1383                         conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1384                         /* Fall through */
1385
1386         case WO_bdev_flush:
1387         case WO_drain_io:
1388                 conn_wait_active_ee_empty(tconn);
1389                 drbd_flush(tconn);
1390
1391                 if (atomic_read(&tconn->current_epoch->epoch_size)) {
1392                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1393                         if (epoch)
1394                                 break;
1395                 }
1396
1397                 return 0;
1398         default:
1399                 conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1400                 return -EIO;
1401         }
1402
1403         epoch->flags = 0;
1404         atomic_set(&epoch->epoch_size, 0);
1405         atomic_set(&epoch->active, 0);
1406
1407         spin_lock(&tconn->epoch_lock);
1408         if (atomic_read(&tconn->current_epoch->epoch_size)) {
1409                 list_add(&epoch->list, &tconn->current_epoch->list);
1410                 tconn->current_epoch = epoch;
1411                 tconn->epochs++;
1412         } else {
1413                 /* The current_epoch got recycled while we allocated this one... */
1414                 kfree(epoch);
1415         }
1416         spin_unlock(&tconn->epoch_lock);
1417
1418         return 0;
1419 }
1420
1421 /* used from receive_RSDataReply (recv_resync_read)
1422  * and from receive_Data */
1423 static struct drbd_peer_request *
1424 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1425               int data_size) __must_hold(local)
1426 {
1427         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1428         struct drbd_peer_request *peer_req;
1429         struct page *page;
1430         int dgs, ds, err;
1431         void *dig_in = mdev->tconn->int_dig_in;
1432         void *dig_vv = mdev->tconn->int_dig_vv;
1433         unsigned long *data;
1434
1435         dgs = 0;
1436         if (mdev->tconn->peer_integrity_tfm) {
1437                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1438                 /*
1439                  * FIXME: Receive the incoming digest into the receive buffer
1440                  *        here, together with its struct p_data?
1441                  */
1442                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1443                 if (err)
1444                         return NULL;
1445                 data_size -= dgs;
1446         }
1447
1448         if (!expect(data_size != 0))
1449                 return NULL;
1450         if (!expect(IS_ALIGNED(data_size, 512)))
1451                 return NULL;
1452         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1453                 return NULL;
1454
1455         /* even though we trust our peer,
1456          * we sometimes have to double check. */
1457         if (sector + (data_size>>9) > capacity) {
1458                 dev_err(DEV, "request from peer beyond end of local disk: "
1459                         "capacity: %llus < sector: %llus + size: %u\n",
1460                         (unsigned long long)capacity,
1461                         (unsigned long long)sector, data_size);
1462                 return NULL;
1463         }
1464
1465         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1466          * "criss-cross" setup, that might cause write-out on some other DRBD,
1467          * which in turn might block on the other node at this very place.  */
1468         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1469         if (!peer_req)
1470                 return NULL;
1471
1472         ds = data_size;
1473         page = peer_req->pages;
1474         page_chain_for_each(page) {
1475                 unsigned len = min_t(int, ds, PAGE_SIZE);
1476                 data = kmap(page);
1477                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1478                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1479                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1480                         data[0] = data[0] ^ (unsigned long)-1;
1481                 }
1482                 kunmap(page);
1483                 if (err) {
1484                         drbd_free_peer_req(mdev, peer_req);
1485                         return NULL;
1486                 }
1487                 ds -= len;
1488         }
1489
1490         if (dgs) {
1491                 drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1492                 if (memcmp(dig_in, dig_vv, dgs)) {
1493                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1494                                 (unsigned long long)sector, data_size);
1495                         drbd_free_peer_req(mdev, peer_req);
1496                         return NULL;
1497                 }
1498         }
1499         mdev->recv_cnt += data_size>>9;
1500         return peer_req;
1501 }
1502
1503 /* drbd_drain_block() just takes a data block
1504  * out of the socket input buffer, and discards it.
1505  */
1506 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1507 {
1508         struct page *page;
1509         int err = 0;
1510         void *data;
1511
1512         if (!data_size)
1513                 return 0;
1514
1515         page = drbd_alloc_pages(mdev, 1, 1);
1516
1517         data = kmap(page);
1518         while (data_size) {
1519                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1520
1521                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1522                 if (err)
1523                         break;
1524                 data_size -= len;
1525         }
1526         kunmap(page);
1527         drbd_free_pages(mdev, page, 0);
1528         return err;
1529 }
1530
1531 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1532                            sector_t sector, int data_size)
1533 {
1534         struct bio_vec *bvec;
1535         struct bio *bio;
1536         int dgs, err, i, expect;
1537         void *dig_in = mdev->tconn->int_dig_in;
1538         void *dig_vv = mdev->tconn->int_dig_vv;
1539
1540         dgs = 0;
1541         if (mdev->tconn->peer_integrity_tfm) {
1542                 dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1543                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1544                 if (err)
1545                         return err;
1546                 data_size -= dgs;
1547         }
1548
1549         /* optimistically update recv_cnt.  if receiving fails below,
1550          * we disconnect anyways, and counters will be reset. */
1551         mdev->recv_cnt += data_size>>9;
1552
1553         bio = req->master_bio;
1554         D_ASSERT(sector == bio->bi_sector);
1555
1556         bio_for_each_segment(bvec, bio, i) {
1557                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1558                 expect = min_t(int, data_size, bvec->bv_len);
1559                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1560                 kunmap(bvec->bv_page);
1561                 if (err)
1562                         return err;
1563                 data_size -= expect;
1564         }
1565
1566         if (dgs) {
1567                 drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1568                 if (memcmp(dig_in, dig_vv, dgs)) {
1569                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1570                         return -EINVAL;
1571                 }
1572         }
1573
1574         D_ASSERT(data_size == 0);
1575         return 0;
1576 }
1577
1578 /*
1579  * e_end_resync_block() is called in asender context via
1580  * drbd_finish_peer_reqs().
1581  */
1582 static int e_end_resync_block(struct drbd_work *w, int unused)
1583 {
1584         struct drbd_peer_request *peer_req =
1585                 container_of(w, struct drbd_peer_request, w);
1586         struct drbd_conf *mdev = w->mdev;
1587         sector_t sector = peer_req->i.sector;
1588         int err;
1589
1590         D_ASSERT(drbd_interval_empty(&peer_req->i));
1591
1592         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1593                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1594                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1595         } else {
1596                 /* Record failure to sync */
1597                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1598
1599                 err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1600         }
1601         dec_unacked(mdev);
1602
1603         return err;
1604 }
1605
1606 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1607 {
1608         struct drbd_peer_request *peer_req;
1609
1610         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1611         if (!peer_req)
1612                 goto fail;
1613
1614         dec_rs_pending(mdev);
1615
1616         inc_unacked(mdev);
1617         /* corresponding dec_unacked() in e_end_resync_block()
1618          * respective _drbd_clear_done_ee */
1619
1620         peer_req->w.cb = e_end_resync_block;
1621
1622         spin_lock_irq(&mdev->tconn->req_lock);
1623         list_add(&peer_req->w.list, &mdev->sync_ee);
1624         spin_unlock_irq(&mdev->tconn->req_lock);
1625
1626         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1627         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1628                 return 0;
1629
1630         /* don't care for the reason here */
1631         dev_err(DEV, "submit failed, triggering re-connect\n");
1632         spin_lock_irq(&mdev->tconn->req_lock);
1633         list_del(&peer_req->w.list);
1634         spin_unlock_irq(&mdev->tconn->req_lock);
1635
1636         drbd_free_peer_req(mdev, peer_req);
1637 fail:
1638         put_ldev(mdev);
1639         return -EIO;
1640 }
1641
1642 static struct drbd_request *
1643 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1644              sector_t sector, bool missing_ok, const char *func)
1645 {
1646         struct drbd_request *req;
1647
1648         /* Request object according to our peer */
1649         req = (struct drbd_request *)(unsigned long)id;
1650         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1651                 return req;
1652         if (!missing_ok) {
1653                 dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1654                         (unsigned long)id, (unsigned long long)sector);
1655         }
1656         return NULL;
1657 }
1658
1659 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1660 {
1661         struct drbd_conf *mdev;
1662         struct drbd_request *req;
1663         sector_t sector;
1664         int err;
1665         struct p_data *p = pi->data;
1666
1667         mdev = vnr_to_mdev(tconn, pi->vnr);
1668         if (!mdev)
1669                 return -EIO;
1670
1671         sector = be64_to_cpu(p->sector);
1672
1673         spin_lock_irq(&mdev->tconn->req_lock);
1674         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1675         spin_unlock_irq(&mdev->tconn->req_lock);
1676         if (unlikely(!req))
1677                 return -EIO;
1678
1679         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1680          * special casing it there for the various failure cases.
1681          * still no race with drbd_fail_pending_reads */
1682         err = recv_dless_read(mdev, req, sector, pi->size);
1683         if (!err)
1684                 req_mod(req, DATA_RECEIVED);
1685         /* else: nothing. handled from drbd_disconnect...
1686          * I don't think we may complete this just yet
1687          * in case we are "on-disconnect: freeze" */
1688
1689         return err;
1690 }
1691
1692 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1693 {
1694         struct drbd_conf *mdev;
1695         sector_t sector;
1696         int err;
1697         struct p_data *p = pi->data;
1698
1699         mdev = vnr_to_mdev(tconn, pi->vnr);
1700         if (!mdev)
1701                 return -EIO;
1702
1703         sector = be64_to_cpu(p->sector);
1704         D_ASSERT(p->block_id == ID_SYNCER);
1705
1706         if (get_ldev(mdev)) {
1707                 /* data is submitted to disk within recv_resync_read.
1708                  * corresponding put_ldev done below on error,
1709                  * or in drbd_peer_request_endio. */
1710                 err = recv_resync_read(mdev, sector, pi->size);
1711         } else {
1712                 if (__ratelimit(&drbd_ratelimit_state))
1713                         dev_err(DEV, "Can not write resync data to local disk.\n");
1714
1715                 err = drbd_drain_block(mdev, pi->size);
1716
1717                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1718         }
1719
1720         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1721
1722         return err;
1723 }
1724
1725 static void restart_conflicting_writes(struct drbd_conf *mdev,
1726                                        sector_t sector, int size)
1727 {
1728         struct drbd_interval *i;
1729         struct drbd_request *req;
1730
1731         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1732                 if (!i->local)
1733                         continue;
1734                 req = container_of(i, struct drbd_request, i);
1735                 if (req->rq_state & RQ_LOCAL_PENDING ||
1736                     !(req->rq_state & RQ_POSTPONED))
1737                         continue;
1738                 /* as it is RQ_POSTPONED, this will cause it to
1739                  * be queued on the retry workqueue. */
1740                 __req_mod(req, DISCARD_WRITE, NULL);
1741         }
1742 }
1743
1744 /*
1745  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1746  */
1747 static int e_end_block(struct drbd_work *w, int cancel)
1748 {
1749         struct drbd_peer_request *peer_req =
1750                 container_of(w, struct drbd_peer_request, w);
1751         struct drbd_conf *mdev = w->mdev;
1752         sector_t sector = peer_req->i.sector;
1753         int err = 0, pcmd;
1754
1755         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1756                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1757                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1758                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1759                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1760                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1761                         err = drbd_send_ack(mdev, pcmd, peer_req);
1762                         if (pcmd == P_RS_WRITE_ACK)
1763                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1764                 } else {
1765                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1766                         /* we expect it to be marked out of sync anyways...
1767                          * maybe assert this?  */
1768                 }
1769                 dec_unacked(mdev);
1770         }
1771         /* we delete from the conflict detection hash _after_ we sent out the
1772          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1773         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1774                 spin_lock_irq(&mdev->tconn->req_lock);
1775                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1776                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1777                 if (peer_req->flags & EE_RESTART_REQUESTS)
1778                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1779                 spin_unlock_irq(&mdev->tconn->req_lock);
1780         } else
1781                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1782
1783         drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1784
1785         return err;
1786 }
1787
1788 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1789 {
1790         struct drbd_conf *mdev = w->mdev;
1791         struct drbd_peer_request *peer_req =
1792                 container_of(w, struct drbd_peer_request, w);
1793         int err;
1794
1795         err = drbd_send_ack(mdev, ack, peer_req);
1796         dec_unacked(mdev);
1797
1798         return err;
1799 }
1800
1801 static int e_send_discard_write(struct drbd_work *w, int unused)
1802 {
1803         return e_send_ack(w, P_DISCARD_WRITE);
1804 }
1805
1806 static int e_send_retry_write(struct drbd_work *w, int unused)
1807 {
1808         struct drbd_tconn *tconn = w->mdev->tconn;
1809
1810         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1811                              P_RETRY_WRITE : P_DISCARD_WRITE);
1812 }
1813
1814 static bool seq_greater(u32 a, u32 b)
1815 {
1816         /*
1817          * We assume 32-bit wrap-around here.
1818          * For 24-bit wrap-around, we would have to shift:
1819          *  a <<= 8; b <<= 8;
1820          */
1821         return (s32)a - (s32)b > 0;
1822 }
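/*
 * Example for seq_greater() above: under 32-bit wrap-around,
 * seq_greater(0x00000001, 0xffffffff) is true, since
 * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0, while
 * seq_greater(0xffffffff, 0x00000001) is false.
 */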
1823
1824 static u32 seq_max(u32 a, u32 b)
1825 {
1826         return seq_greater(a, b) ? a : b;
1827 }
1828
1829 static bool need_peer_seq(struct drbd_conf *mdev)
1830 {
1831         struct drbd_tconn *tconn = mdev->tconn;
1832         int tp;
1833
1834         /*
1835          * We only need to keep track of the last packet_seq number of our peer
1836          * if we are in dual-primary mode and we have the discard flag set; see
1837          * handle_write_conflicts().
1838          */
1839
1840         rcu_read_lock();
1841         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1842         rcu_read_unlock();
1843
1844         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1845 }
1846
1847 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1848 {
1849         unsigned int newest_peer_seq;
1850
1851         if (need_peer_seq(mdev)) {
1852                 spin_lock(&mdev->peer_seq_lock);
1853                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1854                 mdev->peer_seq = newest_peer_seq;
1855                 spin_unlock(&mdev->peer_seq_lock);
1856                 /* wake up only if we actually changed mdev->peer_seq */
1857                 if (peer_seq == newest_peer_seq)
1858                         wake_up(&mdev->seq_wait);
1859         }
1860 }
1861
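/* s1 and s2 are start sectors, l1 and l2 are lengths in bytes (hence the
 * >>9 conversion); returns true if the two ranges share at least one sector. */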
1862 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1863 {
1864         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1865 }
1866
1867 /* maybe change sync_ee into interval trees as well? */
1868 static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1869 {
1870         struct drbd_peer_request *rs_req;
1871         bool rv = false;
1872
1873         spin_lock_irq(&mdev->tconn->req_lock);
1874         list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1875                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1876                              rs_req->i.sector, rs_req->i.size)) {
1877                         rv = true;
1878                         break;
1879                 }
1880         }
1881         spin_unlock_irq(&mdev->tconn->req_lock);
1882
1883         return rv;
1884 }
1885
1886 /* Called from receive_Data.
1887  * Synchronize packets on sock with packets on msock.
1888  *
1889  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1890  * packet traveling on msock, they are still processed in the order they have
1891  * been sent.
1892  *
1893  * Note: we don't care for Ack packets overtaking P_DATA packets.
1894  *
1895  * In case peer_seq is larger than mdev->peer_seq, there are
1896  * outstanding packets on the msock. We wait for them to arrive.
1897  * In case this is the logically next packet, we update mdev->peer_seq
1898  * ourselves. Correctly handles 32bit wrap around.
1899  *
1900  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1901  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1902  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1903  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1904  *
1905  * returns 0 if we may process the packet,
1906  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
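/*
 * Worked example for the check below: if mdev->peer_seq is 5 and this packet
 * carries peer_seq 7, a packet with sequence number 6 is still outstanding,
 * so we wait; once mdev->peer_seq has advanced to 6, seq_greater(7 - 1, 6)
 * is false and we may proceed.  A packet carrying peer_seq 6 would pass
 * immediately and advance mdev->peer_seq itself.
 */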
1907 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1908 {
1909         DEFINE_WAIT(wait);
1910         long timeout;
1911         int ret;
1912
1913         if (!need_peer_seq(mdev))
1914                 return 0;
1915
1916         spin_lock(&mdev->peer_seq_lock);
1917         for (;;) {
1918                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1919                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1920                         ret = 0;
1921                         break;
1922                 }
1923                 if (signal_pending(current)) {
1924                         ret = -ERESTARTSYS;
1925                         break;
1926                 }
1927                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1928                 spin_unlock(&mdev->peer_seq_lock);
1929                 rcu_read_lock();
1930                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1931                 rcu_read_unlock();
1932                 timeout = schedule_timeout(timeout);
1933                 spin_lock(&mdev->peer_seq_lock);
1934                 if (!timeout) {
1935                         ret = -ETIMEDOUT;
1936                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1937                         break;
1938                 }
1939         }
1940         spin_unlock(&mdev->peer_seq_lock);
1941         finish_wait(&mdev->seq_wait, &wait);
1942         return ret;
1943 }
1944
1945 /* see also bio_flags_to_wire()
1946  * DRBD_REQ_*, because we need to semantically map the flags to data packet
1947  * flags and back. We may replicate to other kernel versions. */
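/* For example, a P_DATA packet flagged DP_FUA | DP_FLUSH ends up as a
 * local bio submitted with REQ_FUA | REQ_FLUSH set. */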
1948 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1949 {
1950         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1951                 (dpf & DP_FUA ? REQ_FUA : 0) |
1952                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1953                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1954 }
1955
1956 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1957                                     unsigned int size)
1958 {
1959         struct drbd_interval *i;
1960
1961     repeat:
1962         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1963                 struct drbd_request *req;
1964                 struct bio_and_error m;
1965
1966                 if (!i->local)
1967                         continue;
1968                 req = container_of(i, struct drbd_request, i);
1969                 if (!(req->rq_state & RQ_POSTPONED))
1970                         continue;
1971                 req->rq_state &= ~RQ_POSTPONED;
1972                 __req_mod(req, NEG_ACKED, &m);
1973                 spin_unlock_irq(&mdev->tconn->req_lock);
1974                 if (m.bio)
1975                         complete_master_bio(mdev, &m);
1976                 spin_lock_irq(&mdev->tconn->req_lock);
1977                 goto repeat;
1978         }
1979 }
1980
1981 static int handle_write_conflicts(struct drbd_conf *mdev,
1982                                   struct drbd_peer_request *peer_req)
1983 {
1984         struct drbd_tconn *tconn = mdev->tconn;
1985         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1986         sector_t sector = peer_req->i.sector;
1987         const unsigned int size = peer_req->i.size;
1988         struct drbd_interval *i;
1989         bool equal;
1990         int err;
1991
1992         /*
1993          * Inserting the peer request into the write_requests tree will prevent
1994          * new conflicting local requests from being added.
1995          */
1996         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1997
1998     repeat:
1999         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2000                 if (i == &peer_req->i)
2001                         continue;
2002
2003                 if (!i->local) {
2004                         /*
2005                          * Our peer has sent a conflicting remote request; this
2006                          * should not happen in a two-node setup.  Wait for the
2007                          * earlier peer request to complete.
2008                          */
2009                         err = drbd_wait_misc(mdev, i);
2010                         if (err)
2011                                 goto out;
2012                         goto repeat;
2013                 }
2014
2015                 equal = i->sector == sector && i->size == size;
2016                 if (resolve_conflicts) {
2017                         /*
2018                          * If the peer request is fully contained within the
2019                          * overlapping request, it can be discarded; otherwise,
2020                          * it will be retried once all overlapping requests
2021                          * have completed.
2022                          */
2023                         bool discard = i->sector <= sector && i->sector +
2024                                        (i->size >> 9) >= sector + (size >> 9);
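                        /* E.g. a local 8 KiB write starting at sector 0 fully
                         * contains a conflicting 4 KiB peer write at sector 8
                         * (sectors 8..15), so the peer request is discarded. */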
2025
2026                         if (!equal)
2027                                 dev_alert(DEV, "Concurrent writes detected: "
2028                                                "local=%llus +%u, remote=%llus +%u, "
2029                                                "assuming %s came first\n",
2030                                           (unsigned long long)i->sector, i->size,
2031                                           (unsigned long long)sector, size,
2032                                           discard ? "local" : "remote");
2033
2034                         inc_unacked(mdev);
2035                         peer_req->w.cb = discard ? e_send_discard_write :
2036                                                    e_send_retry_write;
2037                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
2038                         wake_asender(mdev->tconn);
2039
2040                         err = -ENOENT;
2041                         goto out;
2042                 } else {
2043                         struct drbd_request *req =
2044                                 container_of(i, struct drbd_request, i);
2045
2046                         if (!equal)
2047                                 dev_alert(DEV, "Concurrent writes detected: "
2048                                                "local=%llus +%u, remote=%llus +%u\n",
2049                                           (unsigned long long)i->sector, i->size,
2050                                           (unsigned long long)sector, size);
2051
2052                         if (req->rq_state & RQ_LOCAL_PENDING ||
2053                             !(req->rq_state & RQ_POSTPONED)) {
2054                                 /*
2055                                  * Wait for the node with the discard flag to
2056                                  * decide if this request will be discarded or
2057                                  * retried.  Requests that are discarded will
2058                                  * disappear from the write_requests tree.
2059                                  *
2060                                  * In addition, wait for the conflicting
2061                                  * request to finish locally before submitting
2062                                  * the conflicting peer request.
2063                                  */
2064                                 err = drbd_wait_misc(mdev, &req->i);
2065                                 if (err) {
2066                                         _conn_request_state(mdev->tconn,
2067                                                             NS(conn, C_TIMEOUT),
2068                                                             CS_HARD);
2069                                         fail_postponed_requests(mdev, sector, size);
2070                                         goto out;
2071                                 }
2072                                 goto repeat;
2073                         }
2074                         /*
2075                          * Remember to restart the conflicting requests after
2076                          * the new peer request has completed.
2077                          */
2078                         peer_req->flags |= EE_RESTART_REQUESTS;
2079                 }
2080         }
2081         err = 0;
2082
2083     out:
2084         if (err)
2085                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2086         return err;
2087 }
2088
2089 /* mirrored write */
2090 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2091 {
2092         struct drbd_conf *mdev;
2093         sector_t sector;
2094         struct drbd_peer_request *peer_req;
2095         struct p_data *p = pi->data;
2096         u32 peer_seq = be32_to_cpu(p->seq_num);
2097         int rw = WRITE;
2098         u32 dp_flags;
2099         int err, tp;
2100
2101         mdev = vnr_to_mdev(tconn, pi->vnr);
2102         if (!mdev)
2103                 return -EIO;
2104
2105         if (!get_ldev(mdev)) {
2106                 int err2;
2107
2108                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2109                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2110                 atomic_inc(&tconn->current_epoch->epoch_size);
2111                 err2 = drbd_drain_block(mdev, pi->size);
2112                 if (!err)
2113                         err = err2;
2114                 return err;
2115         }
2116
2117         /*
2118          * Corresponding put_ldev done either below (on various errors), or in
2119          * drbd_peer_request_endio, if we successfully submit the data at the
2120          * end of this function.
2121          */
2122
2123         sector = be64_to_cpu(p->sector);
2124         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2125         if (!peer_req) {
2126                 put_ldev(mdev);
2127                 return -EIO;
2128         }
2129
2130         peer_req->w.cb = e_end_block;
2131
2132         dp_flags = be32_to_cpu(p->dp_flags);
2133         rw |= wire_flags_to_bio(mdev, dp_flags);
2134
2135         if (dp_flags & DP_MAY_SET_IN_SYNC)
2136                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2137
2138         spin_lock(&tconn->epoch_lock);
2139         peer_req->epoch = tconn->current_epoch;
2140         atomic_inc(&peer_req->epoch->epoch_size);
2141         atomic_inc(&peer_req->epoch->active);
2142         spin_unlock(&tconn->epoch_lock);
2143
2144         rcu_read_lock();
2145         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2146         rcu_read_unlock();
2147         if (tp) {
2148                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2149                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2150                 if (err)
2151                         goto out_interrupted;
2152                 spin_lock_irq(&mdev->tconn->req_lock);
2153                 err = handle_write_conflicts(mdev, peer_req);
2154                 if (err) {
2155                         spin_unlock_irq(&mdev->tconn->req_lock);
2156                         if (err == -ENOENT) {
2157                                 put_ldev(mdev);
2158                                 return 0;
2159                         }
2160                         goto out_interrupted;
2161                 }
2162         } else
2163                 spin_lock_irq(&mdev->tconn->req_lock);
2164         list_add(&peer_req->w.list, &mdev->active_ee);
2165         spin_unlock_irq(&mdev->tconn->req_lock);
2166
2167         if (mdev->state.conn == C_SYNC_TARGET)
2168                 wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2169
2170         if (mdev->tconn->agreed_pro_version < 100) {
2171                 rcu_read_lock();
2172                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2173                 case DRBD_PROT_C:
2174                         dp_flags |= DP_SEND_WRITE_ACK;
2175                         break;
2176                 case DRBD_PROT_B:
2177                         dp_flags |= DP_SEND_RECEIVE_ACK;
2178                         break;
2179                 }
2180                 rcu_read_unlock();
2181         }
2182
2183         if (dp_flags & DP_SEND_WRITE_ACK) {
2184                 peer_req->flags |= EE_SEND_WRITE_ACK;
2185                 inc_unacked(mdev);
2186                 /* corresponding dec_unacked() in e_end_block()
2187                  * respective _drbd_clear_done_ee */
2188         }
2189
2190         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2191                 /* I really don't like it that the receiver thread
2192                  * sends on the msock, but anyways */
2193                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2194         }
2195
2196         if (mdev->state.pdsk < D_INCONSISTENT) {
2197                 /* In case we have the only disk of the cluster: mark out of sync and use the activity log. */
2198                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2199                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2200                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2201                 drbd_al_begin_io(mdev, &peer_req->i);
2202         }
2203
2204         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2205         if (!err)
2206                 return 0;
2207
2208         /* don't care for the reason here */
2209         dev_err(DEV, "submit failed, triggering re-connect\n");
2210         spin_lock_irq(&mdev->tconn->req_lock);
2211         list_del(&peer_req->w.list);
2212         drbd_remove_epoch_entry_interval(mdev, peer_req);
2213         spin_unlock_irq(&mdev->tconn->req_lock);
2214         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2215                 drbd_al_complete_io(mdev, &peer_req->i);
2216
2217 out_interrupted:
2218         drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2219         put_ldev(mdev);
2220         drbd_free_peer_req(mdev, peer_req);
2221         return err;
2222 }
2223
2224 /* We may throttle resync, if the lower device seems to be busy,
2225  * and current sync rate is above c_min_rate.
2226  *
2227  * To decide whether or not the lower device is busy, we use a scheme similar
2228  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2229  * (more than 64 sectors) of activity we cannot account for with our own resync
2230  * activity, it obviously is "busy".
2231  *
2232  * The current sync rate used here uses only the most recent two step marks,
2233  * to have a short time average so we can react faster.
2234  */
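/*
 * Illustration with assumed numbers: if db = 25600 bitmap bits (100 MiB at
 * the usual 4 KiB per bit) were cleared since a sync mark dt = 10 seconds
 * ago, then dbdt = Bit2KB(25600/10) = 10240 KiB/s; with c_min_rate
 * configured to 4000 KiB/s we would throttle.
 */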
2235 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2236 {
2237         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2238         unsigned long db, dt, dbdt;
2239         struct lc_element *tmp;
2240         int curr_events;
2241         int throttle = 0;
2242         unsigned int c_min_rate;
2243
2244         rcu_read_lock();
2245         c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2246         rcu_read_unlock();
2247
2248         /* feature disabled? */
2249         if (c_min_rate == 0)
2250                 return 0;
2251
2252         spin_lock_irq(&mdev->al_lock);
2253         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2254         if (tmp) {
2255                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2256                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2257                         spin_unlock_irq(&mdev->al_lock);
2258                         return 0;
2259                 }
2260                 /* Do not slow down if app IO is already waiting for this extent */
2261         }
2262         spin_unlock_irq(&mdev->al_lock);
2263
2264         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2265                       (int)part_stat_read(&disk->part0, sectors[1]) -
2266                         atomic_read(&mdev->rs_sect_ev);
2267
2268         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2269                 unsigned long rs_left;
2270                 int i;
2271
2272                 mdev->rs_last_events = curr_events;
2273
2274                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2275                  * approx. */
2276                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2277
2278                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2279                         rs_left = mdev->ov_left;
2280                 else
2281                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2282
2283                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2284                 if (!dt)
2285                         dt++;
2286                 db = mdev->rs_mark_left[i] - rs_left;
2287                 dbdt = Bit2KB(db/dt);
2288
2289                 if (dbdt > c_min_rate)
2290                         throttle = 1;
2291         }
2292         return throttle;
2293 }
2294
2295
2296 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2297 {
2298         struct drbd_conf *mdev;
2299         sector_t sector;
2300         sector_t capacity;
2301         struct drbd_peer_request *peer_req;
2302         struct digest_info *di = NULL;
2303         int size, verb;
2304         unsigned int fault_type;
2305         struct p_block_req *p = pi->data;
2306
2307         mdev = vnr_to_mdev(tconn, pi->vnr);
2308         if (!mdev)
2309                 return -EIO;
2310         capacity = drbd_get_capacity(mdev->this_bdev);
2311
2312         sector = be64_to_cpu(p->sector);
2313         size   = be32_to_cpu(p->blksize);
2314
2315         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2316                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2317                                 (unsigned long long)sector, size);
2318                 return -EINVAL;
2319         }
2320         if (sector + (size>>9) > capacity) {
2321                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2322                                 (unsigned long long)sector, size);
2323                 return -EINVAL;
2324         }
2325
2326         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2327                 verb = 1;
2328                 switch (pi->cmd) {
2329                 case P_DATA_REQUEST:
2330                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2331                         break;
2332                 case P_RS_DATA_REQUEST:
2333                 case P_CSUM_RS_REQUEST:
2334                 case P_OV_REQUEST:
2335                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2336                         break;
2337                 case P_OV_REPLY:
2338                         verb = 0;
2339                         dec_rs_pending(mdev);
2340                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2341                         break;
2342                 default:
2343                         BUG();
2344                 }
2345                 if (verb && __ratelimit(&drbd_ratelimit_state))
2346                         dev_err(DEV, "Can not satisfy peer's read request, "
2347                             "no local data.\n");
2348
2349                 /* drain possible payload */
2350                 return drbd_drain_block(mdev, pi->size);
2351         }
2352
2353         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2354          * "criss-cross" setup, that might cause write-out on some other DRBD,
2355          * which in turn might block on the other node at this very place.  */
2356         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2357         if (!peer_req) {
2358                 put_ldev(mdev);
2359                 return -ENOMEM;
2360         }
2361
2362         switch (pi->cmd) {
2363         case P_DATA_REQUEST:
2364                 peer_req->w.cb = w_e_end_data_req;
2365                 fault_type = DRBD_FAULT_DT_RD;
2366                 /* application IO, don't drbd_rs_begin_io */
2367                 goto submit;
2368
2369         case P_RS_DATA_REQUEST:
2370                 peer_req->w.cb = w_e_end_rsdata_req;
2371                 fault_type = DRBD_FAULT_RS_RD;
2372                 /* used in the sector offset progress display */
2373                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2374                 break;
2375
2376         case P_OV_REPLY:
2377         case P_CSUM_RS_REQUEST:
2378                 fault_type = DRBD_FAULT_RS_RD;
2379                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2380                 if (!di)
2381                         goto out_free_e;
2382
2383                 di->digest_size = pi->size;
2384                 di->digest = (((char *)di)+sizeof(struct digest_info));
2385
2386                 peer_req->digest = di;
2387                 peer_req->flags |= EE_HAS_DIGEST;
2388
2389                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2390                         goto out_free_e;
2391
2392                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2393                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2394                         peer_req->w.cb = w_e_end_csum_rs_req;
2395                         /* used in the sector offset progress display */
2396                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2397                 } else if (pi->cmd == P_OV_REPLY) {
2398                         /* track progress, we may need to throttle */
2399                         atomic_add(size >> 9, &mdev->rs_sect_in);
2400                         peer_req->w.cb = w_e_end_ov_reply;
2401                         dec_rs_pending(mdev);
2402                         /* drbd_rs_begin_io done when we sent this request,
2403                          * but accounting still needs to be done. */
2404                         goto submit_for_resync;
2405                 }
2406                 break;
2407
2408         case P_OV_REQUEST:
2409                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2410                     mdev->tconn->agreed_pro_version >= 90) {
2411                         unsigned long now = jiffies;
2412                         int i;
2413                         mdev->ov_start_sector = sector;
2414                         mdev->ov_position = sector;
2415                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2416                         mdev->rs_total = mdev->ov_left;
2417                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2418                                 mdev->rs_mark_left[i] = mdev->ov_left;
2419                                 mdev->rs_mark_time[i] = now;
2420                         }
2421                         dev_info(DEV, "Online Verify start sector: %llu\n",
2422                                         (unsigned long long)sector);
2423                 }
2424                 peer_req->w.cb = w_e_end_ov_req;
2425                 fault_type = DRBD_FAULT_RS_RD;
2426                 break;
2427
2428         default:
2429                 BUG();
2430         }
2431
2432         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2433          * wrt the receiver, but it is not as straightforward as it may seem.
2434          * Various places in the resync start and stop logic assume resync
2435          * requests are processed in order, requeuing this on the worker thread
2436          * introduces a bunch of new code for synchronization between threads.
2437          *
2438          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2439          * "forever", throttling after drbd_rs_begin_io will lock that extent
2440          * for application writes for the same time.  For now, just throttle
2441          * here, where the rest of the code expects the receiver to sleep for
2442          * a while, anyways.
2443          */
2444
2445         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2446          * this defers syncer requests for some time, before letting at least
2447          * one request through.  The resync controller on the receiving side
2448          * will adapt to the incoming rate accordingly.
2449          *
2450          * We cannot throttle here if remote is Primary/SyncTarget:
2451          * we would also throttle its application reads.
2452          * In that case, throttling is done on the SyncTarget only.
2453          */
2454         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2455                 schedule_timeout_uninterruptible(HZ/10);
2456         if (drbd_rs_begin_io(mdev, sector))
2457                 goto out_free_e;
2458
2459 submit_for_resync:
2460         atomic_add(size >> 9, &mdev->rs_sect_ev);
2461
2462 submit:
2463         inc_unacked(mdev);
2464         spin_lock_irq(&mdev->tconn->req_lock);
2465         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2466         spin_unlock_irq(&mdev->tconn->req_lock);
2467
2468         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2469                 return 0;
2470
2471         /* don't care for the reason here */
2472         dev_err(DEV, "submit failed, triggering re-connect\n");
2473         spin_lock_irq(&mdev->tconn->req_lock);
2474         list_del(&peer_req->w.list);
2475         spin_unlock_irq(&mdev->tconn->req_lock);
2476         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2477
2478 out_free_e:
2479         put_ldev(mdev);
2480         drbd_free_peer_req(mdev, peer_req);
2481         return -EIO;
2482 }
2483
2484 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2485 {
2486         int self, peer, rv = -100;
2487         unsigned long ch_self, ch_peer;
2488         enum drbd_after_sb_p after_sb_0p;
2489
2490         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2491         peer = mdev->p_uuid[UI_BITMAP] & 1;
2492
2493         ch_peer = mdev->p_uuid[UI_SIZE];
2494         ch_self = mdev->comm_bm_set;
2495
2496         rcu_read_lock();
2497         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2498         rcu_read_unlock();
2499         switch (after_sb_0p) {
2500         case ASB_CONSENSUS:
2501         case ASB_DISCARD_SECONDARY:
2502         case ASB_CALL_HELPER:
2503         case ASB_VIOLENTLY:
2504                 dev_err(DEV, "Configuration error.\n");
2505                 break;
2506         case ASB_DISCONNECT:
2507                 break;
2508         case ASB_DISCARD_YOUNGER_PRI:
2509                 if (self == 0 && peer == 1) {
2510                         rv = -1;
2511                         break;
2512                 }
2513                 if (self == 1 && peer == 0) {
2514                         rv =  1;
2515                         break;
2516                 }
2517                 /* Else fall through to one of the other strategies... */
2518         case ASB_DISCARD_OLDER_PRI:
2519                 if (self == 0 && peer == 1) {
2520                         rv = 1;
2521                         break;
2522                 }
2523                 if (self == 1 && peer == 0) {
2524                         rv = -1;
2525                         break;
2526                 }
2527                 /* Else fall through to one of the other strategies... */
2528                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2529                      "Using discard-least-changes instead\n");
2530         case ASB_DISCARD_ZERO_CHG:
2531                 if (ch_peer == 0 && ch_self == 0) {
2532                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2533                                 ? -1 : 1;
2534                         break;
2535                 } else {
2536                         if (ch_peer == 0) { rv =  1; break; }
2537                         if (ch_self == 0) { rv = -1; break; }
2538                 }
2539                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2540                         break;
2541         case ASB_DISCARD_LEAST_CHG:
2542                 if      (ch_self < ch_peer)
2543                         rv = -1;
2544                 else if (ch_self > ch_peer)
2545                         rv =  1;
2546                 else /* ( ch_self == ch_peer ) */
2547                      /* Well, then use something else. */
2548                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2549                                 ? -1 : 1;
2550                 break;
2551         case ASB_DISCARD_LOCAL:
2552                 rv = -1;
2553                 break;
2554         case ASB_DISCARD_REMOTE:
2555                 rv =  1;
2556         }
2557
2558         return rv;
2559 }
2560
2561 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2562 {
2563         int hg, rv = -100;
2564         enum drbd_after_sb_p after_sb_1p;
2565
2566         rcu_read_lock();
2567         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2568         rcu_read_unlock();
2569         switch (after_sb_1p) {
2570         case ASB_DISCARD_YOUNGER_PRI:
2571         case ASB_DISCARD_OLDER_PRI:
2572         case ASB_DISCARD_LEAST_CHG:
2573         case ASB_DISCARD_LOCAL:
2574         case ASB_DISCARD_REMOTE:
2575         case ASB_DISCARD_ZERO_CHG:
2576                 dev_err(DEV, "Configuration error.\n");
2577                 break;
2578         case ASB_DISCONNECT:
2579                 break;
2580         case ASB_CONSENSUS:
2581                 hg = drbd_asb_recover_0p(mdev);
2582                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2583                         rv = hg;
2584                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2585                         rv = hg;
2586                 break;
2587         case ASB_VIOLENTLY:
2588                 rv = drbd_asb_recover_0p(mdev);
2589                 break;
2590         case ASB_DISCARD_SECONDARY:
2591                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2592         case ASB_CALL_HELPER:
2593                 hg = drbd_asb_recover_0p(mdev);
2594                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2595                         enum drbd_state_rv rv2;
2596
2597                         drbd_set_role(mdev, R_SECONDARY, 0);
2598                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2599                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2600                           * we do not need to wait for the after state change work either. */
2601                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2602                         if (rv2 != SS_SUCCESS) {
2603                                 drbd_khelper(mdev, "pri-lost-after-sb");
2604                         } else {
2605                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2606                                 rv = hg;
2607                         }
2608                 } else
2609                         rv = hg;
2610         }
2611
2612         return rv;
2613 }
2614
2615 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2616 {
2617         int hg, rv = -100;
2618         enum drbd_after_sb_p after_sb_2p;
2619
2620         rcu_read_lock();
2621         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2622         rcu_read_unlock();
2623         switch (after_sb_2p) {
2624         case ASB_DISCARD_YOUNGER_PRI:
2625         case ASB_DISCARD_OLDER_PRI:
2626         case ASB_DISCARD_LEAST_CHG:
2627         case ASB_DISCARD_LOCAL:
2628         case ASB_DISCARD_REMOTE:
2629         case ASB_CONSENSUS:
2630         case ASB_DISCARD_SECONDARY:
2631         case ASB_DISCARD_ZERO_CHG:
2632                 dev_err(DEV, "Configuration error.\n");
2633                 break;
2634         case ASB_VIOLENTLY:
2635                 rv = drbd_asb_recover_0p(mdev);
2636                 break;
2637         case ASB_DISCONNECT:
2638                 break;
2639         case ASB_CALL_HELPER:
2640                 hg = drbd_asb_recover_0p(mdev);
2641                 if (hg == -1) {
2642                         enum drbd_state_rv rv2;
2643
2644                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2645                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2646                           * we do not need to wait for the after state change work either. */
2647                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2648                         if (rv2 != SS_SUCCESS) {
2649                                 drbd_khelper(mdev, "pri-lost-after-sb");
2650                         } else {
2651                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2652                                 rv = hg;
2653                         }
2654                 } else
2655                         rv = hg;
2656         }
2657
2658         return rv;
2659 }
2660
2661 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2662                            u64 bits, u64 flags)
2663 {
2664         if (!uuid) {
2665                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2666                 return;
2667         }
2668         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2669              text,
2670              (unsigned long long)uuid[UI_CURRENT],
2671              (unsigned long long)uuid[UI_BITMAP],
2672              (unsigned long long)uuid[UI_HISTORY_START],
2673              (unsigned long long)uuid[UI_HISTORY_END],
2674              (unsigned long long)bits,
2675              (unsigned long long)flags);
2676 }
2677
2678 /*
2679   100   after split brain try auto recover
2680     2   C_SYNC_SOURCE set BitMap
2681     1   C_SYNC_SOURCE use BitMap
2682     0   no Sync
2683    -1   C_SYNC_TARGET use BitMap
2684    -2   C_SYNC_TARGET set BitMap
2685  -100   after split brain, disconnect
2686 -1000   unrelated data
2687 -1091   requires proto 91
2688 -1096   requires proto 96
2689  */
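/*
 * Example for rule 40: if both nodes report the same current UUID and only
 * the local node had been Primary at crash time (rct == 1), this returns 1,
 * i.e. we become C_SYNC_SOURCE using the bitmap.
 */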
2690 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2691 {
2692         u64 self, peer;
2693         int i, j;
2694
2695         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2696         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2697
2698         *rule_nr = 10;
2699         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2700                 return 0;
2701
2702         *rule_nr = 20;
2703         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2704              peer != UUID_JUST_CREATED)
2705                 return -2;
2706
2707         *rule_nr = 30;
2708         if (self != UUID_JUST_CREATED &&
2709             (peer == UUID_JUST_CREATED || peer == (u64)0))
2710                 return 2;
2711
2712         if (self == peer) {
2713                 int rct, dc; /* roles at crash time */
2714
2715                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2716
2717                         if (mdev->tconn->agreed_pro_version < 91)
2718                                 return -1091;
2719
2720                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2721                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2722                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2723                                 drbd_uuid_set_bm(mdev, 0UL);
2724
2725                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2726                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2727                                 *rule_nr = 34;
2728                         } else {
2729                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2730                                 *rule_nr = 36;
2731                         }
2732
2733                         return 1;
2734                 }
2735
2736                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2737
2738                         if (mdev->tconn->agreed_pro_version < 91)
2739                                 return -1091;
2740
2741                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2742                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2743                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2744
2745                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2746                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2747                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2748
2749                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2750                                 *rule_nr = 35;
2751                         } else {
2752                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2753                                 *rule_nr = 37;
2754                         }
2755
2756                         return -1;
2757                 }
2758
2759                 /* Common power [off|failure] */
2760                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2761                         (mdev->p_uuid[UI_FLAGS] & 2);
2762                 /* lowest bit is set when we were primary,
2763                  * next bit (weight 2) is set when peer was primary */
2764                 *rule_nr = 40;
2765
2766                 switch (rct) {
2767                 case 0: /* !self_pri && !peer_pri */ return 0;
2768                 case 1: /*  self_pri && !peer_pri */ return 1;
2769                 case 2: /* !self_pri &&  peer_pri */ return -1;
2770                 case 3: /*  self_pri &&  peer_pri */
2771                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2772                         return dc ? -1 : 1;
2773                 }
2774         }
2775
2776         *rule_nr = 50;
2777         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2778         if (self == peer)
2779                 return -1;
2780
2781         *rule_nr = 51;
2782         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2783         if (self == peer) {
2784                 if (mdev->tconn->agreed_pro_version < 96 ?
2785                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2786                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2787                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2788                         /* The last P_SYNC_UUID did not get through. Undo the modifications of
2789                            the peer's UUIDs made at its last start of resync as sync source. */
2790
2791                         if (mdev->tconn->agreed_pro_version < 91)
2792                                 return -1091;
2793
2794                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2795                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2796
2797                         dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2798                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2799
2800                         return -1;
2801                 }
2802         }
2803
2804         *rule_nr = 60;
2805         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2806         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2807                 peer = mdev->p_uuid[i] & ~((u64)1);
2808                 if (self == peer)
2809                         return -2;
2810         }
2811
2812         *rule_nr = 70;
2813         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2814         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2815         if (self == peer)
2816                 return 1;
2817
2818         *rule_nr = 71;
2819         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2820         if (self == peer) {
2821                 if (mdev->tconn->agreed_pro_version < 96 ?
2822                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2823                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2824                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2825                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2826                            of our UUIDs from the last start of resync as sync source. */
2827
2828                         if (mdev->tconn->agreed_pro_version < 91)
2829                                 return -1091;
2830
2831                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2832                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2833
2834                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2835                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2836                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2837
2838                         return 1;
2839                 }
2840         }
2841
2842
2843         *rule_nr = 80;
2844         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2845         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2846                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2847                 if (self == peer)
2848                         return 2;
2849         }
2850
2851         *rule_nr = 90;
2852         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2853         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2854         if (self == peer && self != ((u64)0))
2855                 return 100;
2856
2857         *rule_nr = 100;
2858         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2859                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2860                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2861                         peer = mdev->p_uuid[j] & ~((u64)1);
2862                         if (self == peer)
2863                                 return -100;
2864                 }
2865         }
2866
2867         return -1000;
2868 }
2869
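/* A rough summary of drbd_uuid_compare()'s return value ("hg") as it is
 * interpreted by drbd_sync_handshake() below (derived from the rules above,
 * not an authoritative specification):
 *      0         no resync necessary
 *      1 / -1    become sync source / sync target (bitmap based resync)
 *      2 / -2    become sync source / sync target (full resync)
 *    100 / -100  split brain detected
 *  -1000         unrelated data
 *  < -1000       both sides need to support at least protocol (-hg - 1000)
 * *rule_nr reports which rule matched and is used for logging only.
 */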
2870 /* drbd_sync_handshake() returns the new connection state on success, or
2871    C_MASK on failure.
2872  */
2873 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2874                                            enum drbd_disk_state peer_disk) __must_hold(local)
2875 {
2876         enum drbd_conns rv = C_MASK;
2877         enum drbd_disk_state mydisk;
2878         struct net_conf *nc;
2879         int hg, rule_nr, rr_conflict, tentative;
2880
2881         mydisk = mdev->state.disk;
2882         if (mydisk == D_NEGOTIATING)
2883                 mydisk = mdev->new_state_tmp.disk;
2884
2885         dev_info(DEV, "drbd_sync_handshake:\n");
2886         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2887         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2888                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2889
2890         hg = drbd_uuid_compare(mdev, &rule_nr);
2891
2892         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2893
2894         if (hg == -1000) {
2895                 dev_alert(DEV, "Unrelated data, aborting!\n");
2896                 return C_MASK;
2897         }
2898         if (hg < -1000) {
2899                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2900                 return C_MASK;
2901         }
2902
2903         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2904             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2905                 int f = (hg == -100) || abs(hg) == 2;
2906                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2907                 if (f)
2908                         hg = hg*2;
2909                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2910                      hg > 0 ? "source" : "target");
2911         }
2912
2913         if (abs(hg) == 100)
2914                 drbd_khelper(mdev, "initial-split-brain");
2915
2916         rcu_read_lock();
2917         nc = rcu_dereference(mdev->tconn->net_conf);
2918
2919         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2920                 int pcount = (mdev->state.role == R_PRIMARY)
2921                            + (peer_role == R_PRIMARY);
2922                 int forced = (hg == -100);
2923
2924                 switch (pcount) {
2925                 case 0:
2926                         hg = drbd_asb_recover_0p(mdev);
2927                         break;
2928                 case 1:
2929                         hg = drbd_asb_recover_1p(mdev);
2930                         break;
2931                 case 2:
2932                         hg = drbd_asb_recover_2p(mdev);
2933                         break;
2934                 }
2935                 if (abs(hg) < 100) {
2936                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2937                              "automatically solved. Sync from %s node\n",
2938                              pcount, (hg < 0) ? "peer" : "this");
2939                         if (forced) {
2940                                 dev_warn(DEV, "Doing a full sync, since"
2941                                      " UUIDs were ambiguous.\n");
2942                                 hg = hg*2;
2943                         }
2944                 }
2945         }
2946
2947         if (hg == -100) {
2948                 if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
2949                         hg = -1;
2950                 if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
2951                         hg = 1;
2952
2953                 if (abs(hg) < 100)
2954                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2955                              "Sync from %s node\n",
2956                              (hg < 0) ? "peer" : "this");
2957         }
2958         rr_conflict = nc->rr_conflict;
2959         tentative = nc->tentative;
2960         rcu_read_unlock();
2961
2962         if (hg == -100) {
2963                 /* FIXME this log message is not correct if we end up here
2964                  * after an attempted attach on a diskless node.
2965                  * We just refuse to attach -- well, we drop the "connection"
2966                  * to that disk, in a way... */
2967                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2968                 drbd_khelper(mdev, "split-brain");
2969                 return C_MASK;
2970         }
2971
2972         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2973                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2974                 return C_MASK;
2975         }
2976
2977         if (hg < 0 && /* by intention we do not use mydisk here. */
2978             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2979                 switch (rr_conflict) {
2980                 case ASB_CALL_HELPER:
2981                         drbd_khelper(mdev, "pri-lost");
2982                         /* fall through */
2983                 case ASB_DISCONNECT:
2984                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2985                         return C_MASK;
2986                 case ASB_VIOLENTLY:
2987                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2988                              " assumption\n");
2989                 }
2990         }
2991
2992         if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2993                 if (hg == 0)
2994                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2995                 else
2996                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2997                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2998                                  abs(hg) >= 2 ? "full" : "bit-map based");
2999                 return C_MASK;
3000         }
3001
3002         if (abs(hg) >= 2) {
3003                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3004                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3005                                         BM_LOCKED_SET_ALLOWED))
3006                         return C_MASK;
3007         }
3008
3009         if (hg > 0) { /* become sync source. */
3010                 rv = C_WF_BITMAP_S;
3011         } else if (hg < 0) { /* become sync target */
3012                 rv = C_WF_BITMAP_T;
3013         } else {
3014                 rv = C_CONNECTED;
3015                 if (drbd_bm_total_weight(mdev)) {
3016                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3017                              drbd_bm_total_weight(mdev));
3018                 }
3019         }
3020
3021         return rv;
3022 }
3023
3024 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3025 {
3026         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3027         if (peer == ASB_DISCARD_REMOTE)
3028                 return ASB_DISCARD_LOCAL;
3029
3030         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3031         if (peer == ASB_DISCARD_LOCAL)
3032                 return ASB_DISCARD_REMOTE;
3033
3034         /* everything else is valid if they are equal on both sides. */
3035         return peer;
3036 }
3037
3038 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3039 {
3040         struct p_protocol *p = pi->data;
3041         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3042         int p_proto, p_discard_my_data, p_two_primaries, cf;
3043         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3044         char integrity_alg[SHARED_SECRET_MAX] = "";
3045         struct crypto_hash *peer_integrity_tfm = NULL;
3046         void *int_dig_in = NULL, *int_dig_vv = NULL;
3047
3048         p_proto         = be32_to_cpu(p->protocol);
3049         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3050         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3051         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3052         p_two_primaries = be32_to_cpu(p->two_primaries);
3053         cf              = be32_to_cpu(p->conn_flags);
3054         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3055
3056         if (tconn->agreed_pro_version >= 87) {
3057                 int err;
3058
3059                 if (pi->size > sizeof(integrity_alg))
3060                         return -EIO;
3061                 err = drbd_recv_all(tconn, integrity_alg, pi->size);
3062                 if (err)
3063                         return err;
3064                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3065         }
3066
3067         if (pi->cmd != P_PROTOCOL_UPDATE) {
3068                 clear_bit(CONN_DRY_RUN, &tconn->flags);
3069
3070                 if (cf & CF_DRY_RUN)
3071                         set_bit(CONN_DRY_RUN, &tconn->flags);
3072
3073                 rcu_read_lock();
3074                 nc = rcu_dereference(tconn->net_conf);
3075
3076                 if (p_proto != nc->wire_protocol) {
3077                         conn_err(tconn, "incompatible %s settings\n", "protocol");
3078                         goto disconnect_rcu_unlock;
3079                 }
3080
3081                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3082                         conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3083                         goto disconnect_rcu_unlock;
3084                 }
3085
3086                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3087                         conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3088                         goto disconnect_rcu_unlock;
3089                 }
3090
3091                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3092                         conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3093                         goto disconnect_rcu_unlock;
3094                 }
3095
3096                 if (p_discard_my_data && nc->discard_my_data) {
3097                         conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3098                         goto disconnect_rcu_unlock;
3099                 }
3100
3101                 if (p_two_primaries != nc->two_primaries) {
3102                         conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3103                         goto disconnect_rcu_unlock;
3104                 }
3105
3106                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3107                         conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3108                         goto disconnect_rcu_unlock;
3109                 }
3110
3111                 rcu_read_unlock();
3112         }
3113
3114         if (integrity_alg[0]) {
3115                 int hash_size;
3116
3117                 /*
3118                  * We can only change the peer data integrity algorithm
3119                  * here.  Changing our own data integrity algorithm
3120                  * requires that we send a P_PROTOCOL_UPDATE packet at
3121                  * the same time; otherwise, the peer has no way to
3122                  * tell between which packets the algorithm should
3123                  * change.
3124                  */
3125
3126                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3127                 if (!peer_integrity_tfm) {
3128                         conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3129                                  integrity_alg);
3130                         goto disconnect;
3131                 }
3132
3133                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3134                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3135                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3136                 if (!(int_dig_in && int_dig_vv)) {
3137                         conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3138                         goto disconnect;
3139                 }
3140         }
3141
3142         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3143         if (!new_net_conf) {
3144                 conn_err(tconn, "Allocation of new net_conf failed\n");
3145                 goto disconnect;
3146         }
3147
3148         mutex_lock(&tconn->data.mutex);
3149         mutex_lock(&tconn->conf_update);
3150         old_net_conf = tconn->net_conf;
3151         *new_net_conf = *old_net_conf;
3152
3153         new_net_conf->wire_protocol = p_proto;
3154         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3155         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3156         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3157         new_net_conf->two_primaries = p_two_primaries;
3158
3159         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3160         mutex_unlock(&tconn->conf_update);
3161         mutex_unlock(&tconn->data.mutex);
3162
3163         crypto_free_hash(tconn->peer_integrity_tfm);
3164         kfree(tconn->int_dig_in);
3165         kfree(tconn->int_dig_vv);
3166         tconn->peer_integrity_tfm = peer_integrity_tfm;
3167         tconn->int_dig_in = int_dig_in;
3168         tconn->int_dig_vv = int_dig_vv;
3169
3170         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3171                 conn_info(tconn, "peer data-integrity-alg: %s\n",
3172                           integrity_alg[0] ? integrity_alg : "(none)");
3173
3174         synchronize_rcu();
3175         kfree(old_net_conf);
3176         return 0;
3177
3178 disconnect_rcu_unlock:
3179         rcu_read_unlock();
3180 disconnect:
3181         crypto_free_hash(peer_integrity_tfm);
3182         kfree(int_dig_in);
3183         kfree(int_dig_vv);
3184         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3185         return -EIO;
3186 }
3187
3188 /* helper function
3189  * input: alg name, feature name
3190  * return: NULL (alg name was "")
3191  *         ERR_PTR(error) if something goes wrong
3192  *         or the crypto hash ptr, if it worked out ok. */
3193 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3194                 const char *alg, const char *name)
3195 {
3196         struct crypto_hash *tfm;
3197
3198         if (!alg[0])
3199                 return NULL;
3200
3201         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3202         if (IS_ERR(tfm)) {
3203                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3204                         alg, name, PTR_ERR(tfm));
3205                 return tfm;
3206         }
3207         return tfm;
3208 }
3209
3210 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3211 {
3212         void *buffer = tconn->data.rbuf;
3213         int size = pi->size;
3214
3215         while (size) {
3216                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3217                 s = drbd_recv(tconn, buffer, s);
3218                 if (s <= 0) {
3219                         if (s < 0)
3220                                 return s;
3221                         break;
3222                 }
3223                 size -= s;
3224         }
3225         if (size)
3226                 return -EIO;
3227         return 0;
3228 }
3229
3230 /*
3231  * config_unknown_volume  -  device configuration command for unknown volume
3232  *
3233  * When a device is added to an existing connection, the node on which the
3234  * device is added first will send configuration commands to its peer but the
3235  * peer will not know about the device yet.  It will warn and ignore these
3236  * commands.  Once the device is added on the second node, the second node will
3237  * send the same device configuration commands, but in the other direction.
3238  *
3239  * (We can also end up here if drbd is misconfigured.)
3240  */
3241 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3242 {
3243         conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3244                   cmdname(pi->cmd), pi->vnr);
3245         return ignore_remaining_packet(tconn, pi);
3246 }
3247
3248 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3249 {
3250         struct drbd_conf *mdev;
3251         struct p_rs_param_95 *p;
3252         unsigned int header_size, data_size, exp_max_sz;
3253         struct crypto_hash *verify_tfm = NULL;
3254         struct crypto_hash *csums_tfm = NULL;
3255         struct net_conf *old_net_conf, *new_net_conf = NULL;
3256         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3257         const int apv = tconn->agreed_pro_version;
3258         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3259         int fifo_size = 0;
3260         int err;
3261
3262         mdev = vnr_to_mdev(tconn, pi->vnr);
3263         if (!mdev)
3264                 return config_unknown_volume(tconn, pi);
3265
3266         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3267                     : apv == 88 ? sizeof(struct p_rs_param)
3268                                         + SHARED_SECRET_MAX
3269                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3270                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3271
3272         if (pi->size > exp_max_sz) {
3273                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3274                     pi->size, exp_max_sz);
3275                 return -EIO;
3276         }
3277
3278         if (apv <= 88) {
3279                 header_size = sizeof(struct p_rs_param);
3280                 data_size = pi->size - header_size;
3281         } else if (apv <= 94) {
3282                 header_size = sizeof(struct p_rs_param_89);
3283                 data_size = pi->size - header_size;
3284                 D_ASSERT(data_size == 0);
3285         } else {
3286                 header_size = sizeof(struct p_rs_param_95);
3287                 data_size = pi->size - header_size;
3288                 D_ASSERT(data_size == 0);
3289         }
3290
3291         /* initialize verify_alg and csums_alg */
3292         p = pi->data;
3293         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3294
3295         err = drbd_recv_all(mdev->tconn, p, header_size);
3296         if (err)
3297                 return err;
3298
3299         mutex_lock(&mdev->tconn->conf_update);
3300         old_net_conf = mdev->tconn->net_conf;
3301         if (get_ldev(mdev)) {
3302                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3303                 if (!new_disk_conf) {
3304                         put_ldev(mdev);
3305                         mutex_unlock(&mdev->tconn->conf_update);
3306                         dev_err(DEV, "Allocation of new disk_conf failed\n");
3307                         return -ENOMEM;
3308                 }
3309
3310                 old_disk_conf = mdev->ldev->disk_conf;
3311                 *new_disk_conf = *old_disk_conf;
3312
3313                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3314         }
3315
3316         if (apv >= 88) {
3317                 if (apv == 88) {
3318                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3319                                 dev_err(DEV, "verify-alg of wrong size, "
3320                                         "peer wants %u, accepting only up to %u bytes\n",
3321                                         data_size, SHARED_SECRET_MAX);
3322                                 err = -EIO;
3323                                 goto reconnect;
3324                         }
3325
3326                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3327                         if (err)
3328                                 goto reconnect;
3329                         /* we expect NUL terminated string */
3330                         /* but just in case someone tries to be evil */
3331                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3332                         p->verify_alg[data_size-1] = 0;
3333
3334                 } else /* apv >= 89 */ {
3335                         /* we still expect NUL terminated strings */
3336                         /* but just in case someone tries to be evil */
3337                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3338                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3339                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3340                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3341                 }
3342
3343                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3344                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3345                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3346                                     old_net_conf->verify_alg, p->verify_alg);
3347                                 goto disconnect;
3348                         }
3349                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3350                                         p->verify_alg, "verify-alg");
3351                         if (IS_ERR(verify_tfm)) {
3352                                 verify_tfm = NULL;
3353                                 goto disconnect;
3354                         }
3355                 }
3356
3357                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3358                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3359                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3360                                     old_net_conf->csums_alg, p->csums_alg);
3361                                 goto disconnect;
3362                         }
3363                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3364                                         p->csums_alg, "csums-alg");
3365                         if (IS_ERR(csums_tfm)) {
3366                                 csums_tfm = NULL;
3367                                 goto disconnect;
3368                         }
3369                 }
3370
3371                 if (apv > 94 && new_disk_conf) {
3372                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3373                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3374                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3375                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3376
3377                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3378                         if (fifo_size != mdev->rs_plan_s->size) {
3379                                 new_plan = fifo_alloc(fifo_size);
3380                                 if (!new_plan) {
3381                                         dev_err(DEV, "kmalloc of fifo_buffer failed\n");
3382                                         put_ldev(mdev);
3383                                         goto disconnect;
3384                                 }
3385                         }
3386                 }
3387
3388                 if (verify_tfm || csums_tfm) {
3389                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3390                         if (!new_net_conf) {
3391                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3392                                 goto disconnect;
3393                         }
3394
3395                         *new_net_conf = *old_net_conf;
3396
3397                         if (verify_tfm) {
3398                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3399                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3400                                 crypto_free_hash(mdev->tconn->verify_tfm);
3401                                 mdev->tconn->verify_tfm = verify_tfm;
3402                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3403                         }
3404                         if (csums_tfm) {
3405                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3406                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3407                                 crypto_free_hash(mdev->tconn->csums_tfm);
3408                                 mdev->tconn->csums_tfm = csums_tfm;
3409                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3410                         }
3411                         rcu_assign_pointer(tconn->net_conf, new_net_conf);
3412                 }
3413         }
3414
3415         if (new_disk_conf) {
3416                 rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3417                 put_ldev(mdev);
3418         }
3419
3420         if (new_plan) {
3421                 old_plan = mdev->rs_plan_s;
3422                 rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3423         }
3424
3425         mutex_unlock(&mdev->tconn->conf_update);
3426         synchronize_rcu();
3427         if (new_net_conf)
3428                 kfree(old_net_conf);
3429         kfree(old_disk_conf);
3430         kfree(old_plan);
3431
3432         return 0;
3433
3434 reconnect:
3435         if (new_disk_conf) {
3436                 put_ldev(mdev);
3437                 kfree(new_disk_conf);
3438         }
3439         mutex_unlock(&mdev->tconn->conf_update);
3440         return -EIO;
3441
3442 disconnect:
3443         kfree(new_plan);
3444         if (new_disk_conf) {
3445                 put_ldev(mdev);
3446                 kfree(new_disk_conf);
3447         }
3448         mutex_unlock(&mdev->tconn->conf_update);
3449         /* just for completeness: actually not needed,
3450          * as this is not reached if csums_tfm was ok. */
3451         crypto_free_hash(csums_tfm);
3452         /* but free the verify_tfm again, if csums_tfm did not work out */
3453         crypto_free_hash(verify_tfm);
3454         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3455         return -EIO;
3456 }
3457
3458 /* warn if the arguments differ by more than 12.5% */
3459 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3460         const char *s, sector_t a, sector_t b)
3461 {
3462         sector_t d;
3463         if (a == 0 || b == 0)
3464                 return;
3465         d = (a > b) ? (a - b) : (b - a);
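        /* a>>3 == a/8, i.e. 12.5% of a (likewise for b) */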
3466         if (d > (a>>3) || d > (b>>3))
3467                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3468                      (unsigned long long)a, (unsigned long long)b);
3469 }
3470
3471 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3472 {
3473         struct drbd_conf *mdev;
3474         struct p_sizes *p = pi->data;
3475         enum determine_dev_size dd = unchanged;
3476         sector_t p_size, p_usize, my_usize;
3477         int ldsc = 0; /* local disk size changed */
3478         enum dds_flags ddsf;
3479
3480         mdev = vnr_to_mdev(tconn, pi->vnr);
3481         if (!mdev)
3482                 return config_unknown_volume(tconn, pi);
3483
3484         p_size = be64_to_cpu(p->d_size);
3485         p_usize = be64_to_cpu(p->u_size);
3486
3487         /* just store the peer's disk size for now.
3488          * we still need to figure out whether we accept that. */
3489         mdev->p_size = p_size;
3490
3491         if (get_ldev(mdev)) {
3492                 rcu_read_lock();
3493                 my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3494                 rcu_read_unlock();
3495
3496                 warn_if_differ_considerably(mdev, "lower level device sizes",
3497                            p_size, drbd_get_max_capacity(mdev->ldev));
3498                 warn_if_differ_considerably(mdev, "user requested size",
3499                                             p_usize, my_usize);
3500
3501                 /* if this is the first connect, or an otherwise expected
3502                  * param exchange, choose the minimum */
3503                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3504                         p_usize = min_not_zero(my_usize, p_usize);
3505
3506                 /* Never shrink a device with usable data during connect.
3507                    But allow online shrinking if we are connected. */
3508                 if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3509                     drbd_get_capacity(mdev->this_bdev) &&
3510                     mdev->state.disk >= D_OUTDATED &&
3511                     mdev->state.conn < C_CONNECTED) {
3512                         dev_err(DEV, "The peer's disk size is too small!\n");
3513                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3514                         put_ldev(mdev);
3515                         return -EIO;
3516                 }
3517
3518                 if (my_usize != p_usize) {
3519                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3520
3521                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3522                         if (!new_disk_conf) {
3523                                 dev_err(DEV, "Allocation of new disk_conf failed\n");
3524                                 put_ldev(mdev);
3525                                 return -ENOMEM;
3526                         }
3527
3528                         mutex_lock(&mdev->tconn->conf_update);
3529                         old_disk_conf = mdev->ldev->disk_conf;
3530                         *new_disk_conf = *old_disk_conf;
3531                         new_disk_conf->disk_size = p_usize;
3532
3533                         rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3534                         mutex_unlock(&mdev->tconn->conf_update);
3535                         synchronize_rcu();
3536                         kfree(old_disk_conf);
3537
3538                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3539                                  (unsigned long)p_usize);
3540                 }
3541
3542                 put_ldev(mdev);
3543         }
3544
3545         ddsf = be16_to_cpu(p->dds_flags);
3546         if (get_ldev(mdev)) {
3547                 dd = drbd_determine_dev_size(mdev, ddsf);
3548                 put_ldev(mdev);
3549                 if (dd == dev_size_error)
3550                         return -EIO;
3551                 drbd_md_sync(mdev);
3552         } else {
3553                 /* I am diskless, need to accept the peer's size. */
3554                 drbd_set_my_capacity(mdev, p_size);
3555         }
3556
3557         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3558         drbd_reconsider_max_bio_size(mdev);
3559
3560         if (get_ldev(mdev)) {
3561                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3562                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3563                         ldsc = 1;
3564                 }
3565
3566                 put_ldev(mdev);
3567         }
3568
3569         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3570                 if (be64_to_cpu(p->c_size) !=
3571                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3572                         /* we have different sizes, probably peer
3573                          * needs to know my new size... */
3574                         drbd_send_sizes(mdev, 0, ddsf);
3575                 }
3576                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3577                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3578                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3579                             mdev->state.disk >= D_INCONSISTENT) {
3580                                 if (ddsf & DDSF_NO_RESYNC)
3581                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3582                                 else
3583                                         resync_after_online_grow(mdev);
3584                         } else
3585                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3586                 }
3587         }
3588
3589         return 0;
3590 }
3591
3592 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3593 {
3594         struct drbd_conf *mdev;
3595         struct p_uuids *p = pi->data;
3596         u64 *p_uuid;
3597         int i, updated_uuids = 0;
3598
3599         mdev = vnr_to_mdev(tconn, pi->vnr);
3600         if (!mdev)
3601                 return config_unknown_volume(tconn, pi);
3602
3603         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return false;
             }
3604
3605         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3606                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3607
3608         kfree(mdev->p_uuid);
3609         mdev->p_uuid = p_uuid;
3610
3611         if (mdev->state.conn < C_CONNECTED &&
3612             mdev->state.disk < D_INCONSISTENT &&
3613             mdev->state.role == R_PRIMARY &&
3614             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3615                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3616                     (unsigned long long)mdev->ed_uuid);
3617                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3618                 return -EIO;
3619         }
3620
3621         if (get_ldev(mdev)) {
3622                 int skip_initial_sync =
3623                         mdev->state.conn == C_CONNECTED &&
3624                         mdev->tconn->agreed_pro_version >= 90 &&
3625                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3626                         (p_uuid[UI_FLAGS] & 8);
3627                 if (skip_initial_sync) {
3628                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3629                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3630                                         "clear_n_write from receive_uuids",
3631                                         BM_LOCKED_TEST_ALLOWED);
3632                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3633                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3634                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3635                                         CS_VERBOSE, NULL);
3636                         drbd_md_sync(mdev);
3637                         updated_uuids = 1;
3638                 }
3639                 put_ldev(mdev);
3640         } else if (mdev->state.disk < D_INCONSISTENT &&
3641                    mdev->state.role == R_PRIMARY) {
3642                 /* I am a diskless primary, the peer just created a new current UUID
3643                    for me. */
3644                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3645         }
3646
3647         /* Before we test for the disk state, we should wait until a possibly
3648            ongoing cluster-wide state change has finished. That is important if
3649            we are primary and are detaching from our disk. We need to see the
3650            new disk state... */
3651         mutex_lock(mdev->state_mutex);
3652         mutex_unlock(mdev->state_mutex);
3653         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3654                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3655
3656         if (updated_uuids)
3657                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3658
3659         return 0;
3660 }
3661
3662 /**
3663  * convert_state() - Converts the peer's view of the cluster state to our point of view
3664  * @ps:         The state as seen by the peer.
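 *
 * Roles and disk states are swapped with their peer counterparts: for
 * example, if the peer reports { role = Primary, peer = Secondary,
 * disk = UpToDate, pdsk = Inconsistent }, the converted state is
 * { role = Secondary, peer = Primary, disk = Inconsistent, pdsk = UpToDate }.
 * The peer's aftr_isp/user_isp flags are folded into our peer_isp, and
 * connection states that differ depending on the point of view
 * (e.g. C_STARTING_SYNC_S vs. C_STARTING_SYNC_T) are mapped via c_tab.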
3665  */
3666 static union drbd_state convert_state(union drbd_state ps)
3667 {
3668         union drbd_state ms;
3669
3670         static enum drbd_conns c_tab[] = {
3671                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3672                 [C_CONNECTED] = C_CONNECTED,
3673
3674                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3675                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3676                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3677                 [C_VERIFY_S]       = C_VERIFY_T,
3678                 [C_MASK]   = C_MASK,
3679         };
3680
3681         ms.i = ps.i;
3682
3683         ms.conn = c_tab[ps.conn];
3684         ms.peer = ps.role;
3685         ms.role = ps.peer;
3686         ms.pdsk = ps.disk;
3687         ms.disk = ps.pdsk;
3688         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3689
3690         return ms;
3691 }
3692
3693 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3694 {
3695         struct drbd_conf *mdev;
3696         struct p_req_state *p = pi->data;
3697         union drbd_state mask, val;
3698         enum drbd_state_rv rv;
3699
3700         mdev = vnr_to_mdev(tconn, pi->vnr);
3701         if (!mdev)
3702                 return -EIO;
3703
3704         mask.i = be32_to_cpu(p->mask);
3705         val.i = be32_to_cpu(p->val);
3706
3707         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3708             mutex_is_locked(mdev->state_mutex)) {
3709                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3710                 return 0;
3711         }
3712
3713         mask = convert_state(mask);
3714         val = convert_state(val);
3715
3716         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3717         drbd_send_sr_reply(mdev, rv);
3718
3719         drbd_md_sync(mdev);
3720
3721         return 0;
3722 }
3723
3724 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3725 {
3726         struct p_req_state *p = pi->data;
3727         union drbd_state mask, val;
3728         enum drbd_state_rv rv;
3729
3730         mask.i = be32_to_cpu(p->mask);
3731         val.i = be32_to_cpu(p->val);
3732
3733         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3734             mutex_is_locked(&tconn->cstate_mutex)) {
3735                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3736                 return 0;
3737         }
3738
3739         mask = convert_state(mask);
3740         val = convert_state(val);
3741
3742         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3743         conn_send_sr_reply(tconn, rv);
3744
3745         return 0;
3746 }
3747
3748 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3749 {
3750         struct drbd_conf *mdev;
3751         struct p_state *p = pi->data;
3752         union drbd_state os, ns, peer_state;
3753         enum drbd_disk_state real_peer_disk;
3754         enum chg_state_flags cs_flags;
3755         int rv;
3756
3757         mdev = vnr_to_mdev(tconn, pi->vnr);
3758         if (!mdev)
3759                 return config_unknown_volume(tconn, pi);
3760
3761         peer_state.i = be32_to_cpu(p->state);
3762
3763         real_peer_disk = peer_state.disk;
3764         if (peer_state.disk == D_NEGOTIATING) {
3765                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3766                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3767         }
3768
3769         spin_lock_irq(&mdev->tconn->req_lock);
3770  retry:
3771         os = ns = drbd_read_state(mdev);
3772         spin_unlock_irq(&mdev->tconn->req_lock);
3773
3774         /* If some other part of the code (asender thread, timeout)
3775          * already decided to close the connection again,
3776          * we must not "re-establish" it here. */
3777         if (os.conn <= C_TEAR_DOWN)
3778                 return false;
3779
3780         /* If this is the "end of sync" confirmation, usually the peer disk
3781          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3782          * set) resync started in PausedSyncT, or if the timing of pause-/
3783          * unpause-sync events has been "just right", the peer disk may
3784          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3785          */
3786         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3787             real_peer_disk == D_UP_TO_DATE &&
3788             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3789                 /* If we are (becoming) SyncSource, but peer is still in sync
3790                  * preparation, ignore its uptodate-ness to avoid flapping, it
3791                  * will change to inconsistent once the peer reaches active
3792                  * syncing states.
3793                  * It may have changed syncer-paused flags, however, so we
3794                  * cannot ignore this completely. */
3795                 if (peer_state.conn > C_CONNECTED &&
3796                     peer_state.conn < C_SYNC_SOURCE)
3797                         real_peer_disk = D_INCONSISTENT;
3798
3799                 /* if peer_state changes to connected at the same time,
3800                  * it explicitly notifies us that it finished resync.
3801                  * Maybe we should finish it up, too? */
3802                 else if (os.conn >= C_SYNC_SOURCE &&
3803                          peer_state.conn == C_CONNECTED) {
3804                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3805                                 drbd_resync_finished(mdev);
3806                         return 0;
3807                 }
3808         }
3809
3810         /* peer says his disk is inconsistent, while we think it is uptodate,
3811          * and this happens while the peer still thinks we have a sync going on,
3812          * but we think we are already done with the sync.
3813          * We ignore this to avoid flapping pdsk.
3814          * This should not happen if the peer is a recent version of drbd. */
3815         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3816             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3817                 real_peer_disk = D_UP_TO_DATE;
3818
3819         if (ns.conn == C_WF_REPORT_PARAMS)
3820                 ns.conn = C_CONNECTED;
3821
3822         if (peer_state.conn == C_AHEAD)
3823                 ns.conn = C_BEHIND;
3824
3825         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3826             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3827                 int cr; /* consider resync */
3828
3829                 /* if we established a new connection */
3830                 cr  = (os.conn < C_CONNECTED);
3831                 /* if we had an established connection
3832                  * and one of the nodes newly attaches a disk */
3833                 cr |= (os.conn == C_CONNECTED &&
3834                        (peer_state.disk == D_NEGOTIATING ||
3835                         os.disk == D_NEGOTIATING));
3836                 /* if we have both been inconsistent, and the peer has been
3837                  * forced to be UpToDate with --overwrite-data */
3838                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3839                 /* if we had been plain connected, and the admin requested to
3840                  * start a sync by "invalidate" or "invalidate-remote" */
3841                 cr |= (os.conn == C_CONNECTED &&
3842                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3843                                  peer_state.conn <= C_WF_BITMAP_T));
3844
3845                 if (cr)
3846                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3847
3848                 put_ldev(mdev);
3849                 if (ns.conn == C_MASK) {
3850                         ns.conn = C_CONNECTED;
3851                         if (mdev->state.disk == D_NEGOTIATING) {
3852                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3853                         } else if (peer_state.disk == D_NEGOTIATING) {
3854                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3855                                 peer_state.disk = D_DISKLESS;
3856                                 real_peer_disk = D_DISKLESS;
3857                         } else {
3858                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3859                                         return -EIO;
3860                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3861                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3862                                 return -EIO;
3863                         }
3864                 }
3865         }
3866
3867         spin_lock_irq(&mdev->tconn->req_lock);
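        /* The req_lock was dropped above (possibly around drbd_sync_handshake());
         * if the local state changed in the meantime, re-evaluate from scratch. */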
3868         if (os.i != drbd_read_state(mdev).i)
3869                 goto retry;
3870         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3871         ns.peer = peer_state.role;
3872         ns.pdsk = real_peer_disk;
3873         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3874         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3875                 ns.disk = mdev->new_state_tmp.disk;
3876         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3877         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3878             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3879                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3880                    for temporary network outages! */
3881                 spin_unlock_irq(&mdev->tconn->req_lock);
3882                 dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3883                 tl_clear(mdev->tconn);
3884                 drbd_uuid_new_current(mdev);
3885                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3886                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3887                 return -EIO;
3888         }
3889         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3890         ns = drbd_read_state(mdev);
3891         spin_unlock_irq(&mdev->tconn->req_lock);
3892
3893         if (rv < SS_SUCCESS) {
3894                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3895                 return -EIO;
3896         }
3897
3898         if (os.conn > C_WF_REPORT_PARAMS) {
3899                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3900                     peer_state.disk != D_NEGOTIATING ) {
3901                         /* we want resync, peer has not yet decided to sync... */
3902                         /* Nowadays only used when forcing a node into primary role and
3903                            setting its disk to UpToDate at the same time */
3904                         drbd_send_uuids(mdev);
3905                         drbd_send_current_state(mdev);
3906                 }
3907         }
3908
3909         clear_bit(DISCARD_MY_DATA, &mdev->flags);
3910
3911         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3912
3913         return 0;
3914 }
3915
3916 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3917 {
3918         struct drbd_conf *mdev;
3919         struct p_rs_uuid *p = pi->data;
3920
3921         mdev = vnr_to_mdev(tconn, pi->vnr);
3922         if (!mdev)
3923                 return -EIO;
3924
3925         wait_event(mdev->misc_wait,
3926                    mdev->state.conn == C_WF_SYNC_UUID ||
3927                    mdev->state.conn == C_BEHIND ||
3928                    mdev->state.conn < C_CONNECTED ||
3929                    mdev->state.disk < D_NEGOTIATING);
3930
3931         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3932
3933         /* Here the _drbd_uuid_ functions are right, current should
3934            _not_ be rotated into the history */
3935         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3936                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3937                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3938
3939                 drbd_print_uuids(mdev, "updated sync uuid");
3940                 drbd_start_resync(mdev, C_SYNC_TARGET);
3941
3942                 put_ldev(mdev);
3943         } else
3944                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3945
3946         return 0;
3947 }
3948
3949 /**
3950  * receive_bitmap_plain
3951  *
3952  * Return 0 when done, 1 when another iteration is needed, and a negative error
3953  * code upon failure.
3954  */
3955 static int
3956 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3957                      unsigned long *p, struct bm_xfer_ctx *c)
3958 {
3959         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3960                                  drbd_header_size(mdev->tconn);
3961         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3962                                        c->bm_words - c->word_offset);
3963         unsigned int want = num_words * sizeof(*p);
3964         int err;
3965
3966         if (want != size) {
3967                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3968                 return -EIO;
3969         }
3970         if (want == 0)
3971                 return 0;
3972         err = drbd_recv_all(mdev->tconn, p, want);
3973         if (err)
3974                 return err;
3975
3976         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3977
3978         c->word_offset += num_words;
3979         c->bit_offset = c->word_offset * BITS_PER_LONG;
3980         if (c->bit_offset > c->bm_bits)
3981                 c->bit_offset = c->bm_bits;
3982
3983         return 1;
3984 }
3985
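/* Helpers to pick apart the "encoding" byte of a compressed bitmap packet.
 * Layout, as implied by the accessors below:
 *   bits 0..3  drbd_bitmap_code (only RLE_VLI_Bits is handled, see decode_bitmap_c())
 *   bits 4..6  number of padding bits at the end of the bit stream
 *   bit  7     whether the first run-length run describes set bits
 */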
3986 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3987 {
3988         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3989 }
3990
3991 static int dcbp_get_start(struct p_compressed_bm *p)
3992 {
3993         return (p->encoding & 0x80) != 0;
3994 }
3995
3996 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3997 {
3998         return (p->encoding >> 4) & 0x7;
3999 }
4000
4001 /**
4002  * recv_bm_rle_bits
4003  *
4004  * Return 0 when done, 1 when another iteration is needed, and a negative error
4005  * code upon failure.
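 *
 * The payload is a run-length encoding of (part of) the bitmap: run lengths
 * are VLI encoded (vli_decode_bits()), consecutive runs alternate between
 * clear and set bits, and the polarity of the first run is taken from the
 * packet header (dcbp_get_start()).  Runs of set bits are applied to the
 * local bitmap via _drbd_bm_set_bits().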
4006  */
4007 static int
4008 recv_bm_rle_bits(struct drbd_conf *mdev,
4009                 struct p_compressed_bm *p,
4010                  struct bm_xfer_ctx *c,
4011                  unsigned int len)
4012 {
4013         struct bitstream bs;
4014         u64 look_ahead;
4015         u64 rl;
4016         u64 tmp;
4017         unsigned long s = c->bit_offset;
4018         unsigned long e;
4019         int toggle = dcbp_get_start(p);
4020         int have;
4021         int bits;
4022
4023         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4024
4025         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4026         if (bits < 0)
4027                 return -EIO;
4028
4029         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4030                 bits = vli_decode_bits(&rl, look_ahead);
4031                 if (bits <= 0)
4032                         return -EIO;
4033
4034                 if (toggle) {
4035                         e = s + rl -1;
4036                         if (e >= c->bm_bits) {
4037                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4038                                 return -EIO;
4039                         }
4040                         _drbd_bm_set_bits(mdev, s, e);
4041                 }
4042
4043                 if (have < bits) {
4044                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4045                                 have, bits, look_ahead,
4046                                 (unsigned int)(bs.cur.b - p->code),
4047                                 (unsigned int)bs.buf_len);
4048                         return -EIO;
4049                 }
4050                 look_ahead >>= bits;
4051                 have -= bits;
4052
4053                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4054                 if (bits < 0)
4055                         return -EIO;
4056                 look_ahead |= tmp << have;
4057                 have += bits;
4058         }
4059
4060         c->bit_offset = s;
4061         bm_xfer_ctx_bit_to_word_offset(c);
4062
4063         return (s != c->bm_bits);
4064 }
4065
4066 /**
4067  * decode_bitmap_c
4068  *
4069  * Return 0 when done, 1 when another iteration is needed, and a negative error
4070  * code upon failure.
4071  */
4072 static int
4073 decode_bitmap_c(struct drbd_conf *mdev,
4074                 struct p_compressed_bm *p,
4075                 struct bm_xfer_ctx *c,
4076                 unsigned int len)
4077 {
4078         if (dcbp_get_code(p) == RLE_VLI_Bits)
4079                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4080
4081         /* other variants had been implemented for evaluation,
4082          * but have been dropped as this one turned out to be "best"
4083          * during all our tests. */
4084
4085         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4086         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4087         return -EIO;
4088 }
4089
4090 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4091                 const char *direction, struct bm_xfer_ctx *c)
4092 {
4093         /* what would it take to transfer it "plaintext" */
4094         unsigned int header_size = drbd_header_size(mdev->tconn);
4095         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4096         unsigned int plain =
4097                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4098                 c->bm_words * sizeof(unsigned long);
4099         unsigned int total = c->bytes[0] + c->bytes[1];
4100         unsigned int r;
4101
4102         /* total cannot be zero. But just in case: */
4103         if (total == 0)
4104                 return;
4105
4106         /* don't report if not compressed */
4107         if (total >= plain)
4108                 return;
4109
4110         /* total < plain. check for overflow, still */
4111         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4112                                     : (1000 * total / plain);
4113
4114         if (r > 1000)
4115                 r = 1000;
4116
4117         r = 1000 - r;
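        /* r now holds the savings relative to the plain-text size in per
         * mille; it is printed below as "compression: X.Y%". */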
4118         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4119              "total %u; compression: %u.%u%%\n",
4120                         direction,
4121                         c->bytes[1], c->packets[1],
4122                         c->bytes[0], c->packets[0],
4123                         total, r/10, r % 10);
4124 }
4125
4126 /* Since we process the bitfield from lower addresses to higher,
4127    it does not matter whether we do it in 32 bit or 64 bit chunks,
4128    as long as it is little endian. (Understand it as a byte stream,
4129    beginning with the lowest byte...) With big endian we would have
4130    to process it from the highest address to the lowest in order to
4131    be agnostic to the 32 vs 64 bit issue.
4132
4133    Returns 0 on success, a negative error code otherwise. */
4134 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4135 {
4136         struct drbd_conf *mdev;
4137         struct bm_xfer_ctx c;
4138         int err;
4139
4140         mdev = vnr_to_mdev(tconn, pi->vnr);
4141         if (!mdev)
4142                 return -EIO;
4143
4144         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4145         /* you are supposed to send additional out-of-sync information
4146          * if you actually set bits during this phase */
4147
4148         c = (struct bm_xfer_ctx) {
4149                 .bm_bits = drbd_bm_bits(mdev),
4150                 .bm_words = drbd_bm_words(mdev),
4151         };
4152
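        /* Loop over the incoming bitmap packets.  The decode helpers return
         * 0 when the whole bitmap has been received, a positive value when
         * another packet is expected, and a negative error code on failure. */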
4153         for(;;) {
4154                 if (pi->cmd == P_BITMAP)
4155                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4156                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4157                         /* MAYBE: sanity check that we speak proto >= 90,
4158                          * and the feature is enabled! */
4159                         struct p_compressed_bm *p = pi->data;
4160
4161                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4162                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4163                                 err = -EIO;
4164                                 goto out;
4165                         }
4166                         if (pi->size <= sizeof(*p)) {
4167                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4168                                 err = -EIO;
4169                                 goto out;
4170                         }
4171                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4172                         if (err)
4173                                 goto out;
4174                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4175                 } else {
4176                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4177                         err = -EIO;
4178                         goto out;
4179                 }
4180
4181                 c.packets[pi->cmd == P_BITMAP]++;
4182                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4183
4184                 if (err <= 0) {
4185                         if (err < 0)
4186                                 goto out;
4187                         break;
4188                 }
4189                 err = drbd_recv_header(mdev->tconn, pi);
4190                 if (err)
4191                         goto out;
4192         }
4193
4194         INFO_bm_xfer_stats(mdev, "receive", &c);
4195
4196         if (mdev->state.conn == C_WF_BITMAP_T) {
4197                 enum drbd_state_rv rv;
4198
4199                 err = drbd_send_bitmap(mdev);
4200                 if (err)
4201                         goto out;
4202                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4203                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4204                 D_ASSERT(rv == SS_SUCCESS);
4205         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4206                 /* admin may have requested C_DISCONNECTING,
4207                  * other threads may have noticed network errors */
4208                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4209                     drbd_conn_str(mdev->state.conn));
4210         }
4211         err = 0;
4212
4213  out:
4214         drbd_bm_unlock(mdev);
4215         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4216                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4217         return err;
4218 }
4219
4220 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4221 {
4222         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4223                  pi->cmd, pi->size);
4224
4225         return ignore_remaining_packet(tconn, pi);
4226 }
4227
4228 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4229 {
4230         /* Make sure we've acked all the TCP data associated
4231          * with the data requests being unplugged */
4232         drbd_tcp_quickack(tconn->data.socket);
4233
4234         return 0;
4235 }
4236
4237 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4238 {
4239         struct drbd_conf *mdev;
4240         struct p_block_desc *p = pi->data;
4241
4242         mdev = vnr_to_mdev(tconn, pi->vnr);
4243         if (!mdev)
4244                 return -EIO;
4245
4246         switch (mdev->state.conn) {
4247         case C_WF_SYNC_UUID:
4248         case C_WF_BITMAP_T:
4249         case C_BEHIND:
4250                 break;
4251         default:
4252                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4253                                 drbd_conn_str(mdev->state.conn));
4254         }
4255
4256         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4257
4258         return 0;
4259 }
4260
4261 struct data_cmd {
4262         int expect_payload;
4263         size_t pkt_size;
4264         int (*fn)(struct drbd_tconn *, struct packet_info *);
4265 };
4266
4267 static struct data_cmd drbd_cmd_handler[] = {
4268         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4269         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4270         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4271         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4272         [P_BITMAP]          = { 1, 0, receive_bitmap },
4273         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4274         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4275         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4276         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4277         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4278         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4279         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4280         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4281         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4282         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4283         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4284         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4285         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4286         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4287         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4288         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4289         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4290         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4291         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4292 };
4293
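/* Main receive loop of the receiver thread: read a packet header, look up
 * the handler in drbd_cmd_handler[], read the fixed-size sub header
 * (cmd->pkt_size) into the socket buffer, and hand the remaining payload
 * (pi.size) to the handler.  Any error escalates to C_PROTOCOL_ERROR. */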
4294 static void drbdd(struct drbd_tconn *tconn)
4295 {
4296         struct packet_info pi;
4297         size_t shs; /* sub header size */
4298         int err;
4299
4300         while (get_t_state(&tconn->receiver) == RUNNING) {
4301                 struct data_cmd *cmd;
4302
4303                 drbd_thread_current_set_cpu(&tconn->receiver);
4304                 if (drbd_recv_header(tconn, &pi))
4305                         goto err_out;
4306
4307                 cmd = &drbd_cmd_handler[pi.cmd];
4308                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4309                         conn_err(tconn, "Unexpected data packet %s (0x%04x)\n",
4310                                  cmdname(pi.cmd), pi.cmd);
4311                         goto err_out;
4312                 }
4313
4314                 shs = cmd->pkt_size;
4315                 if (pi.size > shs && !cmd->expect_payload) {
4316                         conn_err(tconn, "No payload expected %s l:%d\n",
4317                                  cmdname(pi.cmd), pi.size);
4318                         goto err_out;
4319                 }
4320
4321                 if (shs) {
4322                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4323                         if (err)
4324                                 goto err_out;
4325                         pi.size -= shs;
4326                 }
4327
4328                 err = cmd->fn(tconn, &pi);
4329                 if (err) {
4330                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4331                                  cmdname(pi.cmd), err, pi.size);
4332                         goto err_out;
4333                 }
4334         }
4335         return;
4336
4337     err_out:
4338         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4339 }
4340
4341 void conn_flush_workqueue(struct drbd_tconn *tconn)
4342 {
4343         struct drbd_wq_barrier barr;
4344
4345         barr.w.cb = w_prev_work_done;
4346         barr.w.tconn = tconn;
4347         init_completion(&barr.done);
4348         drbd_queue_work(&tconn->data.work, &barr.w);
4349         wait_for_completion(&barr.done);
4350 }
4351
4352 static void conn_disconnect(struct drbd_tconn *tconn)
4353 {
4354         struct drbd_conf *mdev;
4355         enum drbd_conns oc;
4356         int vnr;
4357
4358         if (tconn->cstate == C_STANDALONE)
4359                 return;
4360
4361         /* We are about to start the cleanup after connection loss.
4362          * Make sure drbd_make_request knows about that.
4363          * Usually we should be in some network failure state already,
4364          * but just in case we are not, we fix it up here.
4365          */
4366         conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4367
4368         /* asender does not clean up anything. it must not interfere, either */
4369         drbd_thread_stop(&tconn->asender);
4370         drbd_free_sock(tconn);
4371
4372         rcu_read_lock();
4373         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4374                 kref_get(&mdev->kref);
4375                 rcu_read_unlock();
4376                 drbd_disconnected(mdev);
4377                 kref_put(&mdev->kref, &drbd_minor_destroy);
4378                 rcu_read_lock();
4379         }
4380         rcu_read_unlock();
4381
4382         if (!list_empty(&tconn->current_epoch->list))
4383                 conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4384         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4385         atomic_set(&tconn->current_epoch->epoch_size, 0);
4386
4387         conn_info(tconn, "Connection closed\n");
4388
4389         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4390                 conn_try_outdate_peer_async(tconn);
4391
4392         spin_lock_irq(&tconn->req_lock);
4393         oc = tconn->cstate;
4394         if (oc >= C_UNCONNECTED)
4395                 _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4396
4397         spin_unlock_irq(&tconn->req_lock);
4398
4399         if (oc == C_DISCONNECTING)
4400                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4401 }
4402
4403 static int drbd_disconnected(struct drbd_conf *mdev)
4404 {
4405         unsigned int i;
4406
4407         /* wait for current activity to cease. */
4408         spin_lock_irq(&mdev->tconn->req_lock);
4409         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4410         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4411         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4412         spin_unlock_irq(&mdev->tconn->req_lock);
4413
4414         /* We do not have data structures that would allow us to
4415          * get the rs_pending_cnt down to 0 again.
4416          *  * On C_SYNC_TARGET we do not have any data structures describing
4417          *    the pending RSDataRequest's we have sent.
4418          *  * On C_SYNC_SOURCE there is no data structure that tracks
4419          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4420          *  And no, it is not the sum of the reference counts in the
4421          *  resync_LRU. The resync_LRU tracks the whole operation including
4422          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4423          *  on the fly. */
4424         drbd_rs_cancel_all(mdev);
4425         mdev->rs_total = 0;
4426         mdev->rs_failed = 0;
4427         atomic_set(&mdev->rs_pending_cnt, 0);
4428         wake_up(&mdev->misc_wait);
4429
4430         del_timer_sync(&mdev->resync_timer);
4431         resync_timer_fn((unsigned long)mdev);
4432
4433         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4434          * w_make_resync_request etc. which may still be on the worker queue
4435          * to be "canceled" */
4436         drbd_flush_workqueue(mdev);
4437
4438         drbd_finish_peer_reqs(mdev);
4439
4440         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4441            might have queued work again. The one before drbd_finish_peer_reqs() is
4442            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4443         drbd_flush_workqueue(mdev);
4444
4445         kfree(mdev->p_uuid);
4446         mdev->p_uuid = NULL;
4447
4448         if (!drbd_suspended(mdev))
4449                 tl_clear(mdev->tconn);
4450
4451         drbd_md_sync(mdev);
4452
4453         /* serialize with bitmap writeout triggered by the state change,
4454          * if any. */
4455         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4456
4457         /* tcp_close and release of sendpage pages can be deferred.  I don't
4458          * want to use SO_LINGER, because apparently it can be deferred for
4459          * more than 20 seconds (longest time I checked).
4460          *
4461          * Actually we don't care for exactly when the network stack does its
4462          * put_page(), but release our reference on these pages right here.
4463          */
4464         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4465         if (i)
4466                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4467         i = atomic_read(&mdev->pp_in_use_by_net);
4468         if (i)
4469                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4470         i = atomic_read(&mdev->pp_in_use);
4471         if (i)
4472                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4473
4474         D_ASSERT(list_empty(&mdev->read_ee));
4475         D_ASSERT(list_empty(&mdev->active_ee));
4476         D_ASSERT(list_empty(&mdev->sync_ee));
4477         D_ASSERT(list_empty(&mdev->done_ee));
4478
4479         return 0;
4480 }
4481
4482 /*
4483  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4484  * we can agree on is stored in agreed_pro_version.
4485  *
4486  * feature flags and the reserved array should be enough room for future
4487  * enhancements of the handshake protocol, and possible plugins...
4488  *
4489  * for now, they are expected to be zero, but ignored.
4490  */
4491 static int drbd_send_features(struct drbd_tconn *tconn)
4492 {
4493         struct drbd_socket *sock;
4494         struct p_connection_features *p;
4495
4496         sock = &tconn->data;
4497         p = conn_prepare_command(tconn, sock);
4498         if (!p)
4499                 return -EIO;
4500         memset(p, 0, sizeof(*p));
4501         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4502         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4503         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4504 }
4505
4506 /*
4507  * return values:
4508  *   1 yes, we have a valid connection
4509  *   0 oops, did not work out, please try again
4510  *  -1 peer talks different language,
4511  *     no point in trying again, please go standalone.
4512  */
4513 static int drbd_do_features(struct drbd_tconn *tconn)
4514 {
4515         /* ASSERT current == tconn->receiver ... */
4516         struct p_connection_features *p;
4517         const int expect = sizeof(struct p_connection_features);
4518         struct packet_info pi;
4519         int err;
4520
4521         err = drbd_send_features(tconn);
4522         if (err)
4523                 return 0;
4524
4525         err = drbd_recv_header(tconn, &pi);
4526         if (err)
4527                 return 0;
4528
4529         if (pi.cmd != P_CONNECTION_FEATURES) {
4530                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4531                          cmdname(pi.cmd), pi.cmd);
4532                 return -1;
4533         }
4534
4535         if (pi.size != expect) {
4536                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4537                      expect, pi.size);
4538                 return -1;
4539         }
4540
4541         p = pi.data;
4542         err = drbd_recv_all_warn(tconn, p, expect);
4543         if (err)
4544                 return 0;
4545
4546         p->protocol_min = be32_to_cpu(p->protocol_min);
4547         p->protocol_max = be32_to_cpu(p->protocol_max);
4548         if (p->protocol_max == 0)
4549                 p->protocol_max = p->protocol_min;
4550
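        /* The advertised version ranges must overlap; we then agree on the
         * highest protocol version supported by both sides. */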
4551         if (PRO_VERSION_MAX < p->protocol_min ||
4552             PRO_VERSION_MIN > p->protocol_max)
4553                 goto incompat;
4554
4555         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4556
4557         conn_info(tconn, "Handshake successful: "
4558              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4559
4560         return 1;
4561
4562  incompat:
4563         conn_err(tconn, "incompatible DRBD dialects: "
4564             "I support %d-%d, peer supports %d-%d\n",
4565             PRO_VERSION_MIN, PRO_VERSION_MAX,
4566             p->protocol_min, p->protocol_max);
4567         return -1;
4568 }
4569
4570 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4571 static int drbd_do_auth(struct drbd_tconn *tconn)
4572 {
4573         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4574         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4575         return -1;
4576 }
4577 #else
4578 #define CHALLENGE_LEN 64
4579
4580 /* Return value:
4581         1 - auth succeeded,
4582         0 - failed, try again (network error),
4583         -1 - auth failed, don't try again.
4584 */
4585
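/* Challenge-response authentication over the data socket:
 *   1. send our random challenge (P_AUTH_CHALLENGE),
 *   2. receive the peer's challenge,
 *   3. send HMAC(shared_secret, peer's challenge) as P_AUTH_RESPONSE,
 *   4. receive the peer's response and compare it against
 *      HMAC(shared_secret, our challenge).
 * Each side thereby proves knowledge of the shared secret without ever
 * sending the secret itself over the wire. */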
4586 static int drbd_do_auth(struct drbd_tconn *tconn)
4587 {
4588         struct drbd_socket *sock;
4589         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4590         struct scatterlist sg;
4591         char *response = NULL;
4592         char *right_response = NULL;
4593         char *peers_ch = NULL;
4594         unsigned int key_len;
4595         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4596         unsigned int resp_size;
4597         struct hash_desc desc;
4598         struct packet_info pi;
4599         struct net_conf *nc;
4600         int err, rv;
4601
4602         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4603
4604         rcu_read_lock();
4605         nc = rcu_dereference(tconn->net_conf);
4606         key_len = strlen(nc->shared_secret);
4607         memcpy(secret, nc->shared_secret, key_len);
4608         rcu_read_unlock();
4609
4610         desc.tfm = tconn->cram_hmac_tfm;
4611         desc.flags = 0;
4612
4613         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4614         if (rv) {
4615                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4616                 rv = -1;
4617                 goto fail;
4618         }
4619
4620         get_random_bytes(my_challenge, CHALLENGE_LEN);
4621
4622         sock = &tconn->data;
4623         if (!conn_prepare_command(tconn, sock)) {
4624                 rv = 0;
4625                 goto fail;
4626         }
4627         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4628                                 my_challenge, CHALLENGE_LEN);
4629         if (!rv)
4630                 goto fail;
4631
4632         err = drbd_recv_header(tconn, &pi);
4633         if (err) {
4634                 rv = 0;
4635                 goto fail;
4636         }
4637
4638         if (pi.cmd != P_AUTH_CHALLENGE) {
4639                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4640                          cmdname(pi.cmd), pi.cmd);
4641                 rv = 0;
4642                 goto fail;
4643         }
4644
4645         if (pi.size > CHALLENGE_LEN * 2) {
4646                 conn_err(tconn, "AuthChallenge payload too big.\n");
4647                 rv = -1;
4648                 goto fail;
4649         }
4650
4651         peers_ch = kmalloc(pi.size, GFP_NOIO);
4652         if (peers_ch == NULL) {
4653                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4654                 rv = -1;
4655                 goto fail;
4656         }
4657
4658         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4659         if (err) {
4660                 rv = 0;
4661                 goto fail;
4662         }
4663
4664         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4665         response = kmalloc(resp_size, GFP_NOIO);
4666         if (response == NULL) {
4667                 conn_err(tconn, "kmalloc of response failed\n");
4668                 rv = -1;
4669                 goto fail;
4670         }
4671
4672         sg_init_table(&sg, 1);
4673         sg_set_buf(&sg, peers_ch, pi.size);
4674
4675         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4676         if (rv) {
4677                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4678                 rv = -1;
4679                 goto fail;
4680         }
4681
4682         if (!conn_prepare_command(tconn, sock)) {
4683                 rv = 0;
4684                 goto fail;
4685         }
4686         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4687                                 response, resp_size);
4688         if (!rv)
4689                 goto fail;
4690
4691         err = drbd_recv_header(tconn, &pi);
4692         if (err) {
4693                 rv = 0;
4694                 goto fail;
4695         }
4696
4697         if (pi.cmd != P_AUTH_RESPONSE) {
4698                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4699                          cmdname(pi.cmd), pi.cmd);
4700                 rv = 0;
4701                 goto fail;
4702         }
4703
4704         if (pi.size != resp_size) {
4705                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4706                 rv = 0;
4707                 goto fail;
4708         }
4709
4710         err = drbd_recv_all_warn(tconn, response, resp_size);
4711         if (err) {
4712                 rv = 0;
4713                 goto fail;
4714         }
4715
4716         right_response = kmalloc(resp_size, GFP_NOIO);
4717         if (right_response == NULL) {
4718                 conn_err(tconn, "kmalloc of right_response failed\n");
4719                 rv = -1;
4720                 goto fail;
4721         }
4722
4723         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4724
4725         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4726         if (rv) {
4727                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4728                 rv = -1;
4729                 goto fail;
4730         }
4731
4732         rv = !memcmp(response, right_response, resp_size);
4733
4734         if (rv)
4735                 conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4736                      resp_size);
4737         else
4738                 rv = -1;
4739
4740  fail:
4741         kfree(peers_ch);
4742         kfree(response);
4743         kfree(right_response);
4744
4745         return rv;
4746 }
4747 #endif
4748
4749 int drbdd_init(struct drbd_thread *thi)
4750 {
4751         struct drbd_tconn *tconn = thi->tconn;
4752         int h;
4753
4754         conn_info(tconn, "receiver (re)started\n");
4755
4756         do {
4757                 h = conn_connect(tconn);
4758                 if (h == 0) {
4759                         conn_disconnect(tconn);
4760                         schedule_timeout_interruptible(HZ);
4761                 }
4762                 if (h == -1) {
4763                         conn_warn(tconn, "Discarding network configuration.\n");
4764                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4765                 }
4766         } while (h == 0);
4767
4768         if (h > 0)
4769                 drbdd(tconn);
4770
4771         conn_disconnect(tconn);
4772
4773         conn_info(tconn, "receiver terminated\n");
4774         return 0;
4775 }
4776
4777 /* ********* acknowledge sender ******** */
4778
4779 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4780 {
4781         struct p_req_state_reply *p = pi->data;
4782         int retcode = be32_to_cpu(p->retcode);
4783
4784         if (retcode >= SS_SUCCESS) {
4785                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4786         } else {
4787                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4788                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4789                          drbd_set_st_err_str(retcode), retcode);
4790         }
4791         wake_up(&tconn->ping_wait);
4792
4793         return 0;
4794 }
4795
4796 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4797 {
4798         struct drbd_conf *mdev;
4799         struct p_req_state_reply *p = pi->data;
4800         int retcode = be32_to_cpu(p->retcode);
4801
4802         mdev = vnr_to_mdev(tconn, pi->vnr);
4803         if (!mdev)
4804                 return -EIO;
4805
4806         if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4807                 D_ASSERT(tconn->agreed_pro_version < 100);
4808                 return got_conn_RqSReply(tconn, pi);
4809         }
4810
4811         if (retcode >= SS_SUCCESS) {
4812                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4813         } else {
4814                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4815                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4816                         drbd_set_st_err_str(retcode), retcode);
4817         }
4818         wake_up(&mdev->state_wait);
4819
4820         return 0;
4821 }
4822
4823 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4824 {
4825         return drbd_send_ping_ack(tconn);
4826
4827 }
4828
4829 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4830 {
4831         /* restore idle timeout */
4832         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4833         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4834                 wake_up(&tconn->ping_wait);
4835
4836         return 0;
4837 }
4838
4839 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4840 {
4841         struct drbd_conf *mdev;
4842         struct p_block_ack *p = pi->data;
4843         sector_t sector = be64_to_cpu(p->sector);
4844         int blksize = be32_to_cpu(p->blksize);
4845
4846         mdev = vnr_to_mdev(tconn, pi->vnr);
4847         if (!mdev)
4848                 return -EIO;
4849
4850         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4851
4852         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4853
4854         if (get_ldev(mdev)) {
4855                 drbd_rs_complete_io(mdev, sector);
4856                 drbd_set_in_sync(mdev, sector, blksize);
4857                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4858                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4859                 put_ldev(mdev);
4860         }
4861         dec_rs_pending(mdev);
4862         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4863
4864         return 0;
4865 }
4866
4867 static int
4868 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4869                               struct rb_root *root, const char *func,
4870                               enum drbd_req_event what, bool missing_ok)
4871 {
4872         struct drbd_request *req;
4873         struct bio_and_error m;
4874
4875         spin_lock_irq(&mdev->tconn->req_lock);
4876         req = find_request(mdev, root, id, sector, missing_ok, func);
4877         if (unlikely(!req)) {
4878                 spin_unlock_irq(&mdev->tconn->req_lock);
4879                 return -EIO;
4880         }
4881         __req_mod(req, what, &m);
4882         spin_unlock_irq(&mdev->tconn->req_lock);
4883
4884         if (m.bio)
4885                 complete_master_bio(mdev, &m);
4886         return 0;
4887 }
4888
4889 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4890 {
4891         struct drbd_conf *mdev;
4892         struct p_block_ack *p = pi->data;
4893         sector_t sector = be64_to_cpu(p->sector);
4894         int blksize = be32_to_cpu(p->blksize);
4895         enum drbd_req_event what;
4896
4897         mdev = vnr_to_mdev(tconn, pi->vnr);
4898         if (!mdev)
4899                 return -EIO;
4900
4901         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4902
4903         if (p->block_id == ID_SYNCER) {
4904                 drbd_set_in_sync(mdev, sector, blksize);
4905                 dec_rs_pending(mdev);
4906                 return 0;
4907         }
4908         switch (pi->cmd) {
4909         case P_RS_WRITE_ACK:
4910                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4911                 break;
4912         case P_WRITE_ACK:
4913                 what = WRITE_ACKED_BY_PEER;
4914                 break;
4915         case P_RECV_ACK:
4916                 what = RECV_ACKED_BY_PEER;
4917                 break;
4918         case P_DISCARD_WRITE:
4919                 what = DISCARD_WRITE;
4920                 break;
4921         case P_RETRY_WRITE:
4922                 what = POSTPONE_WRITE;
4923                 break;
4924         default:
4925                 BUG();
4926         }
4927
4928         return validate_req_change_req_state(mdev, p->block_id, sector,
4929                                              &mdev->write_requests, __func__,
4930                                              what, false);
4931 }
4932
4933 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4934 {
4935         struct drbd_conf *mdev;
4936         struct p_block_ack *p = pi->data;
4937         sector_t sector = be64_to_cpu(p->sector);
4938         int size = be32_to_cpu(p->blksize);
4939         int err;
4940
4941         mdev = vnr_to_mdev(tconn, pi->vnr);
4942         if (!mdev)
4943                 return -EIO;
4944
4945         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4946
4947         if (p->block_id == ID_SYNCER) {
4948                 dec_rs_pending(mdev);
4949                 drbd_rs_failed_io(mdev, sector, size);
4950                 return 0;
4951         }
4952
4953         err = validate_req_change_req_state(mdev, p->block_id, sector,
4954                                             &mdev->write_requests, __func__,
4955                                             NEG_ACKED, true);
4956         if (err) {
4957                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4958                    The master bio might already be completed, therefore the
4959                    request is no longer in the collision hash. */
4960                 /* In Protocol B we might already have got a P_RECV_ACK
4961                    but then get a P_NEG_ACK afterwards. */
4962                 drbd_set_out_of_sync(mdev, sector, size);
4963         }
4964         return 0;
4965 }
4966
4967 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4968 {
4969         struct drbd_conf *mdev;
4970         struct p_block_ack *p = pi->data;
4971         sector_t sector = be64_to_cpu(p->sector);
4972
4973         mdev = vnr_to_mdev(tconn, pi->vnr);
4974         if (!mdev)
4975                 return -EIO;
4976
4977         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4978
4979         dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
4980             (unsigned long long)sector, be32_to_cpu(p->blksize));
4981
4982         return validate_req_change_req_state(mdev, p->block_id, sector,
4983                                              &mdev->read_requests, __func__,
4984                                              NEG_ACKED, false);
4985 }
4986
4987 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4988 {
4989         struct drbd_conf *mdev;
4990         sector_t sector;
4991         int size;
4992         struct p_block_ack *p = pi->data;
4993
4994         mdev = vnr_to_mdev(tconn, pi->vnr);
4995         if (!mdev)
4996                 return -EIO;
4997
4998         sector = be64_to_cpu(p->sector);
4999         size = be32_to_cpu(p->blksize);
5000
5001         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5002
5003         dec_rs_pending(mdev);
5004
5005         if (get_ldev_if_state(mdev, D_FAILED)) {
5006                 drbd_rs_complete_io(mdev, sector);
5007                 switch (pi->cmd) {
5008                 case P_NEG_RS_DREPLY:
5009                         drbd_rs_failed_io(mdev, sector, size);
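                        /* fall through */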
5010                 case P_RS_CANCEL:
5011                         break;
5012                 default:
5013                         BUG();
5014                 }
5015                 put_ldev(mdev);
5016         }
5017
5018         return 0;
5019 }
5020
5021 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5022 {
5023         struct p_barrier_ack *p = pi->data;
5024         struct drbd_conf *mdev;
5025         int vnr;
5026
5027         tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5028
5029         rcu_read_lock();
5030         idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5031                 if (mdev->state.conn == C_AHEAD &&
5032                     atomic_read(&mdev->ap_in_flight) == 0 &&
5033                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5034                         mdev->start_resync_timer.expires = jiffies + HZ;
5035                         add_timer(&mdev->start_resync_timer);
5036                 }
5037         }
5038         rcu_read_unlock();
5039
5040         return 0;
5041 }
5042
5043 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5044 {
5045         struct drbd_conf *mdev;
5046         struct p_block_ack *p = pi->data;
5047         struct drbd_work *w;
5048         sector_t sector;
5049         int size;
5050
5051         mdev = vnr_to_mdev(tconn, pi->vnr);
5052         if (!mdev)
5053                 return -EIO;
5054
5055         sector = be64_to_cpu(p->sector);
5056         size = be32_to_cpu(p->blksize);
5057
5058         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5059
5060         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5061                 drbd_ov_out_of_sync_found(mdev, sector, size);
5062         else
5063                 ov_out_of_sync_print(mdev);
5064
5065         if (!get_ldev(mdev))
5066                 return 0;
5067
5068         drbd_rs_complete_io(mdev, sector);
5069         dec_rs_pending(mdev);
5070
5071         --mdev->ov_left;
5072
5073         /* let's advance progress step marks only for every other megabyte */
5074         if ((mdev->ov_left & 0x200) == 0x200)
5075                 drbd_advance_rs_marks(mdev, mdev->ov_left);
5076
5077         if (mdev->ov_left == 0) {
5078                 w = kmalloc(sizeof(*w), GFP_NOIO);
5079                 if (w) {
5080                         w->cb = w_ov_finished;
5081                         w->mdev = mdev;
5082                         drbd_queue_work_front(&mdev->tconn->data.work, w);
5083                 } else {
5084                         dev_err(DEV, "kmalloc(w) failed.\n");
5085                         ov_out_of_sync_print(mdev);
5086                         drbd_resync_finished(mdev);
5087                 }
5088         }
5089         put_ldev(mdev);
5090         return 0;
5091 }
5092
5093 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
5094 {
5095         return 0;
5096 }
5097
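/* Drain the done_ee lists of all volumes: call drbd_finish_peer_reqs() on
 * each device and repeat until no volume has completed peer requests left.
 * Returns 0 on success, 1 if drbd_finish_peer_reqs() failed for a device. */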
5098 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5099 {
5100         struct drbd_conf *mdev;
5101         int vnr, not_empty = 0;
5102
5103         do {
5104                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5105                 flush_signals(current);
5106
5107                 rcu_read_lock();
5108                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5109                         kref_get(&mdev->kref);
5110                         rcu_read_unlock();
5111                         if (drbd_finish_peer_reqs(mdev)) {
5112                                 kref_put(&mdev->kref, &drbd_minor_destroy);
5113                                 return 1;
5114                         }
5115                         kref_put(&mdev->kref, &drbd_minor_destroy);
5116                         rcu_read_lock();
5117                 }
5118                 set_bit(SIGNAL_ASENDER, &tconn->flags);
5119
5120                 spin_lock_irq(&tconn->req_lock);
5121                 idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5122                         not_empty = !list_empty(&mdev->done_ee);
5123                         if (not_empty)
5124                                 break;
5125                 }
5126                 spin_unlock_irq(&tconn->req_lock);
5127                 rcu_read_unlock();
5128         } while (not_empty);
5129
5130         return 0;
5131 }
5132
5133 struct asender_cmd {
5134         size_t pkt_size;
5135         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
5136 };
5137
5138 static struct asender_cmd asender_tbl[] = {
5139         [P_PING]            = { 0, got_Ping },
5140         [P_PING_ACK]        = { 0, got_PingAck },
5141         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5142         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5143         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5144         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
5145         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5146         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5147         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5148         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5149         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5150         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5151         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5152         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5153         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5154         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5155         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5156 };
5157
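/* The asender ("acknowledge sender") thread services the meta socket: it
 * sends pings and acks for completed peer requests, and receives the small
 * fixed-size packets listed in asender_tbl[].  Bytes are accumulated in
 * "received" until a full header plus payload ("expect") has arrived, then
 * the packet is dispatched to its handler. */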
5158 int drbd_asender(struct drbd_thread *thi)
5159 {
5160         struct drbd_tconn *tconn = thi->tconn;
5161         struct asender_cmd *cmd = NULL;
5162         struct packet_info pi;
5163         int rv;
5164         void *buf    = tconn->meta.rbuf;
5165         int received = 0;
5166         unsigned int header_size = drbd_header_size(tconn);
5167         int expect   = header_size;
5168         bool ping_timeout_active = false;
5169         struct net_conf *nc;
5170         int ping_timeo, tcp_cork, ping_int;
5171
5172         current->policy = SCHED_RR;  /* Make this a realtime task! */
5173         current->rt_priority = 2;    /* more important than all other tasks */
5174
5175         while (get_t_state(thi) == RUNNING) {
5176                 drbd_thread_current_set_cpu(thi);
5177
5178                 rcu_read_lock();
5179                 nc = rcu_dereference(tconn->net_conf);
5180                 ping_timeo = nc->ping_timeo;
5181                 tcp_cork = nc->tcp_cork;
5182                 ping_int = nc->ping_int;
5183                 rcu_read_unlock();
5184
5185                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5186                         if (drbd_send_ping(tconn)) {
5187                                 conn_err(tconn, "drbd_send_ping has failed\n");
5188                                 goto reconnect;
5189                         }
5190                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5191                         ping_timeout_active = true;
5192                 }
5193
5194                 /* TODO: conditionally cork; it may hurt latency if we cork without
5195                    much to send */
5196                 if (tcp_cork)
5197                         drbd_tcp_cork(tconn->meta.socket);
5198                 if (tconn_finish_peer_reqs(tconn)) {
5199                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5200                         goto reconnect;
5201                 }
5202                 /* but unconditionally uncork unless disabled */
5203                 if (tcp_cork)
5204                         drbd_tcp_uncork(tconn->meta.socket);
5205
5206                 /* short circuit, recv_msg would return EINTR anyways. */
5207                 if (signal_pending(current))
5208                         continue;
5209
5210                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5211                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5212
5213                 flush_signals(current);
5214
5215                 /* Note:
5216                  * -EINTR        (on meta) we got a signal
5217                  * -EAGAIN       (on meta) rcvtimeo expired
5218                  * -ECONNRESET   other side closed the connection
5219                  * -ERESTARTSYS  (on data) we got a signal
5220                  * rv <  0       other than above: unexpected error!
5221                  * rv == expected: full header or command
5222                  * rv <  expected: "woken" by signal during receive
5223                  * rv == 0       : "connection shut down by peer"
5224                  */
5225                 if (likely(rv > 0)) {
5226                         received += rv;
5227                         buf      += rv;
5228                 } else if (rv == 0) {
5229                         conn_err(tconn, "meta connection shut down by peer.\n");
5230                         goto reconnect;
5231                 } else if (rv == -EAGAIN) {
5232                         /* If the data socket received something meanwhile,
5233                          * that is good enough: peer is still alive. */
5234                         if (time_after(tconn->last_received,
5235                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5236                                 continue;
5237                         if (ping_timeout_active) {
5238                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5239                                 goto reconnect;
5240                         }
5241                         set_bit(SEND_PING, &tconn->flags);
5242                         continue;
5243                 } else if (rv == -EINTR) {
5244                         continue;
5245                 } else {
5246                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5247                         goto reconnect;
5248                 }
5249
5250                 if (received == expect && cmd == NULL) {
5251                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5252                                 goto reconnect;
5253                         cmd = &asender_tbl[pi.cmd];
5254                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5255                                 conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5256                                          cmdname(pi.cmd), pi.cmd);
5257                                 goto disconnect;
5258                         }
5259                         expect = header_size + cmd->pkt_size;
5260                         if (pi.size != expect - header_size) {
5261                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5262                                         pi.cmd, pi.size);
5263                                 goto reconnect;
5264                         }
5265                 }
5266                 if (received == expect) {
5267                         bool err;
5268
5269                         err = cmd->fn(tconn, &pi);
5270                         if (err) {
5271                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5272                                 goto reconnect;
5273                         }
5274
5275                         tconn->last_received = jiffies;
5276
5277                         if (cmd == &asender_tbl[P_PING_ACK]) {
5278                                 /* restore idle timeout */
5279                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5280                                 ping_timeout_active = false;
5281                         }
5282
5283                         buf      = tconn->meta.rbuf;
5284                         received = 0;
5285                         expect   = header_size;
5286                         cmd      = NULL;
5287                 }
5288         }
5289
5290         if (0) {
5291 reconnect:
5292                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5293         }
5294         if (0) {
5295 disconnect:
5296                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5297         }
5298         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5299
5300         conn_info(tconn, "asender terminated\n");
5301
5302         return 0;
5303 }