drbd: fix various disconnecting races
drivers/block/drbd/drbd_receiver.c  (firefly-linux-kernel-4.4.55.git)
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_req.h"
48
49 #include "drbd_vli.h"
50
51 struct packet_info {
52         enum drbd_packet cmd;
53         unsigned int size;
54         unsigned int vnr;
55         void *data;
56 };
57
58 enum finish_epoch {
59         FE_STILL_LIVE,
60         FE_DESTROYED,
61         FE_RECYCLED,
62 };
63
64 static int drbd_do_features(struct drbd_tconn *tconn);
65 static int drbd_do_auth(struct drbd_tconn *tconn);
66 static int drbd_disconnected(int vnr, void *p, void *data);
67
68 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
69 static int e_end_block(struct drbd_work *, int);
70
71
72 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
73
74 /*
75  * some helper functions to deal with singly linked page lists,
76  * page->private being our "next" pointer.
77  */
78
79 /* If at least n pages are linked at head, get n pages off.
80  * Otherwise, don't modify head, and return NULL.
81  * Locking is the responsibility of the caller.
82  */
83 static struct page *page_chain_del(struct page **head, int n)
84 {
85         struct page *page;
86         struct page *tmp;
87
88         BUG_ON(!n);
89         BUG_ON(!head);
90
91         page = *head;
92
93         if (!page)
94                 return NULL;
95
96         while (page) {
97                 tmp = page_chain_next(page);
98                 if (--n == 0)
99                         break; /* found sufficient pages */
100                 if (tmp == NULL)
101                         /* insufficient pages, don't use any of them. */
102                         return NULL;
103                 page = tmp;
104         }
105
106         /* add end of list marker for the returned list */
107         set_page_private(page, 0);
108         /* actual return value, and adjustment of head */
109         page = *head;
110         *head = tmp;
111         return page;
112 }
113
114 /* may be used outside of locks to find the tail of a (usually short)
115  * "private" page chain, before adding it back to a global chain head
116  * with page_chain_add() under a spinlock. */
117 static struct page *page_chain_tail(struct page *page, int *len)
118 {
119         struct page *tmp;
120         int i = 1;
121         while ((tmp = page_chain_next(page)))
122                 ++i, page = tmp;
123         if (len)
124                 *len = i;
125         return page;
126 }
127
128 static int page_chain_free(struct page *page)
129 {
130         struct page *tmp;
131         int i = 0;
132         page_chain_for_each_safe(page, tmp) {
133                 put_page(page);
134                 ++i;
135         }
136         return i;
137 }
138
139 static void page_chain_add(struct page **head,
140                 struct page *chain_first, struct page *chain_last)
141 {
142 #if 1
143         struct page *tmp;
144         tmp = page_chain_tail(chain_first, NULL);
145         BUG_ON(tmp != chain_last);
146 #endif
147
148         /* add chain to head */
149         set_page_private(chain_last, (unsigned long)*head);
150         *head = chain_first;
151 }
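
/*
 * Illustrative sketch, not part of the original driver: how the page
 * chain helpers above compose.  The hypothetical function below detaches
 * two pages from the global pool and immediately hands them back, only
 * to show the calling convention (page->private as the "next" pointer,
 * drbd_pp_lock held around list manipulation, drbd_pp_vacant adjusted).
 */
static void __maybe_unused page_chain_example(void)
{
	struct page *chain, *tail;

	spin_lock(&drbd_pp_lock);
	chain = page_chain_del(&drbd_pp_pool, 2);	/* NULL if fewer than 2 pages linked */
	if (chain) {
		drbd_pp_vacant -= 2;
		tail = page_chain_tail(chain, NULL);	/* find tail of the detached chain */
		page_chain_add(&drbd_pp_pool, chain, tail);
		drbd_pp_vacant += 2;
	}
	spin_unlock(&drbd_pp_lock);
}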
152
153 static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
154                                        unsigned int number)
155 {
156         struct page *page = NULL;
157         struct page *tmp = NULL;
158         unsigned int i = 0;
159
160         /* Yes, testing drbd_pp_vacant outside the lock is racy.
161          * So what. It saves a spin_lock. */
162         if (drbd_pp_vacant >= number) {
163                 spin_lock(&drbd_pp_lock);
164                 page = page_chain_del(&drbd_pp_pool, number);
165                 if (page)
166                         drbd_pp_vacant -= number;
167                 spin_unlock(&drbd_pp_lock);
168                 if (page)
169                         return page;
170         }
171
172         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
173          * "criss-cross" setup, that might cause write-out on some other DRBD,
174          * which in turn might block on the other node at this very place.  */
175         for (i = 0; i < number; i++) {
176                 tmp = alloc_page(GFP_TRY);
177                 if (!tmp)
178                         break;
179                 set_page_private(tmp, (unsigned long)page);
180                 page = tmp;
181         }
182
183         if (i == number)
184                 return page;
185
186         /* Not enough pages immediately available this time.
187          * No need to jump around here, drbd_alloc_pages will retry this
188          * function "soon". */
189         if (page) {
190                 tmp = page_chain_tail(page, NULL);
191                 spin_lock(&drbd_pp_lock);
192                 page_chain_add(&drbd_pp_pool, page, tmp);
193                 drbd_pp_vacant += i;
194                 spin_unlock(&drbd_pp_lock);
195         }
196         return NULL;
197 }
198
199 static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
200                                            struct list_head *to_be_freed)
201 {
202         struct drbd_peer_request *peer_req;
203         struct list_head *le, *tle;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one we can
208            stop examining the list... */
209
210         list_for_each_safe(le, tle, &mdev->net_ee) {
211                 peer_req = list_entry(le, struct drbd_peer_request, w.list);
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(le, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&mdev->tconn->req_lock);
224         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
225         spin_unlock_irq(&mdev->tconn->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(mdev, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @mdev:       DRBD device.
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel, unless this allocation would exceed the max_buffers setting.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * Returns a page chain linked via page->private.
242  */
243 struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
244                               bool retry)
245 {
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyways. */
253         rcu_read_lock();
254         nc = rcu_dereference(mdev->tconn->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&mdev->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(mdev, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(mdev);
265
266                 if (atomic_read(&mdev->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(mdev, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &mdev->pp_in_use);
286         return page;
287 }
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * It is also used from inside another spin_lock_irq(&mdev->tconn->req_lock) section.
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
296         int i;
297
298         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
299                 i = page_chain_free(page);
300         else {
301                 struct page *tmp;
302                 tmp = page_chain_tail(page, &i);
303                 spin_lock(&drbd_pp_lock);
304                 page_chain_add(&drbd_pp_pool, page, tmp);
305                 drbd_pp_vacant += i;
306                 spin_unlock(&drbd_pp_lock);
307         }
308         i = atomic_sub_return(i, a);
309         if (i < 0)
310                 dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
311                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
312         wake_up(&drbd_pp_wait);
313 }
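
/*
 * Illustrative sketch, not part of the original driver: the allocate/free
 * pairing described in the drbd_alloc_pages() comment above.  With
 * retry=true the call blocks (interruptibly) until enough pages are
 * available; whatever it returned must later go back through
 * drbd_free_pages(), which adjusts pp_in_use and wakes waiters.
 */
static void __maybe_unused pp_alloc_free_example(struct drbd_conf *mdev)
{
	struct page *page;

	page = drbd_alloc_pages(mdev, 4, true);	/* 4-page chain, or NULL if signalled */
	if (!page)
		return;
	/* ... fill or read the page chain here ... */
	drbd_free_pages(mdev, page, 0);	/* 0: accounted in pp_in_use, not pp_in_use_by_net */
}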
314
315 /*
316 You need to hold the req_lock:
317  _drbd_wait_ee_list_empty()
318
319 You must not have the req_lock:
320  drbd_free_peer_req()
321  drbd_alloc_peer_req()
322  drbd_free_peer_reqs()
323  drbd_ee_fix_bhs()
324  drbd_finish_peer_reqs()
325  drbd_clear_done_ee()
326  drbd_wait_ee_list_empty()
327 */
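
/*
 * Illustrative sketch, not part of the original driver: the locking rules
 * above in practice.  A hypothetical caller draining done_ee would do
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	_drbd_wait_ee_list_empty(mdev, &mdev->done_ee);    <- req_lock held
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 *
 * whereas drbd_wait_ee_list_empty() takes and drops req_lock itself and
 * therefore must be called without holding it.
 */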
328
329 struct drbd_peer_request *
330 drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
331                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
332 {
333         struct drbd_peer_request *peer_req;
334         struct page *page;
335         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
336
337         if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
338                 return NULL;
339
340         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
341         if (!peer_req) {
342                 if (!(gfp_mask & __GFP_NOWARN))
343                         dev_err(DEV, "%s: allocation failed\n", __func__);
344                 return NULL;
345         }
346
347         page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
348         if (!page)
349                 goto fail;
350
351         drbd_clear_interval(&peer_req->i);
352         peer_req->i.size = data_size;
353         peer_req->i.sector = sector;
354         peer_req->i.local = false;
355         peer_req->i.waiting = false;
356
357         peer_req->epoch = NULL;
358         peer_req->w.mdev = mdev;
359         peer_req->pages = page;
360         atomic_set(&peer_req->pending_bios, 0);
361         peer_req->flags = 0;
362         /*
363          * The block_id is opaque to the receiver.  It is not endianness
364          * converted, and sent back to the sender unchanged.
365          */
366         peer_req->block_id = id;
367
368         return peer_req;
369
370  fail:
371         mempool_free(peer_req, drbd_ee_mempool);
372         return NULL;
373 }
374
375 void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
376                        int is_net)
377 {
378         if (peer_req->flags & EE_HAS_DIGEST)
379                 kfree(peer_req->digest);
380         drbd_free_pages(mdev, peer_req->pages, is_net);
381         D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
382         D_ASSERT(drbd_interval_empty(&peer_req->i));
383         mempool_free(peer_req, drbd_ee_mempool);
384 }
385
386 int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
387 {
388         LIST_HEAD(work_list);
389         struct drbd_peer_request *peer_req, *t;
390         int count = 0;
391         int is_net = list == &mdev->net_ee;
392
393         spin_lock_irq(&mdev->tconn->req_lock);
394         list_splice_init(list, &work_list);
395         spin_unlock_irq(&mdev->tconn->req_lock);
396
397         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
398                 __drbd_free_peer_req(mdev, peer_req, is_net);
399                 count++;
400         }
401         return count;
402 }
403
404 /*
405  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
406  */
407 static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
408 {
409         LIST_HEAD(work_list);
410         LIST_HEAD(reclaimed);
411         struct drbd_peer_request *peer_req, *t;
412         int err = 0;
413
414         spin_lock_irq(&mdev->tconn->req_lock);
415         reclaim_finished_net_peer_reqs(mdev, &reclaimed);
416         list_splice_init(&mdev->done_ee, &work_list);
417         spin_unlock_irq(&mdev->tconn->req_lock);
418
419         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
420                 drbd_free_net_peer_req(mdev, peer_req);
421
422         /* possible callbacks here:
423          * e_end_block, e_end_resync_block, and e_send_discard_write;
424          * all ignore the last argument.
425          */
426         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
427                 int err2;
428
429                 /* list_del not necessary, next/prev members not touched */
430                 err2 = peer_req->w.cb(&peer_req->w, !!err);
431                 if (!err)
432                         err = err2;
433                 drbd_free_peer_req(mdev, peer_req);
434         }
435         wake_up(&mdev->ee_wait);
436
437         return err;
438 }
439
440 static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
441                                      struct list_head *head)
442 {
443         DEFINE_WAIT(wait);
444
445         /* avoids spin_lock/unlock
446          * and calling prepare_to_wait in the fast path */
447         while (!list_empty(head)) {
448                 prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
449                 spin_unlock_irq(&mdev->tconn->req_lock);
450                 io_schedule();
451                 finish_wait(&mdev->ee_wait, &wait);
452                 spin_lock_irq(&mdev->tconn->req_lock);
453         }
454 }
455
456 static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
457                                     struct list_head *head)
458 {
459         spin_lock_irq(&mdev->tconn->req_lock);
460         _drbd_wait_ee_list_empty(mdev, head);
461         spin_unlock_irq(&mdev->tconn->req_lock);
462 }
463
464 /* see also kernel_accept(), which is only present since 2.6.18.
465  * We also want to log exactly which part of it failed. */
466 static int drbd_accept(const char **what, struct socket *sock, struct socket **newsock)
467 {
468         struct sock *sk = sock->sk;
469         int err = 0;
470
471         *what = "listen";
472         err = sock->ops->listen(sock, 5);
473         if (err < 0)
474                 goto out;
475
476         *what = "sock_create_lite";
477         err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
478                                newsock);
479         if (err < 0)
480                 goto out;
481
482         *what = "accept";
483         err = sock->ops->accept(sock, *newsock, 0);
484         if (err < 0) {
485                 sock_release(*newsock);
486                 *newsock = NULL;
487                 goto out;
488         }
489         (*newsock)->ops  = sock->ops;
490
491 out:
492         return err;
493 }
494
495 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
496 {
497         mm_segment_t oldfs;
498         struct kvec iov = {
499                 .iov_base = buf,
500                 .iov_len = size,
501         };
502         struct msghdr msg = {
503                 .msg_iovlen = 1,
504                 .msg_iov = (struct iovec *)&iov,
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         int rv;
508
509         oldfs = get_fs();
510         set_fs(KERNEL_DS);
511         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
512         set_fs(oldfs);
513
514         return rv;
515 }
516
517 static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
518 {
519         mm_segment_t oldfs;
520         struct kvec iov = {
521                 .iov_base = buf,
522                 .iov_len = size,
523         };
524         struct msghdr msg = {
525                 .msg_iovlen = 1,
526                 .msg_iov = (struct iovec *)&iov,
527                 .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
528         };
529         int rv;
530
531         oldfs = get_fs();
532         set_fs(KERNEL_DS);
533
534         for (;;) {
535                 rv = sock_recvmsg(tconn->data.socket, &msg, size, msg.msg_flags);
536                 if (rv == size)
537                         break;
538
539                 /* Note:
540                  * ECONNRESET   other side closed the connection
541                  * ERESTARTSYS  (on  sock) we got a signal
542                  */
543
544                 if (rv < 0) {
545                         if (rv == -ECONNRESET)
546                                 conn_info(tconn, "sock was reset by peer\n");
547                         else if (rv != -ERESTARTSYS)
548                                 conn_err(tconn, "sock_recvmsg returned %d\n", rv);
549                         break;
550                 } else if (rv == 0) {
551                         conn_info(tconn, "sock was shut down by peer\n");
552                         break;
553                 } else  {
554                         /* signal came in, or peer/link went down,
555                          * after we read a partial message
556                          */
557                         /* D_ASSERT(signal_pending(current)); */
558                         break;
559                 }
560         }
561
562         set_fs(oldfs);
563
564         if (rv != size)
565                 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
566
567         return rv;
568 }
569
570 static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
571 {
572         int err;
573
574         err = drbd_recv(tconn, buf, size);
575         if (err != size) {
576                 if (err >= 0)
577                         err = -EIO;
578         } else
579                 err = 0;
580         return err;
581 }
582
583 static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
584 {
585         int err;
586
587         err = drbd_recv_all(tconn, buf, size);
588         if (err && !signal_pending(current))
589                 conn_warn(tconn, "short read (expected size %d)\n", (int)size);
590         return err;
591 }
592
593 /* quoting tcp(7):
594  *   On individual connections, the socket buffer size must be set prior to the
595  *   listen(2) or connect(2) calls in order to have it take effect.
596  * This is our wrapper to do so.
597  */
598 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
599                 unsigned int rcv)
600 {
601         /* open coded SO_SNDBUF, SO_RCVBUF */
602         if (snd) {
603                 sock->sk->sk_sndbuf = snd;
604                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
605         }
606         if (rcv) {
607                 sock->sk->sk_rcvbuf = rcv;
608                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
609         }
610 }
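
/*
 * Illustrative sketch, not part of the original driver: per the tcp(7)
 * quote above, the buffer sizes must be in place before connect(2) or
 * listen(2).  The hypothetical helper below only shows that ordering;
 * the real code paths are drbd_try_connect() and drbd_wait_for_connect()
 * further down.
 */
static int __maybe_unused connect_with_bufsize_example(struct sockaddr *peer,
		int peer_len, unsigned int snd, unsigned int rcv)
{
	struct socket *sock;
	int err;

	err = sock_create_kern(peer->sa_family, SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0)
		return err;

	drbd_setbufsize(sock, snd, rcv);	/* before connect, so it takes effect */

	err = sock->ops->connect(sock, peer, peer_len, 0);
	sock_release(sock);	/* a real caller would keep the socket on success */
	return err;
}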
611
612 static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
613 {
614         const char *what;
615         struct socket *sock;
616         struct sockaddr_in6 src_in6;
617         struct sockaddr_in6 peer_in6;
618         struct net_conf *nc;
619         int err, peer_addr_len, my_addr_len;
620         int sndbuf_size, rcvbuf_size, try_connect_int;
621         int disconnect_on_error = 1;
622
623         rcu_read_lock();
624         nc = rcu_dereference(tconn->net_conf);
625         if (!nc) {
626                 rcu_read_unlock();
627                 return NULL;
628         }
629
630         sndbuf_size = nc->sndbuf_size;
631         rcvbuf_size = nc->rcvbuf_size;
632         try_connect_int = nc->try_connect_int;
633
634         my_addr_len = min_t(int, nc->my_addr_len, sizeof(src_in6));
635         memcpy(&src_in6, nc->my_addr, my_addr_len);
636
637         if (((struct sockaddr *)nc->my_addr)->sa_family == AF_INET6)
638                 src_in6.sin6_port = 0;
639         else
640                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
641
642         peer_addr_len = min_t(int, nc->peer_addr_len, sizeof(src_in6));
643         memcpy(&peer_in6, nc->peer_addr, peer_addr_len);
644
645         rcu_read_unlock();
646
647         what = "sock_create_kern";
648         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
649                                SOCK_STREAM, IPPROTO_TCP, &sock);
650         if (err < 0) {
651                 sock = NULL;
652                 goto out;
653         }
654
655         sock->sk->sk_rcvtimeo =
656         sock->sk->sk_sndtimeo = try_connect_int * HZ;
657         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
658
659        /* explicitly bind to the configured IP as source IP
660         *  for the outgoing connections.
661         *  This is needed for multihomed hosts and to be
662         *  able to use lo: interfaces for drbd.
663         * Make sure to use 0 as port number, so linux selects
664         *  a free one dynamically.
665         */
666         what = "bind before connect";
667         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
668         if (err < 0)
669                 goto out;
670
671         /* connect may fail, peer not yet available.
672          * stay C_WF_CONNECTION, don't go Disconnecting! */
673         disconnect_on_error = 0;
674         what = "connect";
675         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
676
677 out:
678         if (err < 0) {
679                 if (sock) {
680                         sock_release(sock);
681                         sock = NULL;
682                 }
683                 switch (-err) {
684                         /* timeout, busy, signal pending */
685                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
686                 case EINTR: case ERESTARTSYS:
687                         /* peer not (yet) available, network problem */
688                 case ECONNREFUSED: case ENETUNREACH:
689                 case EHOSTDOWN:    case EHOSTUNREACH:
690                         disconnect_on_error = 0;
691                         break;
692                 default:
693                         conn_err(tconn, "%s failed, err = %d\n", what, err);
694                 }
695                 if (disconnect_on_error)
696                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
697         }
698
699         return sock;
700 }
701
702 static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn)
703 {
704         int timeo, err, my_addr_len;
705         int sndbuf_size, rcvbuf_size, try_connect_int;
706         struct socket *s_estab = NULL, *s_listen;
707         struct sockaddr_in6 my_addr;
708         struct net_conf *nc;
709         const char *what;
710
711         rcu_read_lock();
712         nc = rcu_dereference(tconn->net_conf);
713         if (!nc) {
714                 rcu_read_unlock();
715                 return NULL;
716         }
717
718         sndbuf_size = nc->sndbuf_size;
719         rcvbuf_size = nc->rcvbuf_size;
720         try_connect_int = nc->try_connect_int;
721
722         my_addr_len = min_t(int, nc->my_addr_len, sizeof(struct sockaddr_in6));
723         memcpy(&my_addr, nc->my_addr, my_addr_len);
724         rcu_read_unlock();
725
726         what = "sock_create_kern";
727         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
728                 SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         timeo = try_connect_int * HZ;
735         timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */
736
737         s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
738         s_listen->sk->sk_rcvtimeo = timeo;
739         s_listen->sk->sk_sndtimeo = timeo;
740         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
741
742         what = "bind before listen";
743         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
744         if (err < 0)
745                 goto out;
746
747         err = drbd_accept(&what, s_listen, &s_estab);
748
749 out:
750         if (s_listen)
751                 sock_release(s_listen);
752         if (err < 0) {
753                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
754                         conn_err(tconn, "%s failed, err = %d\n", what, err);
755                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
756                 }
757         }
758
759         return s_estab;
760 }
761
762 static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
763
764 static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
765                              enum drbd_packet cmd)
766 {
767         if (!conn_prepare_command(tconn, sock))
768                 return -EIO;
769         return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
770 }
771
772 static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
773 {
774         unsigned int header_size = drbd_header_size(tconn);
775         struct packet_info pi;
776         int err;
777
778         err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
779         if (err != header_size) {
780                 if (err >= 0)
781                         err = -EIO;
782                 return err;
783         }
784         err = decode_header(tconn, tconn->data.rbuf, &pi);
785         if (err)
786                 return err;
787         return pi.cmd;
788 }
789
790 /**
791  * drbd_socket_okay() - Free the socket if its connection is not okay
792  * @sock:       pointer to the pointer to the socket.
793  */
794 static int drbd_socket_okay(struct socket **sock)
795 {
796         int rr;
797         char tb[4];
798
799         if (!*sock)
800                 return false;
801
802         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
803
804         if (rr > 0 || rr == -EAGAIN) {
805                 return true;
806         } else {
807                 sock_release(*sock);
808                 *sock = NULL;
809                 return false;
810         }
811 }
812 /* Gets called if a connection is established, or if a new minor gets created
813    in a connection */
814 int drbd_connected(int vnr, void *p, void *data)
815 {
816         struct drbd_conf *mdev = (struct drbd_conf *)p;
817         int err;
818
819         atomic_set(&mdev->packet_seq, 0);
820         mdev->peer_seq = 0;
821
822         mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
823                 &mdev->tconn->cstate_mutex :
824                 &mdev->own_state_mutex;
825
826         err = drbd_send_sync_param(mdev);
827         if (!err)
828                 err = drbd_send_sizes(mdev, 0, 0);
829         if (!err)
830                 err = drbd_send_uuids(mdev);
831         if (!err)
832                 err = drbd_send_state(mdev);
833         clear_bit(USE_DEGR_WFC_T, &mdev->flags);
834         clear_bit(RESIZE_PENDING, &mdev->flags);
835         mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
836         return err;
837 }
838
839 /*
840  * return values:
841  *   1 yes, we have a valid connection
842  *   0 oops, did not work out, please try again
843  *  -1 peer talks different language,
844  *     no point in trying again, please go standalone.
845  *  -2 We do not have a network config...
846  */
847 static int drbd_connect(struct drbd_tconn *tconn)
848 {
849         struct socket *sock, *msock;
850         struct net_conf *nc;
851         int timeout, try, h, ok;
852
853         if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
854                 return -2;
855
856         clear_bit(DISCARD_CONCURRENT, &tconn->flags);
857
858         /* Assume that the peer only understands protocol 80 until we know better.  */
859         tconn->agreed_pro_version = 80;
860
861         do {
862                 struct socket *s;
863
864                 for (try = 0;;) {
865                         /* 3 tries, this should take less than a second! */
866                         s = drbd_try_connect(tconn);
867                         if (s || ++try >= 3)
868                                 break;
869                         /* give the other side time to call bind() & listen() */
870                         schedule_timeout_interruptible(HZ / 10);
871                 }
872
873                 if (s) {
874                         if (!tconn->data.socket) {
875                                 tconn->data.socket = s;
876                                 send_first_packet(tconn, &tconn->data, P_INITIAL_DATA);
877                         } else if (!tconn->meta.socket) {
878                                 tconn->meta.socket = s;
879                                 send_first_packet(tconn, &tconn->meta, P_INITIAL_META);
880                         } else {
881                                 conn_err(tconn, "Logic error in drbd_connect()\n");
882                                 goto out_release_sockets;
883                         }
884                 }
885
886                 if (tconn->data.socket && tconn->meta.socket) {
887                         schedule_timeout_interruptible(tconn->net_conf->ping_timeo*HZ/10);
888                         ok = drbd_socket_okay(&tconn->data.socket);
889                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
890                         if (ok)
891                                 break;
892                 }
893
894 retry:
895                 s = drbd_wait_for_connect(tconn);
896                 if (s) {
897                         try = receive_first_packet(tconn, s);
898                         drbd_socket_okay(&tconn->data.socket);
899                         drbd_socket_okay(&tconn->meta.socket);
900                         switch (try) {
901                         case P_INITIAL_DATA:
902                                 if (tconn->data.socket) {
903                                         conn_warn(tconn, "initial packet S crossed\n");
904                                         sock_release(tconn->data.socket);
905                                 }
906                                 tconn->data.socket = s;
907                                 break;
908                         case P_INITIAL_META:
909                                 if (tconn->meta.socket) {
910                                         conn_warn(tconn, "initial packet M crossed\n");
911                                         sock_release(tconn->meta.socket);
912                                 }
913                                 tconn->meta.socket = s;
914                                 set_bit(DISCARD_CONCURRENT, &tconn->flags);
915                                 break;
916                         default:
917                                 conn_warn(tconn, "Error receiving initial packet\n");
918                                 sock_release(s);
919                                 if (random32() & 1)
920                                         goto retry;
921                         }
922                 }
923
924                 if (tconn->cstate <= C_DISCONNECTING)
925                         goto out_release_sockets;
926                 if (signal_pending(current)) {
927                         flush_signals(current);
928                         smp_rmb();
929                         if (get_t_state(&tconn->receiver) == EXITING)
930                                 goto out_release_sockets;
931                 }
932
933                 if (tconn->data.socket && tconn->meta.socket) {
934                         ok = drbd_socket_okay(&tconn->data.socket);
935                         ok = drbd_socket_okay(&tconn->meta.socket) && ok;
936                         if (ok)
937                                 break;
938                 }
939         } while (1);
940
941         sock  = tconn->data.socket;
942         msock = tconn->meta.socket;
943
944         msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
945         sock->sk->sk_reuse = 1; /* SO_REUSEADDR */
946
947         sock->sk->sk_allocation = GFP_NOIO;
948         msock->sk->sk_allocation = GFP_NOIO;
949
950         sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
951         msock->sk->sk_priority = TC_PRIO_INTERACTIVE;
952
953         /* NOT YET ...
954          * sock->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
955          * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
956          * first set it to the P_CONNECTION_FEATURES timeout,
957          * which we set to 4x the configured ping_timeout. */
958         rcu_read_lock();
959         nc = rcu_dereference(tconn->net_conf);
960
961         sock->sk->sk_sndtimeo =
962         sock->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
963
964         msock->sk->sk_rcvtimeo = nc->ping_int*HZ;
965         timeout = nc->timeout * HZ / 10;
966         rcu_read_unlock();
967
968         msock->sk->sk_sndtimeo = timeout;
969
970         /* we don't want delays.
971          * we use TCP_CORK where appropriate, though */
972         drbd_tcp_nodelay(sock);
973         drbd_tcp_nodelay(msock);
974
975         tconn->last_received = jiffies;
976
977         h = drbd_do_features(tconn);
978         if (h <= 0)
979                 return h;
980
981         if (tconn->cram_hmac_tfm) {
982                 /* drbd_request_state(mdev, NS(conn, WFAuth)); */
983                 switch (drbd_do_auth(tconn)) {
984                 case -1:
985                         conn_err(tconn, "Authentication of peer failed\n");
986                         return -1;
987                 case 0:
988                         conn_err(tconn, "Authentication of peer failed, trying again.\n");
989                         return 0;
990                 }
991         }
992
993         if (conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE) < SS_SUCCESS)
994                 return 0;
995
996         sock->sk->sk_sndtimeo = timeout;
997         sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
998
999         drbd_thread_start(&tconn->asender);
1000
1001         if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1002                 return -1;
1003
1004         down_read(&drbd_cfg_rwsem);
1005         h = !idr_for_each(&tconn->volumes, drbd_connected, tconn);
1006         up_read(&drbd_cfg_rwsem);
1007         return h;
1008
1009 out_release_sockets:
1010         if (tconn->data.socket) {
1011                 sock_release(tconn->data.socket);
1012                 tconn->data.socket = NULL;
1013         }
1014         if (tconn->meta.socket) {
1015                 sock_release(tconn->meta.socket);
1016                 tconn->meta.socket = NULL;
1017         }
1018         return -1;
1019 }
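
/*
 * Illustrative sketch, not part of the original driver: how a caller is
 * meant to interpret the drbd_connect() return values documented above
 * (the real retry loop lives in the receiver thread).  This hypothetical
 * wrapper only shows the contract: retry on 0, give up on negative values.
 */
static void __maybe_unused drbd_connect_retry_example(struct drbd_tconn *tconn)
{
	int h;

	do {
		h = drbd_connect(tconn);
		if (h == 0)	/* did not work out, try again */
			schedule_timeout_interruptible(HZ);
	} while (h == 0);

	if (h < 0)
		conn_err(tconn, "giving up: %s\n",
			 h == -1 ? "peer talks a different language" : "no network config");
}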
1020
1021 static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1022 {
1023         unsigned int header_size = drbd_header_size(tconn);
1024
1025         if (header_size == sizeof(struct p_header100) &&
1026             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1027                 struct p_header100 *h = header;
1028                 if (h->pad != 0) {
1029                         conn_err(tconn, "Header padding is not zero\n");
1030                         return -EINVAL;
1031                 }
1032                 pi->vnr = be16_to_cpu(h->volume);
1033                 pi->cmd = be16_to_cpu(h->command);
1034                 pi->size = be32_to_cpu(h->length);
1035         } else if (header_size == sizeof(struct p_header95) &&
1036                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1037                 struct p_header95 *h = header;
1038                 pi->cmd = be16_to_cpu(h->command);
1039                 pi->size = be32_to_cpu(h->length);
1040                 pi->vnr = 0;
1041         } else if (header_size == sizeof(struct p_header80) &&
1042                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1043                 struct p_header80 *h = header;
1044                 pi->cmd = be16_to_cpu(h->command);
1045                 pi->size = be16_to_cpu(h->length);
1046                 pi->vnr = 0;
1047         } else {
1048                 conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1049                          be32_to_cpu(*(__be32 *)header),
1050                          tconn->agreed_pro_version);
1051                 return -EINVAL;
1052         }
1053         pi->data = header + header_size;
1054         return 0;
1055 }
1056
1057 static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1058 {
1059         void *buffer = tconn->data.rbuf;
1060         int err;
1061
1062         err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1063         if (err)
1064                 return err;
1065
1066         err = decode_header(tconn, buffer, pi);
1067         tconn->last_received = jiffies;
1068
1069         return err;
1070 }
1071
1072 static void drbd_flush(struct drbd_conf *mdev)
1073 {
1074         int rv;
1075
1076         if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
1077                 rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
1078                                         NULL);
1079                 if (rv) {
1080                         dev_err(DEV, "local disk flush failed with status %d\n", rv);
1081                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1082                          * don't try again for ANY return value != 0
1083                          * if (rv == -EOPNOTSUPP) */
1084                         drbd_bump_write_ordering(mdev, WO_drain_io);
1085                 }
1086                 put_ldev(mdev);
1087         }
1088 }
1089
1090 /**
1091  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
1092  * @mdev:       DRBD device.
1093  * @epoch:      Epoch object.
1094  * @ev:         Epoch event.
1095  */
1096 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
1097                                                struct drbd_epoch *epoch,
1098                                                enum epoch_event ev)
1099 {
1100         int epoch_size;
1101         struct drbd_epoch *next_epoch;
1102         enum finish_epoch rv = FE_STILL_LIVE;
1103
1104         spin_lock(&mdev->epoch_lock);
1105         do {
1106                 next_epoch = NULL;
1107
1108                 epoch_size = atomic_read(&epoch->epoch_size);
1109
1110                 switch (ev & ~EV_CLEANUP) {
1111                 case EV_PUT:
1112                         atomic_dec(&epoch->active);
1113                         break;
1114                 case EV_GOT_BARRIER_NR:
1115                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1116                         break;
1117                 case EV_BECAME_LAST:
1118                         /* nothing to do */
1119                         break;
1120                 }
1121
1122                 if (epoch_size != 0 &&
1123                     atomic_read(&epoch->active) == 0 &&
1124                     test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
1125                         if (!(ev & EV_CLEANUP)) {
1126                                 spin_unlock(&mdev->epoch_lock);
1127                                 drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
1128                                 spin_lock(&mdev->epoch_lock);
1129                         }
1130                         dec_unacked(mdev);
1131
1132                         if (mdev->current_epoch != epoch) {
1133                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1134                                 list_del(&epoch->list);
1135                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1136                                 mdev->epochs--;
1137                                 kfree(epoch);
1138
1139                                 if (rv == FE_STILL_LIVE)
1140                                         rv = FE_DESTROYED;
1141                         } else {
1142                                 epoch->flags = 0;
1143                                 atomic_set(&epoch->epoch_size, 0);
1144                                 /* atomic_set(&epoch->active, 0); is already zero */
1145                                 if (rv == FE_STILL_LIVE)
1146                                         rv = FE_RECYCLED;
1147                                 wake_up(&mdev->ee_wait);
1148                         }
1149                 }
1150
1151                 if (!next_epoch)
1152                         break;
1153
1154                 epoch = next_epoch;
1155         } while (1);
1156
1157         spin_unlock(&mdev->epoch_lock);
1158
1159         return rv;
1160 }
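
/*
 * Illustrative sketch, not part of the original driver: how the epoch
 * events above are typically applied.  When a write belonging to an
 * epoch completes (or is thrown away during cleanup), the completion
 * path drops its reference on the epoch roughly like this; EV_CLEANUP
 * is or'ed in when no P_BARRIER_ACK may be sent any more.
 */
static void __maybe_unused epoch_put_example(struct drbd_conf *mdev,
		struct drbd_epoch *epoch, bool cleanup)
{
	drbd_may_finish_epoch(mdev, epoch, EV_PUT | (cleanup ? EV_CLEANUP : 0));
}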
1161
1162 /**
1163  * drbd_bump_write_ordering() - Fall back to another write ordering method
1164  * @mdev:       DRBD device.
1165  * @wo:         Write ordering method to try.
1166  */
1167 void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
1168 {
1169         enum write_ordering_e pwo;
1170         static char *write_ordering_str[] = {
1171                 [WO_none] = "none",
1172                 [WO_drain_io] = "drain",
1173                 [WO_bdev_flush] = "flush",
1174         };
1175
1176         pwo = mdev->write_ordering;
1177         wo = min(pwo, wo);
1178         if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
1179                 wo = WO_drain_io;
1180         if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
1181                 wo = WO_none;
1182         mdev->write_ordering = wo;
1183         if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
1184                 dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
1185 }
1186
1187 /**
1188  * drbd_submit_peer_request() - Submit the I/O for a peer request to the local backing device
1189  * @mdev:       DRBD device.
1190  * @peer_req:   peer request
1191  * @rw:         flag field, see bio->bi_rw
1192  *
1193  * May spread the pages to multiple bios,
1194  * depending on bio_add_page restrictions.
1195  *
1196  * Returns 0 if all bios have been submitted,
1197  * -ENOMEM if we could not allocate enough bios,
1198  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1199  *  single page to an empty bio (which should never happen and likely indicates
1200  *  that the lower level IO stack is in some way broken). This has been observed
1201  *  on certain Xen deployments.
1202  */
1203 /* TODO allocate from our own bio_set. */
1204 int drbd_submit_peer_request(struct drbd_conf *mdev,
1205                              struct drbd_peer_request *peer_req,
1206                              const unsigned rw, const int fault_type)
1207 {
1208         struct bio *bios = NULL;
1209         struct bio *bio;
1210         struct page *page = peer_req->pages;
1211         sector_t sector = peer_req->i.sector;
1212         unsigned ds = peer_req->i.size;
1213         unsigned n_bios = 0;
1214         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1215         int err = -ENOMEM;
1216
1217         /* In most cases, we will only need one bio.  But in case the lower
1218          * level restrictions happen to be different at this offset on this
1219          * side than those of the sending peer, we may need to submit the
1220          * request in more than one bio.
1221          *
1222          * Plain bio_alloc is good enough here, this is no DRBD internally
1223          * generated bio, but a bio allocated on behalf of the peer.
1224          */
1225 next_bio:
1226         bio = bio_alloc(GFP_NOIO, nr_pages);
1227         if (!bio) {
1228                 dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1229                 goto fail;
1230         }
1231         /* > peer_req->i.sector, unless this is the first bio */
1232         bio->bi_sector = sector;
1233         bio->bi_bdev = mdev->ldev->backing_bdev;
1234         bio->bi_rw = rw;
1235         bio->bi_private = peer_req;
1236         bio->bi_end_io = drbd_peer_request_endio;
1237
1238         bio->bi_next = bios;
1239         bios = bio;
1240         ++n_bios;
1241
1242         page_chain_for_each(page) {
1243                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1244                 if (!bio_add_page(bio, page, len, 0)) {
1245                         /* A single page must always be possible!
1246                          * But in case it fails anyways,
1247                          * we deal with it, and complain (below). */
1248                         if (bio->bi_vcnt == 0) {
1249                                 dev_err(DEV,
1250                                         "bio_add_page failed for len=%u, "
1251                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1252                                         len, (unsigned long long)bio->bi_sector);
1253                                 err = -ENOSPC;
1254                                 goto fail;
1255                         }
1256                         goto next_bio;
1257                 }
1258                 ds -= len;
1259                 sector += len >> 9;
1260                 --nr_pages;
1261         }
1262         D_ASSERT(page == NULL);
1263         D_ASSERT(ds == 0);
1264
1265         atomic_set(&peer_req->pending_bios, n_bios);
1266         do {
1267                 bio = bios;
1268                 bios = bios->bi_next;
1269                 bio->bi_next = NULL;
1270
1271                 drbd_generic_make_request(mdev, fault_type, bio);
1272         } while (bios);
1273         return 0;
1274
1275 fail:
1276         while (bios) {
1277                 bio = bios;
1278                 bios = bios->bi_next;
1279                 bio_put(bio);
1280         }
1281         return err;
1282 }
1283
1284 static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1285                                              struct drbd_peer_request *peer_req)
1286 {
1287         struct drbd_interval *i = &peer_req->i;
1288
1289         drbd_remove_interval(&mdev->write_requests, i);
1290         drbd_clear_interval(i);
1291
1292         /* Wake up any processes waiting for this peer request to complete.  */
1293         if (i->waiting)
1294                 wake_up(&mdev->misc_wait);
1295 }
1296
1297 static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1298 {
1299         struct drbd_conf *mdev;
1300         int rv;
1301         struct p_barrier *p = pi->data;
1302         struct drbd_epoch *epoch;
1303
1304         mdev = vnr_to_mdev(tconn, pi->vnr);
1305         if (!mdev)
1306                 return -EIO;
1307
1308         inc_unacked(mdev);
1309
1310         mdev->current_epoch->barrier_nr = p->barrier;
1311         rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);
1312
1313         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1314          * the activity log, which means it would not be resynced in case the
1315          * R_PRIMARY crashes now.
1316          * Therefore we must send the barrier_ack after the barrier request was
1317          * completed. */
1318         switch (mdev->write_ordering) {
1319         case WO_none:
1320                 if (rv == FE_RECYCLED)
1321                         return 0;
1322
1323                 /* receiver context, in the writeout path of the other node.
1324                  * avoid potential distributed deadlock */
1325                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1326                 if (epoch)
1327                         break;
1328                 else
1329                         dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
1330                         /* Fall through */
1331
1332         case WO_bdev_flush:
1333         case WO_drain_io:
1334                 drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1335                 drbd_flush(mdev);
1336
1337                 if (atomic_read(&mdev->current_epoch->epoch_size)) {
1338                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1339                         if (epoch)
1340                                 break;
1341                 }
1342
1343                 epoch = mdev->current_epoch;
1344                 wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
1345
1346                 D_ASSERT(atomic_read(&epoch->active) == 0);
1347                 D_ASSERT(epoch->flags == 0);
1348
1349                 return 0;
1350         default:
1351                 dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
1352                 return -EIO;
1353         }
1354
1355         epoch->flags = 0;
1356         atomic_set(&epoch->epoch_size, 0);
1357         atomic_set(&epoch->active, 0);
1358
1359         spin_lock(&mdev->epoch_lock);
1360         if (atomic_read(&mdev->current_epoch->epoch_size)) {
1361                 list_add(&epoch->list, &mdev->current_epoch->list);
1362                 mdev->current_epoch = epoch;
1363                 mdev->epochs++;
1364         } else {
1365                 /* The current_epoch got recycled while we allocated this one... */
1366                 kfree(epoch);
1367         }
1368         spin_unlock(&mdev->epoch_lock);
1369
1370         return 0;
1371 }
1372
1373 /* used from receive_RSDataReply (recv_resync_read)
1374  * and from receive_Data */
1375 static struct drbd_peer_request *
1376 read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1377               int data_size) __must_hold(local)
1378 {
1379         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1380         struct drbd_peer_request *peer_req;
1381         struct page *page;
1382         int dgs, ds, err;
1383         void *dig_in = mdev->tconn->int_dig_in;
1384         void *dig_vv = mdev->tconn->int_dig_vv;
1385         unsigned long *data;
1386
1387         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1388                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1389
1390         if (dgs) {
1391                 /*
1392                  * FIXME: Receive the incoming digest into the receive buffer
1393                  *        here, together with its struct p_data?
1394                  */
1395                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1396                 if (err)
1397                         return NULL;
1398         }
1399
1400         data_size -= dgs;
1401
1402         if (!expect(data_size != 0))
1403                 return NULL;
1404         if (!expect(IS_ALIGNED(data_size, 512)))
1405                 return NULL;
1406         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1407                 return NULL;
1408
1409         /* even though we trust our peer,
1410          * we sometimes have to double-check. */
1411         if (sector + (data_size>>9) > capacity) {
1412                 dev_err(DEV, "request from peer beyond end of local disk: "
1413                         "capacity: %llus < sector: %llus + size: %u\n",
1414                         (unsigned long long)capacity,
1415                         (unsigned long long)sector, data_size);
1416                 return NULL;
1417         }
1418
1419         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1420          * "criss-cross" setup, that might cause write-out on some other DRBD,
1421          * which in turn might block on the other node at this very place.  */
1422         peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1423         if (!peer_req)
1424                 return NULL;
1425
1426         ds = data_size;
1427         page = peer_req->pages;
1428         page_chain_for_each(page) {
1429                 unsigned len = min_t(int, ds, PAGE_SIZE);
1430                 data = kmap(page);
1431                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1432                 if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1433                         dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1434                         data[0] = data[0] ^ (unsigned long)-1;
1435                 }
1436                 kunmap(page);
1437                 if (err) {
1438                         drbd_free_peer_req(mdev, peer_req);
1439                         return NULL;
1440                 }
1441                 ds -= len;
1442         }
1443
1444         if (dgs) {
1445                 drbd_csum_ee(mdev, mdev->tconn->integrity_r_tfm, peer_req, dig_vv);
1446                 if (memcmp(dig_in, dig_vv, dgs)) {
1447                         dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1448                                 (unsigned long long)sector, data_size);
1449                         drbd_free_peer_req(mdev, peer_req);
1450                         return NULL;
1451                 }
1452         }
1453         mdev->recv_cnt += data_size>>9;
1454         return peer_req;
1455 }
1456
1457 /* drbd_drain_block() just takes a data block
1458  * out of the socket input buffer, and discards it.
1459  */
1460 static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1461 {
1462         struct page *page;
1463         int err = 0;
1464         void *data;
1465
1466         if (!data_size)
1467                 return 0;
1468
1469         page = drbd_alloc_pages(mdev, 1, 1);
1470
1471         data = kmap(page);
1472         while (data_size) {
1473                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1474
1475                 err = drbd_recv_all_warn(mdev->tconn, data, len);
1476                 if (err)
1477                         break;
1478                 data_size -= len;
1479         }
1480         kunmap(page);
1481         drbd_free_pages(mdev, page, 0);
1482         return err;
1483 }
1484
1485 static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1486                            sector_t sector, int data_size)
1487 {
1488         struct bio_vec *bvec;
1489         struct bio *bio;
1490         int dgs, err, i, expect;
1491         void *dig_in = mdev->tconn->int_dig_in;
1492         void *dig_vv = mdev->tconn->int_dig_vv;
1493
1494         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1495                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1496
1497         if (dgs) {
1498                 err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1499                 if (err)
1500                         return err;
1501         }
1502
1503         data_size -= dgs;
1504
1505         /* optimistically update recv_cnt.  if receiving fails below,
1506          * we disconnect anyways, and counters will be reset. */
1507         mdev->recv_cnt += data_size>>9;
1508
1509         bio = req->master_bio;
1510         D_ASSERT(sector == bio->bi_sector);
1511
1512         bio_for_each_segment(bvec, bio, i) {
1513                 void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1514                 expect = min_t(int, data_size, bvec->bv_len);
1515                 err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1516                 kunmap(bvec->bv_page);
1517                 if (err)
1518                         return err;
1519                 data_size -= expect;
1520         }
1521
1522         if (dgs) {
1523                 drbd_csum_bio(mdev, mdev->tconn->integrity_r_tfm, bio, dig_vv);
1524                 if (memcmp(dig_in, dig_vv, dgs)) {
1525                         dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1526                         return -EINVAL;
1527                 }
1528         }
1529
1530         D_ASSERT(data_size == 0);
1531         return 0;
1532 }
1533
1534 /*
1535  * e_end_resync_block() is called in asender context via
1536  * drbd_finish_peer_reqs().
1537  */
1538 static int e_end_resync_block(struct drbd_work *w, int unused)
1539 {
1540         struct drbd_peer_request *peer_req =
1541                 container_of(w, struct drbd_peer_request, w);
1542         struct drbd_conf *mdev = w->mdev;
1543         sector_t sector = peer_req->i.sector;
1544         int err;
1545
1546         D_ASSERT(drbd_interval_empty(&peer_req->i));
1547
1548         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1549                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1550                 err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1551         } else {
1552                 /* Record failure to sync */
1553                 drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1554
1555                 err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1556         }
1557         dec_unacked(mdev);
1558
1559         return err;
1560 }
1561
1562 static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1563 {
1564         struct drbd_peer_request *peer_req;
1565
1566         peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1567         if (!peer_req)
1568                 goto fail;
1569
1570         dec_rs_pending(mdev);
1571
1572         inc_unacked(mdev);
1573         /* corresponding dec_unacked() in e_end_resync_block()
1574          * respective _drbd_clear_done_ee */
1575
1576         peer_req->w.cb = e_end_resync_block;
1577
1578         spin_lock_irq(&mdev->tconn->req_lock);
1579         list_add(&peer_req->w.list, &mdev->sync_ee);
1580         spin_unlock_irq(&mdev->tconn->req_lock);
1581
1582         atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1583         if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1584                 return 0;
1585
1586         /* don't care for the reason here */
1587         dev_err(DEV, "submit failed, triggering re-connect\n");
1588         spin_lock_irq(&mdev->tconn->req_lock);
1589         list_del(&peer_req->w.list);
1590         spin_unlock_irq(&mdev->tconn->req_lock);
1591
1592         drbd_free_peer_req(mdev, peer_req);
1593 fail:
1594         put_ldev(mdev);
1595         return -EIO;
1596 }
1597
1598 static struct drbd_request *
1599 find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1600              sector_t sector, bool missing_ok, const char *func)
1601 {
1602         struct drbd_request *req;
1603
1604         /* Request object according to our peer */
1605         req = (struct drbd_request *)(unsigned long)id;
1606         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1607                 return req;
1608         if (!missing_ok) {
1609                 dev_err(DEV, "%s: failed to find request %lu, sector %llus\n", func,
1610                         (unsigned long)id, (unsigned long long)sector);
1611         }
1612         return NULL;
1613 }
1614
1615 static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1616 {
1617         struct drbd_conf *mdev;
1618         struct drbd_request *req;
1619         sector_t sector;
1620         int err;
1621         struct p_data *p = pi->data;
1622
1623         mdev = vnr_to_mdev(tconn, pi->vnr);
1624         if (!mdev)
1625                 return -EIO;
1626
1627         sector = be64_to_cpu(p->sector);
1628
1629         spin_lock_irq(&mdev->tconn->req_lock);
1630         req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1631         spin_unlock_irq(&mdev->tconn->req_lock);
1632         if (unlikely(!req))
1633                 return -EIO;
1634
1635         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1636          * special casing it there for the various failure cases.
1637          * still no race with drbd_fail_pending_reads */
1638         err = recv_dless_read(mdev, req, sector, pi->size);
1639         if (!err)
1640                 req_mod(req, DATA_RECEIVED);
1641         /* else: nothing. handled from drbd_disconnect...
1642          * I don't think we may complete this just yet
1643          * in case we are "on-disconnect: freeze" */
1644
1645         return err;
1646 }
1647
1648 static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1649 {
1650         struct drbd_conf *mdev;
1651         sector_t sector;
1652         int err;
1653         struct p_data *p = pi->data;
1654
1655         mdev = vnr_to_mdev(tconn, pi->vnr);
1656         if (!mdev)
1657                 return -EIO;
1658
1659         sector = be64_to_cpu(p->sector);
1660         D_ASSERT(p->block_id == ID_SYNCER);
1661
1662         if (get_ldev(mdev)) {
1663                 /* data is submitted to disk within recv_resync_read.
1664                  * corresponding put_ldev done below on error,
1665                  * or in drbd_peer_request_endio. */
1666                 err = recv_resync_read(mdev, sector, pi->size);
1667         } else {
1668                 if (__ratelimit(&drbd_ratelimit_state))
1669                         dev_err(DEV, "Can not write resync data to local disk.\n");
1670
1671                 err = drbd_drain_block(mdev, pi->size);
1672
1673                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1674         }
1675
1676         atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1677
1678         return err;
1679 }
1680
1681 static int w_restart_write(struct drbd_work *w, int cancel)
1682 {
1683         struct drbd_request *req = container_of(w, struct drbd_request, w);
1684         struct drbd_conf *mdev = w->mdev;
1685         struct bio *bio;
1686         unsigned long start_time;
1687         unsigned long flags;
1688
1689         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1690         if (!expect(req->rq_state & RQ_POSTPONED)) {
1691                 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1692                 return -EIO;
1693         }
1694         bio = req->master_bio;
1695         start_time = req->start_time;
1696         /* Postponed requests will not have their master_bio completed!  */
1697         __req_mod(req, DISCARD_WRITE, NULL);
1698         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1699
1700         while (__drbd_make_request(mdev, bio, start_time))
1701                 /* retry */ ;
1702         return 0;
1703 }
1704
1705 static void restart_conflicting_writes(struct drbd_conf *mdev,
1706                                        sector_t sector, int size)
1707 {
1708         struct drbd_interval *i;
1709         struct drbd_request *req;
1710
1711         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1712                 if (!i->local)
1713                         continue;
1714                 req = container_of(i, struct drbd_request, i);
1715                 if (req->rq_state & RQ_LOCAL_PENDING ||
1716                     !(req->rq_state & RQ_POSTPONED))
1717                         continue;
1718                 if (expect(list_empty(&req->w.list))) {
1719                         req->w.mdev = mdev;
1720                         req->w.cb = w_restart_write;
1721                         drbd_queue_work(&mdev->tconn->data.work, &req->w);
1722                 }
1723         }
1724 }
1725
1726 /*
1727  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1728  */
1729 static int e_end_block(struct drbd_work *w, int cancel)
1730 {
1731         struct drbd_peer_request *peer_req =
1732                 container_of(w, struct drbd_peer_request, w);
1733         struct drbd_conf *mdev = w->mdev;
1734         sector_t sector = peer_req->i.sector;
1735         int err = 0, pcmd;
1736
1737         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1738                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1739                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1740                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1741                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1742                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1743                         err = drbd_send_ack(mdev, pcmd, peer_req);
1744                         if (pcmd == P_RS_WRITE_ACK)
1745                                 drbd_set_in_sync(mdev, sector, peer_req->i.size);
1746                 } else {
1747                         err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1748                         /* we expect it to be marked out of sync anyways...
1749                          * maybe assert this?  */
1750                 }
1751                 dec_unacked(mdev);
1752         }
1753         /* we delete from the conflict detection hash _after_ we sent out the
1754          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1755         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1756                 spin_lock_irq(&mdev->tconn->req_lock);
1757                 D_ASSERT(!drbd_interval_empty(&peer_req->i));
1758                 drbd_remove_epoch_entry_interval(mdev, peer_req);
1759                 if (peer_req->flags & EE_RESTART_REQUESTS)
1760                         restart_conflicting_writes(mdev, sector, peer_req->i.size);
1761                 spin_unlock_irq(&mdev->tconn->req_lock);
1762         } else
1763                 D_ASSERT(drbd_interval_empty(&peer_req->i));
1764
1765         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1766
1767         return err;
1768 }
1769
1770 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1771 {
1772         struct drbd_conf *mdev = w->mdev;
1773         struct drbd_peer_request *peer_req =
1774                 container_of(w, struct drbd_peer_request, w);
1775         int err;
1776
1777         err = drbd_send_ack(mdev, ack, peer_req);
1778         dec_unacked(mdev);
1779
1780         return err;
1781 }
1782
1783 static int e_send_discard_write(struct drbd_work *w, int unused)
1784 {
1785         return e_send_ack(w, P_DISCARD_WRITE);
1786 }
1787
1788 static int e_send_retry_write(struct drbd_work *w, int unused)
1789 {
1790         struct drbd_tconn *tconn = w->mdev->tconn;
1791
1792         return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1793                              P_RETRY_WRITE : P_DISCARD_WRITE);
1794 }
1795
1796 static bool seq_greater(u32 a, u32 b)
1797 {
1798         /*
1799          * We assume 32-bit wrap-around here.
1800          * For 24-bit wrap-around, we would have to shift:
1801          *  a <<= 8; b <<= 8;
1802          */
1803         return (s32)a - (s32)b > 0;
1804 }
1805
1806 static u32 seq_max(u32 a, u32 b)
1807 {
1808         return seq_greater(a, b) ? a : b;
1809 }
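
/*
 * Worked example (illustrative values, not part of the protocol): the signed
 * difference makes the comparison robust against 32-bit wrap-around, e.g.
 *
 *     seq_greater(5, 3)                   == true    (5 - 3 = 2 > 0)
 *     seq_greater(3, 5)                   == false
 *     seq_greater(0x00000001, 0xffffffff) == true    ((s32)1 - (s32)-1 = 2)
 *
 * and seq_max() accordingly returns the logically newer sequence number.
 */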
1810
1811 static bool need_peer_seq(struct drbd_conf *mdev)
1812 {
1813         struct drbd_tconn *tconn = mdev->tconn;
1814         int tp;
1815
1816         /*
1817          * We only need to keep track of the last packet_seq number of our peer
1818          * if we are in dual-primary mode and we have the discard flag set; see
1819          * handle_write_conflicts().
1820          */
1821
1822         rcu_read_lock();
1823         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1824         rcu_read_unlock();
1825
1826         return tp && test_bit(DISCARD_CONCURRENT, &tconn->flags);
1827 }
1828
1829 static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1830 {
1831         unsigned int newest_peer_seq;
1832
1833         if (need_peer_seq(mdev)) {
1834                 spin_lock(&mdev->peer_seq_lock);
1835                 newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1836                 mdev->peer_seq = newest_peer_seq;
1837                 spin_unlock(&mdev->peer_seq_lock);
1838                 /* wake up only if we actually changed mdev->peer_seq */
1839                 if (peer_seq == newest_peer_seq)
1840                         wake_up(&mdev->seq_wait);
1841         }
1842 }
1843
1844 /* Called from receive_Data.
1845  * Synchronize packets on sock with packets on msock.
1846  *
1847  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1848  * packet traveling on msock, they are still processed in the order they have
1849  * been sent.
1850  *
1851  * Note: we don't care for Ack packets overtaking P_DATA packets.
1852  *
1853  * In case peer_seq is larger than mdev->peer_seq, there are
1854  * outstanding packets on the msock. We wait for them to arrive.
1855  * In case ours is the logically next packet, we update mdev->peer_seq
1856  * ourselves. Correctly handles 32bit wrap around.
1857  *
1858  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1859  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1860  * for the 24bit wrap (historical atomic_t guarantee on some archs), and
1861  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1862  *
1863  * returns 0 if we may process the packet,
1864  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1865 static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1866 {
1867         DEFINE_WAIT(wait);
1868         long timeout;
1869         int ret;
1870
1871         if (!need_peer_seq(mdev))
1872                 return 0;
1873
1874         spin_lock(&mdev->peer_seq_lock);
1875         for (;;) {
1876                 if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1877                         mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1878                         ret = 0;
1879                         break;
1880                 }
1881                 if (signal_pending(current)) {
1882                         ret = -ERESTARTSYS;
1883                         break;
1884                 }
1885                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1886                 spin_unlock(&mdev->peer_seq_lock);
1887                 rcu_read_lock();
1888                 timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1889                 rcu_read_unlock();
1890                 timeout = schedule_timeout(timeout);
1891                 spin_lock(&mdev->peer_seq_lock);
1892                 if (!timeout) {
1893                         ret = -ETIMEDOUT;
1894                         dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
1895                         break;
1896                 }
1897         }
1898         spin_unlock(&mdev->peer_seq_lock);
1899         finish_wait(&mdev->seq_wait, &wait);
1900         return ret;
1901 }
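
/*
 * Worked example (made-up numbers): with mdev->peer_seq == 7, a P_DATA packet
 * carrying peer_seq == 9 makes seq_greater(9 - 1, 7) true, so we sleep until
 * the packet with sequence number 8 has been processed (or we time out / get
 * signalled); once mdev->peer_seq reads 8, seq_greater(8, 8) is false and the
 * write may proceed, updating mdev->peer_seq to 9.
 */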
1902
1903 /* see also bio_flags_to_wire() and the DRBD_REQ_* defines:
1904  * we need to semantically map bio flags to data packet flags and back,
1905  * because the peer may replicate on a different kernel version. */
1906 static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1907 {
1908         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1909                 (dpf & DP_FUA ? REQ_FUA : 0) |
1910                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1911                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1912 }
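
/*
 * For example, a peer write sent with DP_RW_SYNC | DP_FUA on the wire is
 * submitted locally as REQ_SYNC | REQ_FUA (in addition to WRITE, which is
 * set by the caller).
 */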
1913
1914 static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
1915                                     unsigned int size)
1916 {
1917         struct drbd_interval *i;
1918
1919     repeat:
1920         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1921                 struct drbd_request *req;
1922                 struct bio_and_error m;
1923
1924                 if (!i->local)
1925                         continue;
1926                 req = container_of(i, struct drbd_request, i);
1927                 if (!(req->rq_state & RQ_POSTPONED))
1928                         continue;
1929                 req->rq_state &= ~RQ_POSTPONED;
1930                 __req_mod(req, NEG_ACKED, &m);
1931                 spin_unlock_irq(&mdev->tconn->req_lock);
1932                 if (m.bio)
1933                         complete_master_bio(mdev, &m);
1934                 spin_lock_irq(&mdev->tconn->req_lock);
1935                 goto repeat;
1936         }
1937 }
1938
1939 static int handle_write_conflicts(struct drbd_conf *mdev,
1940                                   struct drbd_peer_request *peer_req)
1941 {
1942         struct drbd_tconn *tconn = mdev->tconn;
1943         bool resolve_conflicts = test_bit(DISCARD_CONCURRENT, &tconn->flags);
1944         sector_t sector = peer_req->i.sector;
1945         const unsigned int size = peer_req->i.size;
1946         struct drbd_interval *i;
1947         bool equal;
1948         int err;
1949
1950         /*
1951          * Inserting the peer request into the write_requests tree will prevent
1952          * new conflicting local requests from being added.
1953          */
1954         drbd_insert_interval(&mdev->write_requests, &peer_req->i);
1955
1956     repeat:
1957         drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1958                 if (i == &peer_req->i)
1959                         continue;
1960
1961                 if (!i->local) {
1962                         /*
1963                          * Our peer has sent a conflicting remote request; this
1964                          * should not happen in a two-node setup.  Wait for the
1965                          * earlier peer request to complete.
1966                          */
1967                         err = drbd_wait_misc(mdev, i);
1968                         if (err)
1969                                 goto out;
1970                         goto repeat;
1971                 }
1972
1973                 equal = i->sector == sector && i->size == size;
1974                 if (resolve_conflicts) {
1975                         /*
1976                          * If the peer request is fully contained within the
1977                          * overlapping request, it can be discarded; otherwise,
1978                          * it will be retried once all overlapping requests
1979                          * have completed.
1980                          */
1981                         bool discard = i->sector <= sector && i->sector +
1982                                        (i->size >> 9) >= sector + (size >> 9);
1983
1984                         if (!equal)
1985                                 dev_alert(DEV, "Concurrent writes detected: "
1986                                                "local=%llus +%u, remote=%llus +%u, "
1987                                                "assuming %s came first\n",
1988                                           (unsigned long long)i->sector, i->size,
1989                                           (unsigned long long)sector, size,
1990                                           discard ? "local" : "remote");
1991
1992                         inc_unacked(mdev);
1993                         peer_req->w.cb = discard ? e_send_discard_write :
1994                                                    e_send_retry_write;
1995                         list_add_tail(&peer_req->w.list, &mdev->done_ee);
1996                         wake_asender(mdev->tconn);
1997
1998                         err = -ENOENT;
1999                         goto out;
2000                 } else {
2001                         struct drbd_request *req =
2002                                 container_of(i, struct drbd_request, i);
2003
2004                         if (!equal)
2005                                 dev_alert(DEV, "Concurrent writes detected: "
2006                                                "local=%llus +%u, remote=%llus +%u\n",
2007                                           (unsigned long long)i->sector, i->size,
2008                                           (unsigned long long)sector, size);
2009
2010                         if (req->rq_state & RQ_LOCAL_PENDING ||
2011                             !(req->rq_state & RQ_POSTPONED)) {
2012                                 /*
2013                                  * Wait for the node with the discard flag to
2014                                  * decide if this request will be discarded or
2015                                  * retried.  Requests that are discarded will
2016                                  * disappear from the write_requests tree.
2017                                  *
2018                                  * In addition, wait for the conflicting
2019                                  * request to finish locally before submitting
2020                                  * the conflicting peer request.
2021                                  */
2022                                 err = drbd_wait_misc(mdev, &req->i);
2023                                 if (err) {
2024                                         _conn_request_state(mdev->tconn,
2025                                                             NS(conn, C_TIMEOUT),
2026                                                             CS_HARD);
2027                                         fail_postponed_requests(mdev, sector, size);
2028                                         goto out;
2029                                 }
2030                                 goto repeat;
2031                         }
2032                         /*
2033                          * Remember to restart the conflicting requests after
2034                          * the new peer request has completed.
2035                          */
2036                         peer_req->flags |= EE_RESTART_REQUESTS;
2037                 }
2038         }
2039         err = 0;
2040
2041     out:
2042         if (err)
2043                 drbd_remove_epoch_entry_interval(mdev, peer_req);
2044         return err;
2045 }
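
/*
 * Worked example for the resolve_conflicts branch above (made-up numbers):
 * a local request at sector 100 with size 8192 bytes covers sectors 100-115;
 * a conflicting peer write at sector 104 with size 2048 bytes (sectors
 * 104-107) is fully contained, so "discard" is true and it is answered with
 * P_DISCARD_WRITE.  Had it extended past sector 115, it would be queued for
 * retry via e_send_retry_write instead.
 */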
2046
2047 /* mirrored write */
2048 static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2049 {
2050         struct drbd_conf *mdev;
2051         sector_t sector;
2052         struct drbd_peer_request *peer_req;
2053         struct p_data *p = pi->data;
2054         u32 peer_seq = be32_to_cpu(p->seq_num);
2055         int rw = WRITE;
2056         u32 dp_flags;
2057         int err, tp;
2058
2059         mdev = vnr_to_mdev(tconn, pi->vnr);
2060         if (!mdev)
2061                 return -EIO;
2062
2063         if (!get_ldev(mdev)) {
2064                 int err2;
2065
2066                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2067                 drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2068                 atomic_inc(&mdev->current_epoch->epoch_size);
2069                 err2 = drbd_drain_block(mdev, pi->size);
2070                 if (!err)
2071                         err = err2;
2072                 return err;
2073         }
2074
2075         /*
2076          * Corresponding put_ldev done either below (on various errors), or in
2077          * drbd_peer_request_endio, if we successfully submit the data at the
2078          * end of this function.
2079          */
2080
2081         sector = be64_to_cpu(p->sector);
2082         peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2083         if (!peer_req) {
2084                 put_ldev(mdev);
2085                 return -EIO;
2086         }
2087
2088         peer_req->w.cb = e_end_block;
2089
2090         dp_flags = be32_to_cpu(p->dp_flags);
2091         rw |= wire_flags_to_bio(mdev, dp_flags);
2092
2093         if (dp_flags & DP_MAY_SET_IN_SYNC)
2094                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2095
2096         spin_lock(&mdev->epoch_lock);
2097         peer_req->epoch = mdev->current_epoch;
2098         atomic_inc(&peer_req->epoch->epoch_size);
2099         atomic_inc(&peer_req->epoch->active);
2100         spin_unlock(&mdev->epoch_lock);
2101
2102         rcu_read_lock();
2103         tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2104         rcu_read_unlock();
2105         if (tp) {
2106                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2107                 err = wait_for_and_update_peer_seq(mdev, peer_seq);
2108                 if (err)
2109                         goto out_interrupted;
2110                 spin_lock_irq(&mdev->tconn->req_lock);
2111                 err = handle_write_conflicts(mdev, peer_req);
2112                 if (err) {
2113                         spin_unlock_irq(&mdev->tconn->req_lock);
2114                         if (err == -ENOENT) {
2115                                 put_ldev(mdev);
2116                                 return 0;
2117                         }
2118                         goto out_interrupted;
2119                 }
2120         } else
2121                 spin_lock_irq(&mdev->tconn->req_lock);
2122         list_add(&peer_req->w.list, &mdev->active_ee);
2123         spin_unlock_irq(&mdev->tconn->req_lock);
2124
2125         if (mdev->tconn->agreed_pro_version < 100) {
2126                 rcu_read_lock();
2127                 switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2128                 case DRBD_PROT_C:
2129                         dp_flags |= DP_SEND_WRITE_ACK;
2130                         break;
2131                 case DRBD_PROT_B:
2132                         dp_flags |= DP_SEND_RECEIVE_ACK;
2133                         break;
2134                 }
2135                 rcu_read_unlock();
2136         }
2137
2138         if (dp_flags & DP_SEND_WRITE_ACK) {
2139                 peer_req->flags |= EE_SEND_WRITE_ACK;
2140                 inc_unacked(mdev);
2141                 /* corresponding dec_unacked() in e_end_block()
2142                  * respective _drbd_clear_done_ee */
2143         }
2144
2145         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2146                 /* I really don't like it that the receiver thread
2147                  * sends on the msock, but anyways */
2148                 drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2149         }
2150
2151         if (mdev->state.pdsk < D_INCONSISTENT) {
2152                 /* In case we have the only disk of the cluster, mark the block out of sync (the peer cannot store it) and cover it in our activity log. */
2153                 drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2154                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2155                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2156                 drbd_al_begin_io(mdev, &peer_req->i);
2157         }
2158
2159         err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2160         if (!err)
2161                 return 0;
2162
2163         /* don't care for the reason here */
2164         dev_err(DEV, "submit failed, triggering re-connect\n");
2165         spin_lock_irq(&mdev->tconn->req_lock);
2166         list_del(&peer_req->w.list);
2167         drbd_remove_epoch_entry_interval(mdev, peer_req);
2168         spin_unlock_irq(&mdev->tconn->req_lock);
2169         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2170                 drbd_al_complete_io(mdev, &peer_req->i);
2171
2172 out_interrupted:
2173         drbd_may_finish_epoch(mdev, peer_req->epoch, EV_PUT + EV_CLEANUP);
2174         put_ldev(mdev);
2175         drbd_free_peer_req(mdev, peer_req);
2176         return err;
2177 }
2178
2179 /* We may throttle resync, if the lower device seems to be busy,
2180  * and current sync rate is above c_min_rate.
2181  *
2182  * To decide whether or not the lower device is busy, we use a scheme similar
2183  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2184  * activity (more than 64 sectors) that we cannot account for with our own
2185  * resync activity, it obviously is "busy".
2186  *
2187  * The current sync rate used here uses only the most recent two step marks,
2188  * to have a short time average so we can react faster.
2189  */
2190 int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2191 {
2192         struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2193         unsigned long db, dt, dbdt;
2194         struct lc_element *tmp;
2195         int curr_events;
2196         int throttle = 0;
2197
2198         /* feature disabled? */
2199         if (mdev->ldev->dc.c_min_rate == 0)
2200                 return 0;
2201
2202         spin_lock_irq(&mdev->al_lock);
2203         tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2204         if (tmp) {
2205                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2206                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2207                         spin_unlock_irq(&mdev->al_lock);
2208                         return 0;
2209                 }
2210                 /* Do not slow down if app IO is already waiting for this extent */
2211         }
2212         spin_unlock_irq(&mdev->al_lock);
2213
2214         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2215                       (int)part_stat_read(&disk->part0, sectors[1]) -
2216                         atomic_read(&mdev->rs_sect_ev);
2217
2218         if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2219                 unsigned long rs_left;
2220                 int i;
2221
2222                 mdev->rs_last_events = curr_events;
2223
2224                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2225                  * approx. */
2226                 i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2227
2228                 if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2229                         rs_left = mdev->ov_left;
2230                 else
2231                         rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2232
2233                 dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2234                 if (!dt)
2235                         dt++;
2236                 db = mdev->rs_mark_left[i] - rs_left;
2237                 dbdt = Bit2KB(db/dt);
2238
2239                 if (dbdt > mdev->ldev->dc.c_min_rate)
2240                         throttle = 1;
2241         }
2242         return throttle;
2243 }
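
/*
 * Worked example for the throttle decision above (made-up numbers): suppose
 * dt = 4 seconds have passed since the older of the two most recent sync
 * marks and db = 2048 bitmap bits were cleared in that time.  With 4 KiB of
 * data per bitmap bit, dbdt = Bit2KB(2048 / 4) = 2048 KiB/s; if c_min_rate
 * is configured below that, and the >64-sector unaccounted-activity check
 * fired, throttle is set.
 */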
2244
2245
2246 static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2247 {
2248         struct drbd_conf *mdev;
2249         sector_t sector;
2250         sector_t capacity;
2251         struct drbd_peer_request *peer_req;
2252         struct digest_info *di = NULL;
2253         int size, verb;
2254         unsigned int fault_type;
2255         struct p_block_req *p = pi->data;
2256
2257         mdev = vnr_to_mdev(tconn, pi->vnr);
2258         if (!mdev)
2259                 return -EIO;
2260         capacity = drbd_get_capacity(mdev->this_bdev);
2261
2262         sector = be64_to_cpu(p->sector);
2263         size   = be32_to_cpu(p->blksize);
2264
2265         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2266                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2267                                 (unsigned long long)sector, size);
2268                 return -EINVAL;
2269         }
2270         if (sector + (size>>9) > capacity) {
2271                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2272                                 (unsigned long long)sector, size);
2273                 return -EINVAL;
2274         }
2275
2276         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2277                 verb = 1;
2278                 switch (pi->cmd) {
2279                 case P_DATA_REQUEST:
2280                         drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2281                         break;
2282                 case P_RS_DATA_REQUEST:
2283                 case P_CSUM_RS_REQUEST:
2284                 case P_OV_REQUEST:
2285                         drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY, p);
2286                         break;
2287                 case P_OV_REPLY:
2288                         verb = 0;
2289                         dec_rs_pending(mdev);
2290                         drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2291                         break;
2292                 default:
2293                         BUG();
2294                 }
2295                 if (verb && __ratelimit(&drbd_ratelimit_state))
2296                         dev_err(DEV, "Can not satisfy peer's read request, "
2297                             "no local data.\n");
2298
2299                 /* drain possible payload */
2300                 return drbd_drain_block(mdev, pi->size);
2301         }
2302
2303         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2304          * "criss-cross" setup, that might cause write-out on some other DRBD,
2305          * which in turn might block on the other node at this very place.  */
2306         peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2307         if (!peer_req) {
2308                 put_ldev(mdev);
2309                 return -ENOMEM;
2310         }
2311
2312         switch (pi->cmd) {
2313         case P_DATA_REQUEST:
2314                 peer_req->w.cb = w_e_end_data_req;
2315                 fault_type = DRBD_FAULT_DT_RD;
2316                 /* application IO, don't drbd_rs_begin_io */
2317                 goto submit;
2318
2319         case P_RS_DATA_REQUEST:
2320                 peer_req->w.cb = w_e_end_rsdata_req;
2321                 fault_type = DRBD_FAULT_RS_RD;
2322                 /* used in the sector offset progress display */
2323                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2324                 break;
2325
2326         case P_OV_REPLY:
2327         case P_CSUM_RS_REQUEST:
2328                 fault_type = DRBD_FAULT_RS_RD;
2329                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2330                 if (!di)
2331                         goto out_free_e;
2332
2333                 di->digest_size = pi->size;
2334                 di->digest = (((char *)di)+sizeof(struct digest_info));
2335
2336                 peer_req->digest = di;
2337                 peer_req->flags |= EE_HAS_DIGEST;
2338
2339                 if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2340                         goto out_free_e;
2341
2342                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2343                         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2344                         peer_req->w.cb = w_e_end_csum_rs_req;
2345                         /* used in the sector offset progress display */
2346                         mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2347                 } else if (pi->cmd == P_OV_REPLY) {
2348                         /* track progress, we may need to throttle */
2349                         atomic_add(size >> 9, &mdev->rs_sect_in);
2350                         peer_req->w.cb = w_e_end_ov_reply;
2351                         dec_rs_pending(mdev);
2352                         /* drbd_rs_begin_io done when we sent this request,
2353                          * but accounting still needs to be done. */
2354                         goto submit_for_resync;
2355                 }
2356                 break;
2357
2358         case P_OV_REQUEST:
2359                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2360                     mdev->tconn->agreed_pro_version >= 90) {
2361                         unsigned long now = jiffies;
2362                         int i;
2363                         mdev->ov_start_sector = sector;
2364                         mdev->ov_position = sector;
2365                         mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2366                         mdev->rs_total = mdev->ov_left;
2367                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2368                                 mdev->rs_mark_left[i] = mdev->ov_left;
2369                                 mdev->rs_mark_time[i] = now;
2370                         }
2371                         dev_info(DEV, "Online Verify start sector: %llu\n",
2372                                         (unsigned long long)sector);
2373                 }
2374                 peer_req->w.cb = w_e_end_ov_req;
2375                 fault_type = DRBD_FAULT_RS_RD;
2376                 break;
2377
2378         default:
2379                 BUG();
2380         }
2381
2382         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2383          * wrt the receiver, but it is not as straightforward as it may seem.
2384          * Various places in the resync start and stop logic assume resync
2385          * requests are processed in order, requeuing this on the worker thread
2386          * introduces a bunch of new code for synchronization between threads.
2387          *
2388          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2389          * "forever", throttling after drbd_rs_begin_io will lock that extent
2390          * for application writes for the same time.  For now, just throttle
2391          * here, where the rest of the code expects the receiver to sleep for
2392          * a while, anyways.
2393          */
2394
2395         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2396          * this defers syncer requests for some time, before letting at least
2397  * one request through.  The resync controller on the receiving side
2398          * will adapt to the incoming rate accordingly.
2399          *
2400          * We cannot throttle here if remote is Primary/SyncTarget:
2401          * we would also throttle its application reads.
2402          * In that case, throttling is done on the SyncTarget only.
2403          */
2404         if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2405                 schedule_timeout_uninterruptible(HZ/10);
2406         if (drbd_rs_begin_io(mdev, sector))
2407                 goto out_free_e;
2408
2409 submit_for_resync:
2410         atomic_add(size >> 9, &mdev->rs_sect_ev);
2411
2412 submit:
2413         inc_unacked(mdev);
2414         spin_lock_irq(&mdev->tconn->req_lock);
2415         list_add_tail(&peer_req->w.list, &mdev->read_ee);
2416         spin_unlock_irq(&mdev->tconn->req_lock);
2417
2418         if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2419                 return 0;
2420
2421         /* don't care for the reason here */
2422         dev_err(DEV, "submit failed, triggering re-connect\n");
2423         spin_lock_irq(&mdev->tconn->req_lock);
2424         list_del(&peer_req->w.list);
2425         spin_unlock_irq(&mdev->tconn->req_lock);
2426         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2427
2428 out_free_e:
2429         put_ldev(mdev);
2430         drbd_free_peer_req(mdev, peer_req);
2431         return -EIO;
2432 }
2433
2434 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2435 {
2436         int self, peer, rv = -100;
2437         unsigned long ch_self, ch_peer;
2438         enum drbd_after_sb_p after_sb_0p;
2439
2440         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2441         peer = mdev->p_uuid[UI_BITMAP] & 1;
2442
2443         ch_peer = mdev->p_uuid[UI_SIZE];
2444         ch_self = mdev->comm_bm_set;
2445
2446         rcu_read_lock();
2447         after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2448         rcu_read_unlock();
2449         switch (after_sb_0p) {
2450         case ASB_CONSENSUS:
2451         case ASB_DISCARD_SECONDARY:
2452         case ASB_CALL_HELPER:
2453         case ASB_VIOLENTLY:
2454                 dev_err(DEV, "Configuration error.\n");
2455                 break;
2456         case ASB_DISCONNECT:
2457                 break;
2458         case ASB_DISCARD_YOUNGER_PRI:
2459                 if (self == 0 && peer == 1) {
2460                         rv = -1;
2461                         break;
2462                 }
2463                 if (self == 1 && peer == 0) {
2464                         rv =  1;
2465                         break;
2466                 }
2467                 /* Else fall through to one of the other strategies... */
2468         case ASB_DISCARD_OLDER_PRI:
2469                 if (self == 0 && peer == 1) {
2470                         rv = 1;
2471                         break;
2472                 }
2473                 if (self == 1 && peer == 0) {
2474                         rv = -1;
2475                         break;
2476                 }
2477                 /* Else fall through to one of the other strategies... */
2478                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2479                      "Using discard-least-changes instead\n");
2480         case ASB_DISCARD_ZERO_CHG:
2481                 if (ch_peer == 0 && ch_self == 0) {
2482                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2483                                 ? -1 : 1;
2484                         break;
2485                 } else {
2486                         if (ch_peer == 0) { rv =  1; break; }
2487                         if (ch_self == 0) { rv = -1; break; }
2488                 }
2489                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2490                         break;
2491         case ASB_DISCARD_LEAST_CHG:
2492                 if      (ch_self < ch_peer)
2493                         rv = -1;
2494                 else if (ch_self > ch_peer)
2495                         rv =  1;
2496                 else /* ( ch_self == ch_peer ) */
2497                      /* Well, then use something else. */
2498                         rv = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags)
2499                                 ? -1 : 1;
2500                 break;
2501         case ASB_DISCARD_LOCAL:
2502                 rv = -1;
2503                 break;
2504         case ASB_DISCARD_REMOTE:
2505                 rv =  1;
2506         }
2507
2508         return rv;
2509 }
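
/*
 * Sign convention of the recover_Np() helpers, as consumed by
 * drbd_sync_handshake() below: a positive return value means our data wins
 * (we become sync source, the peer's changes are discarded), a negative one
 * means our changes are discarded (we become sync target), and -100 means no
 * automatic decision could be reached.  E.g. ASB_DISCARD_LOCAL yields -1 and
 * ASB_DISCARD_REMOTE yields 1.
 */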
2510
2511 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2512 {
2513         int hg, rv = -100;
2514         enum drbd_after_sb_p after_sb_1p;
2515
2516         rcu_read_lock();
2517         after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2518         rcu_read_unlock();
2519         switch (after_sb_1p) {
2520         case ASB_DISCARD_YOUNGER_PRI:
2521         case ASB_DISCARD_OLDER_PRI:
2522         case ASB_DISCARD_LEAST_CHG:
2523         case ASB_DISCARD_LOCAL:
2524         case ASB_DISCARD_REMOTE:
2525         case ASB_DISCARD_ZERO_CHG:
2526                 dev_err(DEV, "Configuration error.\n");
2527                 break;
2528         case ASB_DISCONNECT:
2529                 break;
2530         case ASB_CONSENSUS:
2531                 hg = drbd_asb_recover_0p(mdev);
2532                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2533                         rv = hg;
2534                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2535                         rv = hg;
2536                 break;
2537         case ASB_VIOLENTLY:
2538                 rv = drbd_asb_recover_0p(mdev);
2539                 break;
2540         case ASB_DISCARD_SECONDARY:
2541                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2542         case ASB_CALL_HELPER:
2543                 hg = drbd_asb_recover_0p(mdev);
2544                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2545                         enum drbd_state_rv rv2;
2546
2547                         drbd_set_role(mdev, R_SECONDARY, 0);
2548                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2549                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2550                           * we do not need to wait for the after state change work either. */
2551                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2552                         if (rv2 != SS_SUCCESS) {
2553                                 drbd_khelper(mdev, "pri-lost-after-sb");
2554                         } else {
2555                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2556                                 rv = hg;
2557                         }
2558                 } else
2559                         rv = hg;
2560         }
2561
2562         return rv;
2563 }
2564
2565 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2566 {
2567         int hg, rv = -100;
2568         enum drbd_after_sb_p after_sb_2p;
2569
2570         rcu_read_lock();
2571         after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2572         rcu_read_unlock();
2573         switch (after_sb_2p) {
2574         case ASB_DISCARD_YOUNGER_PRI:
2575         case ASB_DISCARD_OLDER_PRI:
2576         case ASB_DISCARD_LEAST_CHG:
2577         case ASB_DISCARD_LOCAL:
2578         case ASB_DISCARD_REMOTE:
2579         case ASB_CONSENSUS:
2580         case ASB_DISCARD_SECONDARY:
2581         case ASB_DISCARD_ZERO_CHG:
2582                 dev_err(DEV, "Configuration error.\n");
2583                 break;
2584         case ASB_VIOLENTLY:
2585                 rv = drbd_asb_recover_0p(mdev);
2586                 break;
2587         case ASB_DISCONNECT:
2588                 break;
2589         case ASB_CALL_HELPER:
2590                 hg = drbd_asb_recover_0p(mdev);
2591                 if (hg == -1) {
2592                         enum drbd_state_rv rv2;
2593
2594                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2595                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2596                           * we do not need to wait for the after state change work either. */
2597                         rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2598                         if (rv2 != SS_SUCCESS) {
2599                                 drbd_khelper(mdev, "pri-lost-after-sb");
2600                         } else {
2601                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2602                                 rv = hg;
2603                         }
2604                 } else
2605                         rv = hg;
2606         }
2607
2608         return rv;
2609 }
2610
2611 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2612                            u64 bits, u64 flags)
2613 {
2614         if (!uuid) {
2615                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2616                 return;
2617         }
2618         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2619              text,
2620              (unsigned long long)uuid[UI_CURRENT],
2621              (unsigned long long)uuid[UI_BITMAP],
2622              (unsigned long long)uuid[UI_HISTORY_START],
2623              (unsigned long long)uuid[UI_HISTORY_END],
2624              (unsigned long long)bits,
2625              (unsigned long long)flags);
2626 }
2627
2628 /*
2629   100   after split brain try auto recover
2630     2   C_SYNC_SOURCE set BitMap
2631     1   C_SYNC_SOURCE use BitMap
2632     0   no Sync
2633    -1   C_SYNC_TARGET use BitMap
2634    -2   C_SYNC_TARGET set BitMap
2635  -100   after split brain, disconnect
2636 -1000   unrelated data
2637 -1091   requires proto 91
2638 -1096   requires proto 96
2639  */
2640 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2641 {
2642         u64 self, peer;
2643         int i, j;
2644
2645         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2646         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2647
2648         *rule_nr = 10;
2649         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2650                 return 0;
2651
2652         *rule_nr = 20;
2653         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2654              peer != UUID_JUST_CREATED)
2655                 return -2;
2656
2657         *rule_nr = 30;
2658         if (self != UUID_JUST_CREATED &&
2659             (peer == UUID_JUST_CREATED || peer == (u64)0))
2660                 return 2;
2661
2662         if (self == peer) {
2663                 int rct, dc; /* roles at crash time */
2664
2665                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2666
2667                         if (mdev->tconn->agreed_pro_version < 91)
2668                                 return -1091;
2669
2670                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2671                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2672                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2673                                 drbd_uuid_set_bm(mdev, 0UL);
2674
2675                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2676                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2677                                 *rule_nr = 34;
2678                         } else {
2679                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2680                                 *rule_nr = 36;
2681                         }
2682
2683                         return 1;
2684                 }
2685
2686                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2687
2688                         if (mdev->tconn->agreed_pro_version < 91)
2689                                 return -1091;
2690
2691                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2692                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2693                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2694
2695                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2696                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2697                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2698
2699                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2700                                 *rule_nr = 35;
2701                         } else {
2702                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2703                                 *rule_nr = 37;
2704                         }
2705
2706                         return -1;
2707                 }
2708
2709                 /* Common power [off|failure] */
2710                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2711                         (mdev->p_uuid[UI_FLAGS] & 2);
2712                 /* lowest bit is set when we were primary,
2713                  * next bit (weight 2) is set when peer was primary */
2714                 *rule_nr = 40;
2715
2716                 switch (rct) {
2717                 case 0: /* !self_pri && !peer_pri */ return 0;
2718                 case 1: /*  self_pri && !peer_pri */ return 1;
2719                 case 2: /* !self_pri &&  peer_pri */ return -1;
2720                 case 3: /*  self_pri &&  peer_pri */
2721                         dc = test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags);
2722                         return dc ? -1 : 1;
2723                 }
2724         }
2725
2726         *rule_nr = 50;
2727         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2728         if (self == peer)
2729                 return -1;
2730
2731         *rule_nr = 51;
2732         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2733         if (self == peer) {
2734                 if (mdev->tconn->agreed_pro_version < 96 ?
2735                     (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2736                     (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2737                     peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2738                         /* The last P_SYNC_UUID did not get through. Undo the peer's
2739                            UUID modifications from its last start of resync as sync source. */
2740
2741                         if (mdev->tconn->agreed_pro_version < 91)
2742                                 return -1091;
2743
2744                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2745                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2746
2747                         dev_info(DEV, "Did not get last syncUUID packet, corrected:\n");
2748                         drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2749
2750                         return -1;
2751                 }
2752         }
2753
2754         *rule_nr = 60;
2755         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2756         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2757                 peer = mdev->p_uuid[i] & ~((u64)1);
2758                 if (self == peer)
2759                         return -2;
2760         }
2761
2762         *rule_nr = 70;
2763         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2764         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2765         if (self == peer)
2766                 return 1;
2767
2768         *rule_nr = 71;
2769         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2770         if (self == peer) {
2771                 if (mdev->tconn->agreed_pro_version < 96 ?
2772                     (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2773                     (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2774                     self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2775                         /* The last P_SYNC_UUID did not get through. Undo our own
2776                            UUID modifications from our last start of resync as sync source. */
2777
2778                         if (mdev->tconn->agreed_pro_version < 91)
2779                                 return -1091;
2780
2781                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2782                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2783
2784                         dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2785                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2786                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2787
2788                         return 1;
2789                 }
2790         }
2791
2792
2793         *rule_nr = 80;
2794         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2795         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2796                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2797                 if (self == peer)
2798                         return 2;
2799         }
2800
2801         *rule_nr = 90;
2802         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2803         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2804         if (self == peer && self != ((u64)0))
2805                 return 100;
2806
2807         *rule_nr = 100;
2808         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2809                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2810                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2811                         peer = mdev->p_uuid[j] & ~((u64)1);
2812                         if (self == peer)
2813                                 return -100;
2814                 }
2815         }
2816
2817         return -1000;
2818 }
2819
2820 /* drbd_sync_handshake() returns the new conn state on success, or
2821    C_MASK on failure.
2822  */
2823 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2824                                            enum drbd_disk_state peer_disk) __must_hold(local)
2825 {
2826         enum drbd_conns rv = C_MASK;
2827         enum drbd_disk_state mydisk;
2828         struct net_conf *nc;
2829         int hg, rule_nr, rr_conflict, dry_run;
2830
2831         mydisk = mdev->state.disk;
2832         if (mydisk == D_NEGOTIATING)
2833                 mydisk = mdev->new_state_tmp.disk;
2834
2835         dev_info(DEV, "drbd_sync_handshake:\n");
2836         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2837         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2838                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2839
2840         hg = drbd_uuid_compare(mdev, &rule_nr);
2841
2842         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2843
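        /* hg encodes the handshake result of drbd_uuid_compare():
         *    0       in sync, no resync needed
         *  +/-1      bitmap based resync (positive: we become sync source)
         *  +/-2      full resync required
         *  +/-100    split brain, resolved below if possible
         *  -1000     unrelated data
         *  < -1000   both sides would need at least protocol version (-hg - 1000) */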
2844         if (hg == -1000) {
2845                 dev_alert(DEV, "Unrelated data, aborting!\n");
2846                 return C_MASK;
2847         }
2848         if (hg < -1000) {
2849                 dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2850                 return C_MASK;
2851         }
2852
2853         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2854             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2855                 int f = (hg == -100) || abs(hg) == 2;
2856                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2857                 if (f)
2858                         hg = hg*2;
2859                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2860                      hg > 0 ? "source" : "target");
2861         }
2862
2863         if (abs(hg) == 100)
2864                 drbd_khelper(mdev, "initial-split-brain");
2865
2866         rcu_read_lock();
2867         nc = rcu_dereference(mdev->tconn->net_conf);
2868
2869         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2870                 int pcount = (mdev->state.role == R_PRIMARY)
2871                            + (peer_role == R_PRIMARY);
2872                 int forced = (hg == -100);
2873
2874                 switch (pcount) {
2875                 case 0:
2876                         hg = drbd_asb_recover_0p(mdev);
2877                         break;
2878                 case 1:
2879                         hg = drbd_asb_recover_1p(mdev);
2880                         break;
2881                 case 2:
2882                         hg = drbd_asb_recover_2p(mdev);
2883                         break;
2884                 }
2885                 if (abs(hg) < 100) {
2886                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2887                              "automatically solved. Sync from %s node\n",
2888                              pcount, (hg < 0) ? "peer" : "this");
2889                         if (forced) {
2890                                 dev_warn(DEV, "Doing a full sync, since"
2891                                      " UUIDs were ambiguous.\n");
2892                                 hg = hg*2;
2893                         }
2894                 }
2895         }
2896
2897         if (hg == -100) {
2898                 if (nc->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2899                         hg = -1;
2900                 if (!nc->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2901                         hg = 1;
2902
2903                 if (abs(hg) < 100)
2904                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2905                              "Sync from %s node\n",
2906                              (hg < 0) ? "peer" : "this");
2907         }
2908         rr_conflict = nc->rr_conflict;
2909         dry_run = nc->dry_run;
2910         rcu_read_unlock();
2911
2912         if (hg == -100) {
2913                 /* FIXME this log message is not correct if we end up here
2914                  * after an attempted attach on a diskless node.
2915                  * We just refuse to attach -- well, we drop the "connection"
2916                  * to that disk, in a way... */
2917                 dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2918                 drbd_khelper(mdev, "split-brain");
2919                 return C_MASK;
2920         }
2921
2922         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2923                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2924                 return C_MASK;
2925         }
2926
2927         if (hg < 0 && /* by intention we do not use mydisk here. */
2928             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2929                 switch (rr_conflict) {
2930                 case ASB_CALL_HELPER:
2931                         drbd_khelper(mdev, "pri-lost");
2932                         /* fall through */
2933                 case ASB_DISCONNECT:
2934                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2935                         return C_MASK;
2936                 case ASB_VIOLENTLY:
2937                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data "
2938                              "assumption\n");
2939                 }
2940         }
2941
2942         if (dry_run || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
2943                 if (hg == 0)
2944                         dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2945                 else
2946                         dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.\n",
2947                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2948                                  abs(hg) >= 2 ? "full" : "bit-map based");
2949                 return C_MASK;
2950         }
2951
2952         if (abs(hg) >= 2) {
2953                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2954                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
2955                                         BM_LOCKED_SET_ALLOWED))
2956                         return C_MASK;
2957         }
2958
2959         if (hg > 0) { /* become sync source. */
2960                 rv = C_WF_BITMAP_S;
2961         } else if (hg < 0) { /* become sync target */
2962                 rv = C_WF_BITMAP_T;
2963         } else {
2964                 rv = C_CONNECTED;
2965                 if (drbd_bm_total_weight(mdev)) {
2966                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2967                              drbd_bm_total_weight(mdev));
2968                 }
2969         }
2970
2971         return rv;
2972 }
2973
2974 /* returns 1 if invalid */
2975 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2976 {
2977         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2978         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2979             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2980                 return 0;
2981
2982         /* any other combination involving ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL is invalid */
2983         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2984             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2985                 return 1;
2986
2987         /* everything else is valid if they are equal on both sides. */
2988         if (peer == self)
2989                 return 0;
2990
2991         /* everything else is invalid. */
2992         return 1;
2993 }
2994
2995 static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
2996 {
2997         struct p_protocol *p = pi->data;
2998         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2999         int p_want_lose, p_two_primaries, cf;
3000         char p_integrity_alg[SHARED_SECRET_MAX] = "";
3001         unsigned char *my_alg;
3002         struct net_conf *nc;
3003
3004         p_proto         = be32_to_cpu(p->protocol);
3005         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3006         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3007         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3008         p_two_primaries = be32_to_cpu(p->two_primaries);
3009         cf              = be32_to_cpu(p->conn_flags);
3010         p_want_lose = cf & CF_WANT_LOSE;
3011
3012         clear_bit(CONN_DRY_RUN, &tconn->flags);
3013
3014         if (cf & CF_DRY_RUN)
3015                 set_bit(CONN_DRY_RUN, &tconn->flags);
3016
3017         rcu_read_lock();
3018         nc = rcu_dereference(tconn->net_conf);
3019
3020         if (p_proto != nc->wire_protocol && tconn->agreed_pro_version < 100) {
3021                 conn_err(tconn, "incompatible communication protocols\n");
3022                 goto disconnect_rcu_unlock;
3023         }
3024
3025         if (cmp_after_sb(p_after_sb_0p, nc->after_sb_0p)) {
3026                 conn_err(tconn, "incompatible after-sb-0pri settings\n");
3027                 goto disconnect_rcu_unlock;
3028         }
3029
3030         if (cmp_after_sb(p_after_sb_1p, nc->after_sb_1p)) {
3031                 conn_err(tconn, "incompatible after-sb-1pri settings\n");
3032                 goto disconnect_rcu_unlock;
3033         }
3034
3035         if (cmp_after_sb(p_after_sb_2p, nc->after_sb_2p)) {
3036                 conn_err(tconn, "incompatible after-sb-2pri settings\n");
3037                 goto disconnect_rcu_unlock;
3038         }
3039
3040         if (p_want_lose && nc->want_lose) {
3041                 conn_err(tconn, "both sides have the 'want_lose' flag set\n");
3042                 goto disconnect_rcu_unlock;
3043         }
3044
3045         if (p_two_primaries != nc->two_primaries) {
3046                 conn_err(tconn, "incompatible setting of the two-primaries options\n");
3047                 goto disconnect_rcu_unlock;
3048         }
3049
3050         my_alg = nc->integrity_alg;
3051         rcu_read_unlock();
3052
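        /* Note: my_alg still points into the RCU protected net_conf after the
         * unlock above; this assumes the integrity-alg setting is not replaced
         * concurrently while this feature exchange is in progress. */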
3053         if (tconn->agreed_pro_version >= 87) {
3054                 int err;
3055
3056                 err = drbd_recv_all(tconn, p_integrity_alg, pi->size);
3057                 if (err)
3058                         return err;
3059
3060                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
3061                 if (strcmp(p_integrity_alg, my_alg)) {
3062                         conn_err(tconn, "incompatible setting of the data-integrity-alg\n");
3063                         goto disconnect;
3064                 }
3065                 conn_info(tconn, "data-integrity-alg: %s\n",
3066                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
3067         }
3068
3069         return 0;
3070
3071 disconnect_rcu_unlock:
3072         rcu_read_unlock();
3073 disconnect:
3074         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3075         return -EIO;
3076 }
3077
3078 /* helper function
3079  * input: alg name, feature name
3080  * return: NULL (alg name was "")
3081  *         ERR_PTR(error) if something goes wrong
3082  *         or the crypto hash ptr, if it worked out ok. */
3083 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3084                 const char *alg, const char *name)
3085 {
3086         struct crypto_hash *tfm;
3087
3088         if (!alg[0])
3089                 return NULL;
3090
3091         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3092         if (IS_ERR(tfm)) {
3093                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3094                         alg, name, PTR_ERR(tfm));
3095                 return tfm;
3096         }
3097         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
3098                 crypto_free_hash(tfm);
3099                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
3100                 return ERR_PTR(-EINVAL);
3101         }
3102         return tfm;
3103 }
3104
3105 static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3106 {
3107         void *buffer = tconn->data.rbuf;
3108         int size = pi->size;
3109
3110         while (size) {
3111                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3112                 s = drbd_recv(tconn, buffer, s);
3113                 if (s <= 0) {
3114                         if (s < 0)
3115                                 return s;
3116                         break;
3117                 }
3118                 size -= s;
3119         }
3120         if (size)
3121                 return -EIO;
3122         return 0;
3123 }
3124
3125 /*
3126  * config_unknown_volume  -  device configuration command for unknown volume
3127  *
3128  * When a device is added to an existing connection, the node on which the
3129  * device is added first will send configuration commands to its peer but the
3130  * peer will not know about the device yet.  It will warn and ignore these
3131  * commands.  Once the device is added on the second node, the second node will
3132  * send the same device configuration commands, but in the other direction.
3133  *
3134  * (We can also end up here if drbd is misconfigured.)
3135  */
3136 static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3137 {
3138         conn_warn(tconn, "Volume %u unknown; ignoring %s packet\n",
3139                   pi->vnr, cmdname(pi->cmd));
3140         return ignore_remaining_packet(tconn, pi);
3141 }
3142
3143 static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3144 {
3145         struct drbd_conf *mdev;
3146         struct p_rs_param_95 *p;
3147         unsigned int header_size, data_size, exp_max_sz;
3148         struct crypto_hash *verify_tfm = NULL;
3149         struct crypto_hash *csums_tfm = NULL;
3150         struct net_conf *old_conf, *new_conf = NULL;
3151         const int apv = tconn->agreed_pro_version;
3152         int *rs_plan_s = NULL;
3153         int fifo_size = 0;
3154         int err;
3155
3156         mdev = vnr_to_mdev(tconn, pi->vnr);
3157         if (!mdev)
3158                 return config_unknown_volume(tconn, pi);
3159
3160         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3161                     : apv == 88 ? sizeof(struct p_rs_param)
3162                                         + SHARED_SECRET_MAX
3163                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3164                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3165
3166         if (pi->size > exp_max_sz) {
3167                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3168                     pi->size, exp_max_sz);
3169                 return -EIO;
3170         }
3171
3172         if (apv <= 88) {
3173                 header_size = sizeof(struct p_rs_param);
3174                 data_size = pi->size - header_size;
3175         } else if (apv <= 94) {
3176                 header_size = sizeof(struct p_rs_param_89);
3177                 data_size = pi->size - header_size;
3178                 D_ASSERT(data_size == 0);
3179         } else {
3180                 header_size = sizeof(struct p_rs_param_95);
3181                 data_size = pi->size - header_size;
3182                 D_ASSERT(data_size == 0);
3183         }
3184
3185         /* initialize verify_alg and csums_alg */
3186         p = pi->data;
3187         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3188
3189         err = drbd_recv_all(mdev->tconn, p, header_size);
3190         if (err)
3191                 return err;
3192
3193         if (get_ldev(mdev)) {
3194                 mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3195                 put_ldev(mdev);
3196         }
3197
3198         if (apv >= 88) {
3199                 if (apv == 88) {
3200                         if (data_size > SHARED_SECRET_MAX) {
3201                                 dev_err(DEV, "verify-alg too long, "
3202                                     "peer wants %u, accepting only %u byte\n",
3203                                                 data_size, SHARED_SECRET_MAX);
3204                                 return -EIO;
3205                         }
3206
3207                         err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3208                         if (err)
3209                                 return err;
3210
3211                         /* we expect NUL terminated string */
3212                         /* but just in case someone tries to be evil */
3213                         D_ASSERT(p->verify_alg[data_size-1] == 0);
3214                         p->verify_alg[data_size-1] = 0;
3215
3216                 } else /* apv >= 89 */ {
3217                         /* we still expect NUL terminated strings */
3218                         /* but just in case someone tries to be evil */
3219                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3220                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3221                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3222                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3223                 }
3224
3225                 mutex_lock(&mdev->tconn->net_conf_update);
3226                 old_conf = mdev->tconn->net_conf;
3227
3228                 if (strcmp(old_conf->verify_alg, p->verify_alg)) {
3229                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3230                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3231                                     old_conf->verify_alg, p->verify_alg);
3232                                 goto disconnect;
3233                         }
3234                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3235                                         p->verify_alg, "verify-alg");
3236                         if (IS_ERR(verify_tfm)) {
3237                                 verify_tfm = NULL;
3238                                 goto disconnect;
3239                         }
3240                 }
3241
3242                 if (apv >= 89 && strcmp(old_conf->csums_alg, p->csums_alg)) {
3243                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3244                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3245                                     old_conf->csums_alg, p->csums_alg);
3246                                 goto disconnect;
3247                         }
3248                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3249                                         p->csums_alg, "csums-alg");
3250                         if (IS_ERR(csums_tfm)) {
3251                                 csums_tfm = NULL;
3252                                 goto disconnect;
3253                         }
3254                 }
3255
3256                 if (apv > 94 && get_ldev(mdev)) {
3257                         mdev->ldev->dc.resync_rate = be32_to_cpu(p->rate);
3258                         mdev->ldev->dc.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3259                         mdev->ldev->dc.c_delay_target = be32_to_cpu(p->c_delay_target);
3260                         mdev->ldev->dc.c_fill_target = be32_to_cpu(p->c_fill_target);
3261                         mdev->ldev->dc.c_max_rate = be32_to_cpu(p->c_max_rate);
3262
3263                         fifo_size = (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3264                         if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
3265                                 rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
3266                                 if (!rs_plan_s) {
3267                                         dev_err(DEV, "kzalloc of fifo_buffer failed\n");
3268                                         put_ldev(mdev);
3269                                         goto disconnect;
3270                                 }
3271                         }
3272                         put_ldev(mdev);
3273                 }
3274
3275                 if (verify_tfm || csums_tfm) {
3276                         new_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3277                         if (!new_conf) {
3278                                 dev_err(DEV, "Allocation of new net_conf failed\n");
3279                                 goto disconnect;
3280                         }
3281
3282                         *new_conf = *old_conf;
3283
3284                         if (verify_tfm) {
3285                                 strcpy(new_conf->verify_alg, p->verify_alg);
3286                                 new_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3287                                 crypto_free_hash(mdev->tconn->verify_tfm);
3288                                 mdev->tconn->verify_tfm = verify_tfm;
3289                                 dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3290                         }
3291                         if (csums_tfm) {
3292                                 strcpy(new_conf->csums_alg, p->csums_alg);
3293                                 new_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3294                                 crypto_free_hash(mdev->tconn->csums_tfm);
3295                                 mdev->tconn->csums_tfm = csums_tfm;
3296                                 dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3297                         }
3298                         rcu_assign_pointer(tconn->net_conf, new_conf);
3299                 }
3300                 mutex_unlock(&mdev->tconn->net_conf_update);
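                /* new_conf was published with rcu_assign_pointer() above; wait
                 * for all rcu_read_lock() readers of the old net_conf to finish
                 * before freeing it. */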
3301                 if (new_conf) {
3302                         synchronize_rcu();
3303                         kfree(old_conf);
3304                 }
3305
3306                 spin_lock(&mdev->peer_seq_lock);
3307                 if (fifo_size != mdev->rs_plan_s.size) {
3308                         kfree(mdev->rs_plan_s.values);
3309                         mdev->rs_plan_s.values = rs_plan_s;
3310                         mdev->rs_plan_s.size   = fifo_size;
3311                         mdev->rs_planed = 0;
3312                 }
3313                 spin_unlock(&mdev->peer_seq_lock);
3314         }
3315         return 0;
3316
3317 disconnect:
3318         mutex_unlock(&mdev->tconn->net_conf_update);
3319         /* just for completeness: actually not needed,
3320          * as this is not reached if csums_tfm was ok. */
3321         crypto_free_hash(csums_tfm);
3322         /* but free the verify_tfm again, if csums_tfm did not work out */
3323         crypto_free_hash(verify_tfm);
3324         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3325         return -EIO;
3326 }
3327
3328 /* warn if the arguments differ by more than 12.5% */
3329 static void warn_if_differ_considerably(struct drbd_conf *mdev,
3330         const char *s, sector_t a, sector_t b)
3331 {
3332         sector_t d;
3333         if (a == 0 || b == 0)
3334                 return;
3335         d = (a > b) ? (a - b) : (b - a);
3336         if (d > (a>>3) || d > (b>>3))
3337                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3338                      (unsigned long long)a, (unsigned long long)b);
3339 }
3340
3341 static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3342 {
3343         struct drbd_conf *mdev;
3344         struct p_sizes *p = pi->data;
3345         enum determine_dev_size dd = unchanged;
3346         sector_t p_size, p_usize, my_usize;
3347         int ldsc = 0; /* local disk size changed */
3348         enum dds_flags ddsf;
3349
3350         mdev = vnr_to_mdev(tconn, pi->vnr);
3351         if (!mdev)
3352                 return config_unknown_volume(tconn, pi);
3353
3354         p_size = be64_to_cpu(p->d_size);
3355         p_usize = be64_to_cpu(p->u_size);
3356
3357         /* just store the peer's disk size for now.
3358          * we still need to figure out whether we accept that. */
3359         mdev->p_size = p_size;
3360
3361         if (get_ldev(mdev)) {
3362                 warn_if_differ_considerably(mdev, "lower level device sizes",
3363                            p_size, drbd_get_max_capacity(mdev->ldev));
3364                 warn_if_differ_considerably(mdev, "user requested size",
3365                                             p_usize, mdev->ldev->dc.disk_size);
3366
3367                 /* if this is the first connect, or an otherwise expected
3368                  * param exchange, choose the minimum */
3369                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
3370                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
3371                                              p_usize);
3372
3373                 my_usize = mdev->ldev->dc.disk_size;
3374
3375                 if (mdev->ldev->dc.disk_size != p_usize) {
3376                         mdev->ldev->dc.disk_size = p_usize;
3377                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3378                              (unsigned long)mdev->ldev->dc.disk_size);
3379                 }
3380
3381                 /* Never shrink a device with usable data during connect.
3382                    But allow online shrinking if we are connected. */
3383                 if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
3384                    drbd_get_capacity(mdev->this_bdev) &&
3385                    mdev->state.disk >= D_OUTDATED &&
3386                    mdev->state.conn < C_CONNECTED) {
3387                         dev_err(DEV, "The peer's disk size is too small!\n");
3388                         conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3389                         mdev->ldev->dc.disk_size = my_usize;
3390                         put_ldev(mdev);
3391                         return -EIO;
3392                 }
3393                 put_ldev(mdev);
3394         }
3395
3396         ddsf = be16_to_cpu(p->dds_flags);
3397         if (get_ldev(mdev)) {
3398                 dd = drbd_determine_dev_size(mdev, ddsf);
3399                 put_ldev(mdev);
3400                 if (dd == dev_size_error)
3401                         return -EIO;
3402                 drbd_md_sync(mdev);
3403         } else {
3404                 /* I am diskless, need to accept the peer's size. */
3405                 drbd_set_my_capacity(mdev, p_size);
3406         }
3407
3408         mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3409         drbd_reconsider_max_bio_size(mdev);
3410
3411         if (get_ldev(mdev)) {
3412                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3413                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3414                         ldsc = 1;
3415                 }
3416
3417                 put_ldev(mdev);
3418         }
3419
3420         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3421                 if (be64_to_cpu(p->c_size) !=
3422                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
3423                         /* we have different sizes, probably peer
3424                          * needs to know my new size... */
3425                         drbd_send_sizes(mdev, 0, ddsf);
3426                 }
3427                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3428                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
3429                         if (mdev->state.pdsk >= D_INCONSISTENT &&
3430                             mdev->state.disk >= D_INCONSISTENT) {
3431                                 if (ddsf & DDSF_NO_RESYNC)
3432                                         dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3433                                 else
3434                                         resync_after_online_grow(mdev);
3435                         } else
3436                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3437                 }
3438         }
3439
3440         return 0;
3441 }
3442
3443 static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3444 {
3445         struct drbd_conf *mdev;
3446         struct p_uuids *p = pi->data;
3447         u64 *p_uuid;
3448         int i, updated_uuids = 0;
3449
3450         mdev = vnr_to_mdev(tconn, pi->vnr);
3451         if (!mdev)
3452                 return config_unknown_volume(tconn, pi);
3453
3454         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return -EIO;
        }
3455
3456         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3457                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3458
3459         kfree(mdev->p_uuid);
3460         mdev->p_uuid = p_uuid;
3461
3462         if (mdev->state.conn < C_CONNECTED &&
3463             mdev->state.disk < D_INCONSISTENT &&
3464             mdev->state.role == R_PRIMARY &&
3465             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3466                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3467                     (unsigned long long)mdev->ed_uuid);
3468                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3469                 return -EIO;
3470         }
3471
3472         if (get_ldev(mdev)) {
3473                 int skip_initial_sync =
3474                         mdev->state.conn == C_CONNECTED &&
3475                         mdev->tconn->agreed_pro_version >= 90 &&
3476                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3477                         (p_uuid[UI_FLAGS] & 8);
3478                 if (skip_initial_sync) {
3479                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3480                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3481                                         "clear_n_write from receive_uuids",
3482                                         BM_LOCKED_TEST_ALLOWED);
3483                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3484                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
3485                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3486                                         CS_VERBOSE, NULL);
3487                         drbd_md_sync(mdev);
3488                         updated_uuids = 1;
3489                 }
3490                 put_ldev(mdev);
3491         } else if (mdev->state.disk < D_INCONSISTENT &&
3492                    mdev->state.role == R_PRIMARY) {
3493                 /* I am a diskless primary, the peer just created a new current UUID
3494                    for me. */
3495                 updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3496         }
3497
3498         /* Before we test the disk state, wait until a possibly ongoing
3499            cluster-wide state change has finished. That is important if
3500            we are primary and are detaching from our disk: we need to see
3501            the new disk state... */
3502         mutex_lock(mdev->state_mutex);
3503         mutex_unlock(mdev->state_mutex);
3504         if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3505                 updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3506
3507         if (updated_uuids)
3508                 drbd_print_uuids(mdev, "receiver updated UUIDs to");
3509
3510         return 0;
3511 }
3512
3513 /**
3514  * convert_state() - Converts the peer's view of the cluster state to our point of view
3515  * @ps:         The state as seen by the peer.
3516  */
3517 static union drbd_state convert_state(union drbd_state ps)
3518 {
3519         union drbd_state ms;
3520
3521         static enum drbd_conns c_tab[] = {
3522                 [C_CONNECTED] = C_CONNECTED,
3523
3524                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3525                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3526                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3527                 [C_VERIFY_S]       = C_VERIFY_T,
3528                 [C_MASK]   = C_MASK,
3529         };
3530
3531         ms.i = ps.i;
3532
3533         ms.conn = c_tab[ps.conn];
3534         ms.peer = ps.role;
3535         ms.role = ps.peer;
3536         ms.pdsk = ps.disk;
3537         ms.disk = ps.pdsk;
3538         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3539
3540         return ms;
3541 }
3542
3543 static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3544 {
3545         struct drbd_conf *mdev;
3546         struct p_req_state *p = pi->data;
3547         union drbd_state mask, val;
3548         enum drbd_state_rv rv;
3549
3550         mdev = vnr_to_mdev(tconn, pi->vnr);
3551         if (!mdev)
3552                 return -EIO;
3553
3554         mask.i = be32_to_cpu(p->mask);
3555         val.i = be32_to_cpu(p->val);
3556
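        /* If both nodes issued a state change request at the same time (our
         * state_mutex is held), the node with DISCARD_CONCURRENT set rejects
         * the peer's request, so at most one of the concurrent changes wins. */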
3557         if (test_bit(DISCARD_CONCURRENT, &mdev->tconn->flags) &&
3558             mutex_is_locked(mdev->state_mutex)) {
3559                 drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3560                 return 0;
3561         }
3562
3563         mask = convert_state(mask);
3564         val = convert_state(val);
3565
3566         rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3567         drbd_send_sr_reply(mdev, rv);
3568
3569         drbd_md_sync(mdev);
3570
3571         return 0;
3572 }
3573
3574 static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3575 {
3576         struct p_req_state *p = pi->data;
3577         union drbd_state mask, val;
3578         enum drbd_state_rv rv;
3579
3580         mask.i = be32_to_cpu(p->mask);
3581         val.i = be32_to_cpu(p->val);
3582
3583         if (test_bit(DISCARD_CONCURRENT, &tconn->flags) &&
3584             mutex_is_locked(&tconn->cstate_mutex)) {
3585                 conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3586                 return 0;
3587         }
3588
3589         mask = convert_state(mask);
3590         val = convert_state(val);
3591
3592         rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3593         conn_send_sr_reply(tconn, rv);
3594
3595         return 0;
3596 }
3597
3598 static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3599 {
3600         struct drbd_conf *mdev;
3601         struct p_state *p = pi->data;
3602         union drbd_state os, ns, peer_state;
3603         enum drbd_disk_state real_peer_disk;
3604         enum chg_state_flags cs_flags;
3605         int rv;
3606
3607         mdev = vnr_to_mdev(tconn, pi->vnr);
3608         if (!mdev)
3609                 return config_unknown_volume(tconn, pi);
3610
3611         peer_state.i = be32_to_cpu(p->state);
3612
3613         real_peer_disk = peer_state.disk;
3614         if (peer_state.disk == D_NEGOTIATING) {
3615                 real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3616                 dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3617         }
3618
3619         spin_lock_irq(&mdev->tconn->req_lock);
3620  retry:
3621         os = ns = drbd_read_state(mdev);
3622         spin_unlock_irq(&mdev->tconn->req_lock);
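        /* We evaluate os/ns without holding req_lock below; if the local state
         * changes in the meantime, the check before _drbd_set_state() sends us
         * back to the retry label. */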
3623
3624         /* peer says his disk is uptodate, while we think it is inconsistent,
3625          * and this happens while we think we have a sync going on. */
3626         if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3627             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3628                 /* If we are (becoming) SyncSource, but peer is still in sync
3629                  * preparation, ignore its uptodate-ness to avoid flapping, it
3630                  * will change to inconsistent once the peer reaches active
3631                  * syncing states.
3632                  * It may have changed syncer-paused flags, however, so we
3633                  * cannot ignore this completely. */
3634                 if (peer_state.conn > C_CONNECTED &&
3635                     peer_state.conn < C_SYNC_SOURCE)
3636                         real_peer_disk = D_INCONSISTENT;
3637
3638                 /* if peer_state changes to connected at the same time,
3639                  * it explicitly notifies us that it finished resync.
3640                  * Maybe we should finish it up, too? */
3641                 else if (os.conn >= C_SYNC_SOURCE &&
3642                          peer_state.conn == C_CONNECTED) {
3643                         if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3644                                 drbd_resync_finished(mdev);
3645                         return 0;
3646                 }
3647         }
3648
3649         /* peer says his disk is inconsistent, while we think it is uptodate,
3650          * and this happens while the peer still thinks we have a sync going on,
3651          * but we think we are already done with the sync.
3652          * We ignore this to avoid flapping pdsk.
3653          * This should not happen, if the peer is a recent version of drbd. */
3654         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3655             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3656                 real_peer_disk = D_UP_TO_DATE;
3657
3658         if (ns.conn == C_WF_REPORT_PARAMS)
3659                 ns.conn = C_CONNECTED;
3660
3661         if (peer_state.conn == C_AHEAD)
3662                 ns.conn = C_BEHIND;
3663
3664         if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3665             get_ldev_if_state(mdev, D_NEGOTIATING)) {
3666                 int cr; /* consider resync */
3667
3668                 /* if we established a new connection */
3669                 cr  = (os.conn < C_CONNECTED);
3670                 /* if we had an established connection
3671                  * and one of the nodes newly attaches a disk */
3672                 cr |= (os.conn == C_CONNECTED &&
3673                        (peer_state.disk == D_NEGOTIATING ||
3674                         os.disk == D_NEGOTIATING));
3675                 /* if we have both been inconsistent, and the peer has been
3676                  * forced to be UpToDate with --overwrite-data */
3677                 cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3678                 /* if we had been plain connected, and the admin requested to
3679                  * start a sync by "invalidate" or "invalidate-remote" */
3680                 cr |= (os.conn == C_CONNECTED &&
3681                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3682                                  peer_state.conn <= C_WF_BITMAP_T));
3683
3684                 if (cr)
3685                         ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3686
3687                 put_ldev(mdev);
3688                 if (ns.conn == C_MASK) {
3689                         ns.conn = C_CONNECTED;
3690                         if (mdev->state.disk == D_NEGOTIATING) {
3691                                 drbd_force_state(mdev, NS(disk, D_FAILED));
3692                         } else if (peer_state.disk == D_NEGOTIATING) {
3693                                 dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3694                                 peer_state.disk = D_DISKLESS;
3695                                 real_peer_disk = D_DISKLESS;
3696                         } else {
3697                                 if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3698                                         return -EIO;
3699                                 D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3700                                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3701                                 return -EIO;
3702                         }
3703                 }
3704         }
3705
3706         spin_lock_irq(&mdev->tconn->req_lock);
3707         if (os.i != drbd_read_state(mdev).i)
3708                 goto retry;
3709         clear_bit(CONSIDER_RESYNC, &mdev->flags);
3710         ns.peer = peer_state.role;
3711         ns.pdsk = real_peer_disk;
3712         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3713         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3714                 ns.disk = mdev->new_state_tmp.disk;
3715         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3716         if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3717             test_bit(NEW_CUR_UUID, &mdev->flags)) {
3718                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3719                    for temporary network outages! */
3720                 spin_unlock_irq(&mdev->tconn->req_lock);
3721                 dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3722                 tl_clear(mdev->tconn);
3723                 drbd_uuid_new_current(mdev);
3724                 clear_bit(NEW_CUR_UUID, &mdev->flags);
3725                 conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3726                 return -EIO;
3727         }
3728         rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3729         ns = drbd_read_state(mdev);
3730         spin_unlock_irq(&mdev->tconn->req_lock);
3731
3732         if (rv < SS_SUCCESS) {
3733                 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3734                 return -EIO;
3735         }
3736
3737         if (os.conn > C_WF_REPORT_PARAMS) {
3738                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3739                     peer_state.disk != D_NEGOTIATING ) {
3740                         /* we want resync, peer has not yet decided to sync... */
3741                         /* Nowadays only used when forcing a node into primary role and
3742                            setting its disk to UpToDate with that */
3743                         drbd_send_uuids(mdev);
3744                         drbd_send_state(mdev);
3745                 }
3746         }
3747
3748         mutex_lock(&mdev->tconn->net_conf_update);
3749         mdev->tconn->net_conf->want_lose = 0; /* without copy; single bit op is atomic */
3750         mutex_unlock(&mdev->tconn->net_conf_update);
3751
3752         drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3753
3754         return 0;
3755 }
3756
3757 static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
3758 {
3759         struct drbd_conf *mdev;
3760         struct p_rs_uuid *p = pi->data;
3761
3762         mdev = vnr_to_mdev(tconn, pi->vnr);
3763         if (!mdev)
3764                 return -EIO;
3765
3766         wait_event(mdev->misc_wait,
3767                    mdev->state.conn == C_WF_SYNC_UUID ||
3768                    mdev->state.conn == C_BEHIND ||
3769                    mdev->state.conn < C_CONNECTED ||
3770                    mdev->state.disk < D_NEGOTIATING);
3771
3772         /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3773
3774         /* Here the _drbd_uuid_ functions are right, current should
3775            _not_ be rotated into the history */
3776         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3777                 _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3778                 _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3779
3780                 drbd_print_uuids(mdev, "updated sync uuid");
3781                 drbd_start_resync(mdev, C_SYNC_TARGET);
3782
3783                 put_ldev(mdev);
3784         } else
3785                 dev_err(DEV, "Ignoring SyncUUID packet!\n");
3786
3787         return 0;
3788 }
3789
3790 /**
3791  * receive_bitmap_plain
3792  *
3793  * Return 0 when done, 1 when another iteration is needed, and a negative error
3794  * code upon failure.
3795  */
3796 static int
3797 receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
3798                      unsigned long *p, struct bm_xfer_ctx *c)
3799 {
3800         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
3801                                  drbd_header_size(mdev->tconn);
3802         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
3803                                        c->bm_words - c->word_offset);
3804         unsigned int want = num_words * sizeof(*p);
3805         int err;
3806
3807         if (want != size) {
3808                 dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
3809                 return -EIO;
3810         }
3811         if (want == 0)
3812                 return 0;
3813         err = drbd_recv_all(mdev->tconn, p, want);
3814         if (err)
3815                 return err;
3816
3817         drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
3818
3819         c->word_offset += num_words;
3820         c->bit_offset = c->word_offset * BITS_PER_LONG;
3821         if (c->bit_offset > c->bm_bits)
3822                 c->bit_offset = c->bm_bits;
3823
3824         return 1;
3825 }
3826
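/* The "encoding" byte of a compressed bitmap packet carries three fields:
 * bits 0-3: bitmap code (enum drbd_bitmap_code), bits 4-6: number of padding
 * bits in the bitstream, bit 7: value of the first run (start toggle). */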
3827 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
3828 {
3829         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
3830 }
3831
3832 static int dcbp_get_start(struct p_compressed_bm *p)
3833 {
3834         return (p->encoding & 0x80) != 0;
3835 }
3836
3837 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
3838 {
3839         return (p->encoding >> 4) & 0x7;
3840 }
3841
3842 /**
3843  * recv_bm_rle_bits
3844  *
3845  * Return 0 when done, 1 when another iteration is needed, and a negative error
3846  * code upon failure.
3847  */
3848 static int
3849 recv_bm_rle_bits(struct drbd_conf *mdev,
3850                 struct p_compressed_bm *p,
3851                  struct bm_xfer_ctx *c,
3852                  unsigned int len)
3853 {
3854         struct bitstream bs;
3855         u64 look_ahead;
3856         u64 rl;
3857         u64 tmp;
3858         unsigned long s = c->bit_offset;
3859         unsigned long e;
3860         int toggle = dcbp_get_start(p);
3861         int have;
3862         int bits;
3863
3864         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
3865
3866         bits = bitstream_get_bits(&bs, &look_ahead, 64);
3867         if (bits < 0)
3868                 return -EIO;
3869
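        /* The bitstream is a sequence of VLI encoded run lengths, alternating
         * between runs of clear and set bits; "toggle" (from the packet header)
         * says whether the first run is a run of set bits.  For set runs we
         * mark bits [s, s+rl-1] in our bitmap. */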
3870         for (have = bits; have > 0; s += rl, toggle = !toggle) {
3871                 bits = vli_decode_bits(&rl, look_ahead);
3872                 if (bits <= 0)
3873                         return -EIO;
3874
3875                 if (toggle) {
3876                         e = s + rl -1;
3877                         if (e >= c->bm_bits) {
3878                                 dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3879                                 return -EIO;
3880                         }
3881                         _drbd_bm_set_bits(mdev, s, e);
3882                 }
3883
3884                 if (have < bits) {
3885                         dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3886                                 have, bits, look_ahead,
3887                                 (unsigned int)(bs.cur.b - p->code),
3888                                 (unsigned int)bs.buf_len);
3889                         return -EIO;
3890                 }
3891                 look_ahead >>= bits;
3892                 have -= bits;
3893
3894                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3895                 if (bits < 0)
3896                         return -EIO;
3897                 look_ahead |= tmp << have;
3898                 have += bits;
3899         }
3900
3901         c->bit_offset = s;
3902         bm_xfer_ctx_bit_to_word_offset(c);
3903
3904         return (s != c->bm_bits);
3905 }
3906
3907 /**
3908  * decode_bitmap_c
3909  *
3910  * Return 0 when done, 1 when another iteration is needed, and a negative error
3911  * code upon failure.
3912  */
3913 static int
3914 decode_bitmap_c(struct drbd_conf *mdev,
3915                 struct p_compressed_bm *p,
3916                 struct bm_xfer_ctx *c,
3917                 unsigned int len)
3918 {
3919         if (dcbp_get_code(p) == RLE_VLI_Bits)
3920                 return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
3921
3922         /* other variants had been implemented for evaluation,
3923          * but have been dropped as this one turned out to be "best"
3924          * during all our tests. */
3925
3926         dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3927         conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
3928         return -EIO;
3929 }
3930
3931 void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3932                 const char *direction, struct bm_xfer_ctx *c)
3933 {
3934         /* what would it take to transfer it "plaintext" */
3935         unsigned int header_size = drbd_header_size(mdev->tconn);
3936         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
3937         unsigned int plain =
3938                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
3939                 c->bm_words * sizeof(unsigned long);
3940         unsigned int total = c->bytes[0] + c->bytes[1];
3941         unsigned int r;
3942
3943         /* total cannot be zero, but just in case: */
3944         if (total == 0)
3945                 return;
3946
3947         /* don't report if not compressed */
3948         if (total >= plain)
3949                 return;
3950
3951         /* total < plain. check for overflow, still */
3952         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3953                                     : (1000 * total / plain);
3954
3955         if (r > 1000)
3956                 r = 1000;
3957
3958         r = 1000 - r;
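        /* Example (assumed numbers): plain = 4000, total = 1000
         * -> r = 1000 * 1000 / 4000 = 250 -> 1000 - 250 = 750,
         * reported as "compression: 75.0%". */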
3959         dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3960              "total %u; compression: %u.%u%%\n",
3961                         direction,
3962                         c->bytes[1], c->packets[1],
3963                         c->bytes[0], c->packets[0],
3964                         total, r/10, r % 10);
3965 }
3966
3967 /* Since we are processing the bitfield from lower addresses to higher,
3968    it does not matter whether we process it in 32 bit or 64 bit chunks,
3969    as long as it is little endian. (Understand it as a byte stream,
3970    beginning with the lowest byte...) If we used big endian,
3971    we would need to process it from the highest address to the lowest
3972    in order to be agnostic to the 32 vs 64 bit issue.
3973
3974    Returns 0 on success, a negative error code otherwise. */
3975 static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
3976 {
3977         struct drbd_conf *mdev;
3978         struct bm_xfer_ctx c;
3979         int err;
3980
3981         mdev = vnr_to_mdev(tconn, pi->vnr);
3982         if (!mdev)
3983                 return -EIO;
3984
3985         drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
3986         /* you are supposed to send additional out-of-sync information
3987          * if you actually set bits during this phase */
3988
3989         c = (struct bm_xfer_ctx) {
3990                 .bm_bits = drbd_bm_bits(mdev),
3991                 .bm_words = drbd_bm_words(mdev),
3992         };
3993
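        /* pi already describes the first bitmap packet (its header was read by
         * our caller); after each chunk we read the next header ourselves,
         * until the decoder returns 0 (done) or a negative error. */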
3994         for (;;) {
3995                 if (pi->cmd == P_BITMAP)
3996                         err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
3997                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
3998                         /* MAYBE: sanity check that we speak proto >= 90,
3999                          * and the feature is enabled! */
4000                         struct p_compressed_bm *p = pi->data;
4001
4002                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4003                                 dev_err(DEV, "ReportCBitmap packet too large\n");
4004                                 err = -EIO;
4005                                 goto out;
4006                         }
4007                         if (pi->size <= sizeof(*p)) {
4008                                 dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4009                                 err = -EIO;
4010                                 goto out;
4011                         }
4012                         err = drbd_recv_all(mdev->tconn, p, pi->size);
4013                         if (err)
4014                                goto out;
4015                         err = decode_bitmap_c(mdev, p, &c, pi->size);
4016                 } else {
4017                         dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4018                         err = -EIO;
4019                         goto out;
4020                 }
4021
4022                 c.packets[pi->cmd == P_BITMAP]++;
4023                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4024
4025                 if (err <= 0) {
4026                         if (err < 0)
4027                                 goto out;
4028                         break;
4029                 }
4030                 err = drbd_recv_header(mdev->tconn, pi);
4031                 if (err)
4032                         goto out;
4033         }
4034
4035         INFO_bm_xfer_stats(mdev, "receive", &c);
4036
4037         if (mdev->state.conn == C_WF_BITMAP_T) {
4038                 enum drbd_state_rv rv;
4039
4040                 err = drbd_send_bitmap(mdev);
4041                 if (err)
4042                         goto out;
4043                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4044                 rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4045                 D_ASSERT(rv == SS_SUCCESS);
4046         } else if (mdev->state.conn != C_WF_BITMAP_S) {
4047                 /* admin may have requested C_DISCONNECTING,
4048                  * other threads may have noticed network errors */
4049                 dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4050                     drbd_conn_str(mdev->state.conn));
4051         }
4052         err = 0;
4053
4054  out:
4055         drbd_bm_unlock(mdev);
4056         if (!err && mdev->state.conn == C_WF_BITMAP_S)
4057                 drbd_start_resync(mdev, C_SYNC_SOURCE);
4058         return err;
4059 }
4060
4061 static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4062 {
4063         conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4064                  pi->cmd, pi->size);
4065
4066         return ignore_remaining_packet(tconn, pi);
4067 }
4068
4069 static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4070 {
4071         /* Make sure we've acked all the TCP data associated
4072          * with the data requests being unplugged */
4073         drbd_tcp_quickack(tconn->data.socket);
4074
4075         return 0;
4076 }
4077
4078 static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4079 {
4080         struct drbd_conf *mdev;
4081         struct p_block_desc *p = pi->data;
4082
4083         mdev = vnr_to_mdev(tconn, pi->vnr);
4084         if (!mdev)
4085                 return -EIO;
4086
4087         switch (mdev->state.conn) {
4088         case C_WF_SYNC_UUID:
4089         case C_WF_BITMAP_T:
4090         case C_BEHIND:
4091                 break;
4092         default:
4093                 dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4094                                 drbd_conn_str(mdev->state.conn));
4095         }
4096
4097         drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4098
4099         return 0;
4100 }
4101
4102 struct data_cmd {
4103         int expect_payload;
4104         size_t pkt_size;
4105         int (*fn)(struct drbd_tconn *, struct packet_info *);
4106 };
4107
4108 static struct data_cmd drbd_cmd_handler[] = {
4109         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4110         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4111         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply },
4112         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier },
4113         [P_BITMAP]          = { 1, 0, receive_bitmap },
4114         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap },
4115         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4116         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4117         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4118         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4119         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4120         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4121         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4122         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4123         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4124         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4125         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4126         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4127         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4128         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4129         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4130         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4131         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4132 };
4133
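/* How drbdd() below uses this table, in essence: pi.cmd is bounds-checked
 * against the table; pkt_size bytes of fixed sub-header are received before
 * the handler runs; payload beyond that is only allowed if expect_payload is
 * set, and pi.size is reduced to the remaining payload length before fn()
 * is called. */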
4134 static void drbdd(struct drbd_tconn *tconn)
4135 {
4136         struct packet_info pi;
4137         size_t shs; /* sub header size */
4138         int err;
4139
4140         while (get_t_state(&tconn->receiver) == RUNNING) {
4141                 struct data_cmd *cmd;
4142
4143                 drbd_thread_current_set_cpu(&tconn->receiver);
4144                 if (drbd_recv_header(tconn, &pi))
4145                         goto err_out;
4146
4147                 cmd = &drbd_cmd_handler[pi.cmd];
4148                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4149                         conn_err(tconn, "unknown packet type %d, l: %d!\n", pi.cmd, pi.size);
4150                         goto err_out;
4151                 }
4152
4153                 shs = cmd->pkt_size;
4154                 if (pi.size > shs && !cmd->expect_payload) {
4155                         conn_err(tconn, "No payload expected %s l:%d\n", cmdname(pi.cmd), pi.size);
4156                         goto err_out;
4157                 }
4158
4159                 if (shs) {
4160                         err = drbd_recv_all_warn(tconn, pi.data, shs);
4161                         if (err)
4162                                 goto err_out;
4163                         pi.size -= shs;
4164                 }
4165
4166                 err = cmd->fn(tconn, &pi);
4167                 if (err) {
4168                         conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4169                                  cmdname(pi.cmd), err, pi.size);
4170                         goto err_out;
4171                 }
4172         }
4173         return;
4174
4175     err_out:
4176         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4177 }
4178
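/* Flush the connection's work queue: queue a barrier work item
 * (w_prev_work_done) and wait for its completion.  Once it has run,
 * everything that was queued before it has been processed as well. */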
4179 void conn_flush_workqueue(struct drbd_tconn *tconn)
4180 {
4181         struct drbd_wq_barrier barr;
4182
4183         barr.w.cb = w_prev_work_done;
4184         barr.w.tconn = tconn;
4185         init_completion(&barr.done);
4186         drbd_queue_work(&tconn->data.work, &barr.w);
4187         wait_for_completion(&barr.done);
4188 }
4189
4190 static void drbd_disconnect(struct drbd_tconn *tconn)
4191 {
4192         enum drbd_conns oc;
4193         int rv = SS_UNKNOWN_ERROR;
4194
4195         if (tconn->cstate == C_STANDALONE)
4196                 return;
4197
4198         /* asender does not clean up anything. it must not interfere, either */
4199         drbd_thread_stop(&tconn->asender);
4200         drbd_free_sock(tconn);
4201
4202         down_read(&drbd_cfg_rwsem);
4203         idr_for_each(&tconn->volumes, drbd_disconnected, tconn);
4204         up_read(&drbd_cfg_rwsem);
4205         conn_info(tconn, "Connection closed\n");
4206
4207         if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4208                 conn_try_outdate_peer_async(tconn);
4209
4210         spin_lock_irq(&tconn->req_lock);
4211         oc = tconn->cstate;
4212         if (oc >= C_UNCONNECTED)
4213                 rv = _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4214
4215         spin_unlock_irq(&tconn->req_lock);
4216
4217         if (oc == C_DISCONNECTING) {
4218                 struct net_conf *old_conf;
4219
4220                 mutex_lock(&tconn->net_conf_update);
4221                 old_conf = tconn->net_conf;
4222                 rcu_assign_pointer(tconn->net_conf, NULL);
4223                 conn_free_crypto(tconn);
4224                 mutex_unlock(&tconn->net_conf_update);
4225
4226                 synchronize_rcu();
4227                 kfree(old_conf);
4228
4229                 conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4230         }
4231 }
4232
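/* Per-volume cleanup after the connection was lost; called via
 * idr_for_each() from drbd_disconnect() for every volume (vnr)
 * of this connection. */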
4233 static int drbd_disconnected(int vnr, void *p, void *data)
4234 {
4235         struct drbd_conf *mdev = (struct drbd_conf *)p;
4236         enum drbd_fencing_p fp;
4237         unsigned int i;
4238
4239         /* wait for current activity to cease. */
4240         spin_lock_irq(&mdev->tconn->req_lock);
4241         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4242         _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4243         _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4244         spin_unlock_irq(&mdev->tconn->req_lock);
4245
4246         /* We do not have data structures that would allow us to
4247          * get the rs_pending_cnt down to 0 again.
4248          *  * On C_SYNC_TARGET we do not have any data structures describing
4249          *    the pending RSDataRequest's we have sent.
4250          *  * On C_SYNC_SOURCE there is no data structure that tracks
4251          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4252          *  And no, it is not the sum of the reference counts in the
4253          *  resync_LRU. The resync_LRU tracks the whole operation including
4254          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4255          *  on the fly. */
4256         drbd_rs_cancel_all(mdev);
4257         mdev->rs_total = 0;
4258         mdev->rs_failed = 0;
4259         atomic_set(&mdev->rs_pending_cnt, 0);
4260         wake_up(&mdev->misc_wait);
4261
4262         del_timer(&mdev->request_timer);
4263
4264         del_timer_sync(&mdev->resync_timer);
4265         resync_timer_fn((unsigned long)mdev);
4266
4267         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4268          * w_make_resync_request etc. which may still be on the worker queue
4269          * to be "canceled" */
4270         drbd_flush_workqueue(mdev);
4271
4272         drbd_finish_peer_reqs(mdev);
4273
4274         kfree(mdev->p_uuid);
4275         mdev->p_uuid = NULL;
4276
4277         if (!drbd_suspended(mdev))
4278                 tl_clear(mdev->tconn);
4279
4280         drbd_md_sync(mdev);
4281
4282         fp = FP_DONT_CARE;
4283         if (get_ldev(mdev)) {
4284                 fp = mdev->ldev->dc.fencing;
4285                 put_ldev(mdev);
4286         }
4287
4288         /* serialize with bitmap writeout triggered by the state change,
4289          * if any. */
4290         wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4291
4292         /* tcp_close and release of sendpage pages can be deferred.  I don't
4293          * want to use SO_LINGER, because apparently it can be deferred for
4294          * more than 20 seconds (longest time I checked).
4295          *
4296          * Actually we don't care for exactly when the network stack does its
4297          * put_page(), but release our reference on these pages right here.
4298          */
4299         i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4300         if (i)
4301                 dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4302         i = atomic_read(&mdev->pp_in_use_by_net);
4303         if (i)
4304                 dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4305         i = atomic_read(&mdev->pp_in_use);
4306         if (i)
4307                 dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4308
4309         D_ASSERT(list_empty(&mdev->read_ee));
4310         D_ASSERT(list_empty(&mdev->active_ee));
4311         D_ASSERT(list_empty(&mdev->sync_ee));
4312         D_ASSERT(list_empty(&mdev->done_ee));
4313
4314         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4315         atomic_set(&mdev->current_epoch->epoch_size, 0);
4316         D_ASSERT(list_empty(&mdev->current_epoch->list));
4317
4318         return 0;
4319 }
4320
4321 /*
4322  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4323  * we can agree on is stored in agreed_pro_version.
4324  *
4325  * feature flags and the reserved array should be enough room for future
4326  * enhancements of the handshake protocol, and possible plugins...
4327  *
4328  * for now, they are expected to be zero, but their contents are ignored.
4329  */
4330 static int drbd_send_features(struct drbd_tconn *tconn)
4331 {
4332         struct drbd_socket *sock;
4333         struct p_connection_features *p;
4334
4335         sock = &tconn->data;
4336         p = conn_prepare_command(tconn, sock);
4337         if (!p)
4338                 return -EIO;
4339         memset(p, 0, sizeof(*p));
4340         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4341         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4342         return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4343 }
4344
4345 /*
4346  * return values:
4347  *   1 yes, we have a valid connection
4348  *   0 oops, did not work out, please try again
4349  *  -1 peer talks different language,
4350  *     no point in trying again, please go standalone.
4351  */
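/* The negotiation below: the connection is usable iff the interval
 * [PRO_VERSION_MIN, PRO_VERSION_MAX] overlaps the peer's advertised range;
 * the agreed version is then min(PRO_VERSION_MAX, peer's protocol_max). */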
4352 static int drbd_do_features(struct drbd_tconn *tconn)
4353 {
4354         /* ASSERT current == tconn->receiver ... */
4355         struct p_connection_features *p;
4356         const int expect = sizeof(struct p_connection_features);
4357         struct packet_info pi;
4358         int err;
4359
4360         err = drbd_send_features(tconn);
4361         if (err)
4362                 return 0;
4363
4364         err = drbd_recv_header(tconn, &pi);
4365         if (err)
4366                 return 0;
4367
4368         if (pi.cmd != P_CONNECTION_FEATURES) {
4369                 conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4370                      cmdname(pi.cmd), pi.cmd);
4371                 return -1;
4372         }
4373
4374         if (pi.size != expect) {
4375                 conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4376                      expect, pi.size);
4377                 return -1;
4378         }
4379
4380         p = pi.data;
4381         err = drbd_recv_all_warn(tconn, p, expect);
4382         if (err)
4383                 return 0;
4384
4385         p->protocol_min = be32_to_cpu(p->protocol_min);
4386         p->protocol_max = be32_to_cpu(p->protocol_max);
4387         if (p->protocol_max == 0)
4388                 p->protocol_max = p->protocol_min;
4389
4390         if (PRO_VERSION_MAX < p->protocol_min ||
4391             PRO_VERSION_MIN > p->protocol_max)
4392                 goto incompat;
4393
4394         tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4395
4396         conn_info(tconn, "Handshake successful: "
4397              "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4398
4399         return 1;
4400
4401  incompat:
4402         conn_err(tconn, "incompatible DRBD dialects: "
4403             "I support %d-%d, peer supports %d-%d\n",
4404             PRO_VERSION_MIN, PRO_VERSION_MAX,
4405             p->protocol_min, p->protocol_max);
4406         return -1;
4407 }
4408
4409 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4410 static int drbd_do_auth(struct drbd_tconn *tconn)
4411 {
4412         conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4413         conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4414         return -1;
4415 }
4416 #else
4417 #define CHALLENGE_LEN 64
4418
4419 /* Return value:
4420         1 - auth succeeded,
4421         0 - failed, try again (network error),
4422         -1 - auth failed, don't try again.
4423 */
4424
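/* The exchange is symmetric, both peers run it concurrently:
 *
 *   send P_AUTH_CHALLENGE   (CHALLENGE_LEN random bytes)
 *   recv P_AUTH_CHALLENGE   (the peer's challenge)
 *   send P_AUTH_RESPONSE    (HMAC(shared_secret, peer's challenge))
 *   recv P_AUTH_RESPONSE    (the peer's HMAC over our challenge)
 *   compare against the locally computed HMAC(shared_secret, my_challenge)
 */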
4425 static int drbd_do_auth(struct drbd_tconn *tconn)
4426 {
4427         struct drbd_socket *sock;
4428         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4429         struct scatterlist sg;
4430         char *response = NULL;
4431         char *right_response = NULL;
4432         char *peers_ch = NULL;
4433         unsigned int key_len;
4434         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4435         unsigned int resp_size;
4436         struct hash_desc desc;
4437         struct packet_info pi;
4438         struct net_conf *nc;
4439         int err, rv;
4440
4441         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4442
4443         rcu_read_lock();
4444         nc = rcu_dereference(tconn->net_conf);
4445         key_len = strlen(nc->shared_secret);
4446         memcpy(secret, nc->shared_secret, key_len);
4447         rcu_read_unlock();
4448
4449         desc.tfm = tconn->cram_hmac_tfm;
4450         desc.flags = 0;
4451
4452         rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4453         if (rv) {
4454                 conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4455                 rv = -1;
4456                 goto fail;
4457         }
4458
4459         get_random_bytes(my_challenge, CHALLENGE_LEN);
4460
4461         sock = &tconn->data;
4462         if (!conn_prepare_command(tconn, sock)) {
4463                 rv = 0;
4464                 goto fail;
4465         }
4466         rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4467                                 my_challenge, CHALLENGE_LEN);
4468         if (!rv)
4469                 goto fail;
4470
4471         err = drbd_recv_header(tconn, &pi);
4472         if (err) {
4473                 rv = 0;
4474                 goto fail;
4475         }
4476
4477         if (pi.cmd != P_AUTH_CHALLENGE) {
4478                 conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4479                     cmdname(pi.cmd), pi.cmd);
4480                 rv = 0;
4481                 goto fail;
4482         }
4483
4484         if (pi.size > CHALLENGE_LEN * 2) {
4485                 conn_err(tconn, "AuthChallenge payload too big.\n");
4486                 rv = -1;
4487                 goto fail;
4488         }
4489
4490         peers_ch = kmalloc(pi.size, GFP_NOIO);
4491         if (peers_ch == NULL) {
4492                 conn_err(tconn, "kmalloc of peers_ch failed\n");
4493                 rv = -1;
4494                 goto fail;
4495         }
4496
4497         err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4498         if (err) {
4499                 rv = 0;
4500                 goto fail;
4501         }
4502
4503         resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4504         response = kmalloc(resp_size, GFP_NOIO);
4505         if (response == NULL) {
4506                 conn_err(tconn, "kmalloc of response failed\n");
4507                 rv = -1;
4508                 goto fail;
4509         }
4510
4511         sg_init_table(&sg, 1);
4512         sg_set_buf(&sg, peers_ch, pi.size);
4513
4514         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4515         if (rv) {
4516                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4517                 rv = -1;
4518                 goto fail;
4519         }
4520
4521         if (!conn_prepare_command(tconn, sock)) {
4522                 rv = 0;
4523                 goto fail;
4524         }
4525         rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4526                                 response, resp_size);
4527         if (!rv)
4528                 goto fail;
4529
4530         err = drbd_recv_header(tconn, &pi);
4531         if (err) {
4532                 rv = 0;
4533                 goto fail;
4534         }
4535
4536         if (pi.cmd != P_AUTH_RESPONSE) {
4537                 conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4538                         cmdname(pi.cmd), pi.cmd);
4539                 rv = 0;
4540                 goto fail;
4541         }
4542
4543         if (pi.size != resp_size) {
4544                 conn_err(tconn, "AuthResponse payload has wrong size\n");
4545                 rv = 0;
4546                 goto fail;
4547         }
4548
4549         err = drbd_recv_all_warn(tconn, response, resp_size);
4550         if (err) {
4551                 rv = 0;
4552                 goto fail;
4553         }
4554
4555         right_response = kmalloc(resp_size, GFP_NOIO);
4556         if (right_response == NULL) {
4557                 conn_err(tconn, "kmalloc of right_response failed\n");
4558                 rv = -1;
4559                 goto fail;
4560         }
4561
4562         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4563
4564         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4565         if (rv) {
4566                 conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4567                 rv = -1;
4568                 goto fail;
4569         }
4570
4571         rv = !memcmp(response, right_response, resp_size);
4572
4573         if (rv)
4574                 conn_info(tconn, "Peer authenticated using %d bytes of HMAC\n",
4575                      resp_size);
4576         else
4577                 rv = -1;
4578
4579  fail:
4580         kfree(peers_ch);
4581         kfree(response);
4582         kfree(right_response);
4583
4584         return rv;
4585 }
4586 #endif
4587
4588 int drbdd_init(struct drbd_thread *thi)
4589 {
4590         struct drbd_tconn *tconn = thi->tconn;
4591         int h;
4592
4593         conn_info(tconn, "receiver (re)started\n");
4594
4595         do {
4596                 h = drbd_connect(tconn);
4597                 if (h == 0) {
4598                         drbd_disconnect(tconn);
4599                         schedule_timeout_interruptible(HZ);
4600                 }
4601                 if (h == -1) {
4602                         conn_warn(tconn, "Discarding network configuration.\n");
4603                         conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4604                 }
4605         } while (h == 0);
4606
4607         if (h > 0)
4608                 drbdd(tconn);
4609
4610         drbd_disconnect(tconn);
4611
4612         conn_info(tconn, "receiver terminated\n");
4613         return 0;
4614 }
4615
4616 /* ********* acknowledge sender ******** */
4617
4618 static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4619 {
4620         struct p_req_state_reply *p = pi->data;
4621         int retcode = be32_to_cpu(p->retcode);
4622
4623         if (retcode >= SS_SUCCESS) {
4624                 set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4625         } else {
4626                 set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4627                 conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4628                          drbd_set_st_err_str(retcode), retcode);
4629         }
4630         wake_up(&tconn->ping_wait);
4631
4632         return 0;
4633 }
4634
4635 static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4636 {
4637         struct drbd_conf *mdev;
4638         struct p_req_state_reply *p = pi->data;
4639         int retcode = be32_to_cpu(p->retcode);
4640
4641         mdev = vnr_to_mdev(tconn, pi->vnr);
4642         if (!mdev)
4643                 return -EIO;
4644
4645         if (retcode >= SS_SUCCESS) {
4646                 set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4647         } else {
4648                 set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4649                 dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4650                         drbd_set_st_err_str(retcode), retcode);
4651         }
4652         wake_up(&mdev->state_wait);
4653
4654         return 0;
4655 }
4656
4657 static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
4658 {
4659         return drbd_send_ping_ack(tconn);
4660
4661 }
4662
4663 static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4664 {
4665         /* restore idle timeout */
4666         tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4667         if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4668                 wake_up(&tconn->ping_wait);
4669
4670         return 0;
4671 }
4672
4673 static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4674 {
4675         struct drbd_conf *mdev;
4676         struct p_block_ack *p = pi->data;
4677         sector_t sector = be64_to_cpu(p->sector);
4678         int blksize = be32_to_cpu(p->blksize);
4679
4680         mdev = vnr_to_mdev(tconn, pi->vnr);
4681         if (!mdev)
4682                 return -EIO;
4683
4684         D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4685
4686         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4687
4688         if (get_ldev(mdev)) {
4689                 drbd_rs_complete_io(mdev, sector);
4690                 drbd_set_in_sync(mdev, sector, blksize);
4691                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4692                 mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4693                 put_ldev(mdev);
4694         }
4695         dec_rs_pending(mdev);
4696         atomic_add(blksize >> 9, &mdev->rs_sect_in);
4697
4698         return 0;
4699 }
4700
4701 static int
4702 validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4703                               struct rb_root *root, const char *func,
4704                               enum drbd_req_event what, bool missing_ok)
4705 {
4706         struct drbd_request *req;
4707         struct bio_and_error m;
4708
4709         spin_lock_irq(&mdev->tconn->req_lock);
4710         req = find_request(mdev, root, id, sector, missing_ok, func);
4711         if (unlikely(!req)) {
4712                 spin_unlock_irq(&mdev->tconn->req_lock);
4713                 return -EIO;
4714         }
4715         __req_mod(req, what, &m);
4716         spin_unlock_irq(&mdev->tconn->req_lock);
4717
4718         if (m.bio)
4719                 complete_master_bio(mdev, &m);
4720         return 0;
4721 }
4722
4723 static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4724 {
4725         struct drbd_conf *mdev;
4726         struct p_block_ack *p = pi->data;
4727         sector_t sector = be64_to_cpu(p->sector);
4728         int blksize = be32_to_cpu(p->blksize);
4729         enum drbd_req_event what;
4730
4731         mdev = vnr_to_mdev(tconn, pi->vnr);
4732         if (!mdev)
4733                 return -EIO;
4734
4735         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4736
4737         if (p->block_id == ID_SYNCER) {
4738                 drbd_set_in_sync(mdev, sector, blksize);
4739                 dec_rs_pending(mdev);
4740                 return 0;
4741         }
4742         switch (pi->cmd) {
4743         case P_RS_WRITE_ACK:
4744                 what = WRITE_ACKED_BY_PEER_AND_SIS;
4745                 break;
4746         case P_WRITE_ACK:
4747                 what = WRITE_ACKED_BY_PEER;
4748                 break;
4749         case P_RECV_ACK:
4750                 what = RECV_ACKED_BY_PEER;
4751                 break;
4752         case P_DISCARD_WRITE:
4753                 what = DISCARD_WRITE;
4754                 break;
4755         case P_RETRY_WRITE:
4756                 what = POSTPONE_WRITE;
4757                 break;
4758         default:
4759                 BUG();
4760         }
4761
4762         return validate_req_change_req_state(mdev, p->block_id, sector,
4763                                              &mdev->write_requests, __func__,
4764                                              what, false);
4765 }
4766
4767 static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
4768 {
4769         struct drbd_conf *mdev;
4770         struct p_block_ack *p = pi->data;
4771         sector_t sector = be64_to_cpu(p->sector);
4772         int size = be32_to_cpu(p->blksize);
4773         int err;
4774
4775         mdev = vnr_to_mdev(tconn, pi->vnr);
4776         if (!mdev)
4777                 return -EIO;
4778
4779         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4780
4781         if (p->block_id == ID_SYNCER) {
4782                 dec_rs_pending(mdev);
4783                 drbd_rs_failed_io(mdev, sector, size);
4784                 return 0;
4785         }
4786
4787         err = validate_req_change_req_state(mdev, p->block_id, sector,
4788                                             &mdev->write_requests, __func__,
4789                                             NEG_ACKED, true);
4790         if (err) {
4791                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
4792                    The master bio might already be completed, therefore the
4793                    request is no longer in the collision hash. */
4794                 /* In Protocol B we might already have got a P_RECV_ACK
4795                    but then get a P_NEG_ACK afterwards. */
4796                 drbd_set_out_of_sync(mdev, sector, size);
4797         }
4798         return 0;
4799 }
4800
4801 static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4802 {
4803         struct drbd_conf *mdev;
4804         struct p_block_ack *p = pi->data;
4805         sector_t sector = be64_to_cpu(p->sector);
4806
4807         mdev = vnr_to_mdev(tconn, pi->vnr);
4808         if (!mdev)
4809                 return -EIO;
4810
4811         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4812
4813         dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4814             (unsigned long long)sector, be32_to_cpu(p->blksize));
4815
4816         return validate_req_change_req_state(mdev, p->block_id, sector,
4817                                              &mdev->read_requests, __func__,
4818                                              NEG_ACKED, false);
4819 }
4820
4821 static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
4822 {
4823         struct drbd_conf *mdev;
4824         sector_t sector;
4825         int size;
4826         struct p_block_ack *p = pi->data;
4827
4828         mdev = vnr_to_mdev(tconn, pi->vnr);
4829         if (!mdev)
4830                 return -EIO;
4831
4832         sector = be64_to_cpu(p->sector);
4833         size = be32_to_cpu(p->blksize);
4834
4835         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4836
4837         dec_rs_pending(mdev);
4838
4839         if (get_ldev_if_state(mdev, D_FAILED)) {
4840                 drbd_rs_complete_io(mdev, sector);
4841                 switch (pi->cmd) {
4842                 case P_NEG_RS_DREPLY:
4843                         drbd_rs_failed_io(mdev, sector, size); /* fall through */
4844                 case P_RS_CANCEL:
4845                         break;
4846                 default:
4847                         BUG();
4848                 }
4849                 put_ldev(mdev);
4850         }
4851
4852         return 0;
4853 }
4854
4855 static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
4856 {
4857         struct drbd_conf *mdev;
4858         struct p_barrier_ack *p = pi->data;
4859
4860         mdev = vnr_to_mdev(tconn, pi->vnr);
4861         if (!mdev)
4862                 return -EIO;
4863
4864         tl_release(mdev->tconn, p->barrier, be32_to_cpu(p->set_size));
4865
4866         if (mdev->state.conn == C_AHEAD &&
4867             atomic_read(&mdev->ap_in_flight) == 0 &&
4868             !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags)) {
4869                 mdev->start_resync_timer.expires = jiffies + HZ;
4870                 add_timer(&mdev->start_resync_timer);
4871         }
4872
4873         return 0;
4874 }
4875
4876 static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
4877 {
4878         struct drbd_conf *mdev;
4879         struct p_block_ack *p = pi->data;
4880         struct drbd_work *w;
4881         sector_t sector;
4882         int size;
4883
4884         mdev = vnr_to_mdev(tconn, pi->vnr);
4885         if (!mdev)
4886                 return -EIO;
4887
4888         sector = be64_to_cpu(p->sector);
4889         size = be32_to_cpu(p->blksize);
4890
4891         update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4892
4893         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4894                 drbd_ov_out_of_sync_found(mdev, sector, size);
4895         else
4896                 ov_out_of_sync_print(mdev);
4897
4898         if (!get_ldev(mdev))
4899                 return 0;
4900
4901         drbd_rs_complete_io(mdev, sector);
4902         dec_rs_pending(mdev);
4903
4904         --mdev->ov_left;
4905
4906         /* let's advance progress step marks only for every other megabyte */
4907         if ((mdev->ov_left & 0x200) == 0x200)
4908                 drbd_advance_rs_marks(mdev, mdev->ov_left);
4909
4910         if (mdev->ov_left == 0) {
4911                 w = kmalloc(sizeof(*w), GFP_NOIO);
4912                 if (w) {
4913                         w->cb = w_ov_finished;
4914                         w->mdev = mdev;
4915                         drbd_queue_work_front(&mdev->tconn->data.work, w);
4916                 } else {
4917                         dev_err(DEV, "kmalloc(w) failed.\n");
4918                         ov_out_of_sync_print(mdev);
4919                         drbd_resync_finished(mdev);
4920                 }
4921         }
4922         put_ldev(mdev);
4923         return 0;
4924 }
4925
4926 static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4927 {
4928         return 0;
4929 }
4930
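/* Drain the done_ee lists of all volumes on this connection, and keep
 * looping as long as any of them was refilled in the meantime. */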
4931 static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
4932 {
4933         struct drbd_conf *mdev;
4934         int i, not_empty = 0;
4935
4936         do {
4937                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
4938                 flush_signals(current);
4939                 down_read(&drbd_cfg_rwsem);
4940                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4941                         if (drbd_finish_peer_reqs(mdev)) {
4942                                 up_read(&drbd_cfg_rwsem);
4943                                 return 1; /* error */
4944                         }
4945                 }
4946                 up_read(&drbd_cfg_rwsem);
4947                 set_bit(SIGNAL_ASENDER, &tconn->flags);
4948
4949                 spin_lock_irq(&tconn->req_lock);
4950                 rcu_read_lock();
4951                 idr_for_each_entry(&tconn->volumes, mdev, i) {
4952                         not_empty = !list_empty(&mdev->done_ee);
4953                         if (not_empty)
4954                                 break;
4955                 }
4956                 rcu_read_unlock();
4957                 spin_unlock_irq(&tconn->req_lock);
4958         } while (not_empty);
4959
4960         return 0;
4961 }
4962
4963 struct asender_cmd {
4964         size_t pkt_size;
4965         int (*fn)(struct drbd_tconn *tconn, struct packet_info *);
4966 };
4967
4968 static struct asender_cmd asender_tbl[] = {
4969         [P_PING]            = { 0, got_Ping },
4970         [P_PING_ACK]        = { 0, got_PingAck },
4971         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4972         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4973         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4974         [P_DISCARD_WRITE]   = { sizeof(struct p_block_ack), got_BlockAck },
4975         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4976         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4977         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
4978         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4979         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4980         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4981         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4982         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4983         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
4984         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
4985         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
4986 };
4987
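/*
 * The asender thread serves the meta socket: it sends pings and the various
 * acknowledgements, and receives the peer's small, fixed-size packets.
 * Received bytes accumulate in tconn->meta.rbuf until "expect" is reached,
 * first just the header, then header plus pkt_size from asender_tbl[], at
 * which point the packet is dispatched through that table.
 */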
4988 int drbd_asender(struct drbd_thread *thi)
4989 {
4990         struct drbd_tconn *tconn = thi->tconn;
4991         struct asender_cmd *cmd = NULL;
4992         struct packet_info pi;
4993         int rv;
4994         void *buf    = tconn->meta.rbuf;
4995         int received = 0;
4996         unsigned int header_size = drbd_header_size(tconn);
4997         int expect   = header_size;
4998         bool ping_timeout_active = false;
4999         struct net_conf *nc;
5000         int ping_timeo, no_cork, ping_int;
5001
5002         current->policy = SCHED_RR;  /* Make this a realtime task! */
5003         current->rt_priority = 2;    /* more important than all other tasks */
5004
5005         while (get_t_state(thi) == RUNNING) {
5006                 drbd_thread_current_set_cpu(thi);
5007
5008                 rcu_read_lock();
5009                 nc = rcu_dereference(tconn->net_conf);
5010                 ping_timeo = nc->ping_timeo;
5011                 no_cork = nc->no_cork;
5012                 ping_int = nc->ping_int;
5013                 rcu_read_unlock();
5014
5015                 if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5016                         if (drbd_send_ping(tconn)) {
5017                                 conn_err(tconn, "drbd_send_ping has failed\n");
5018                                 goto reconnect;
5019                         }
5020                         tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5021                         ping_timeout_active = true;
5022                 }
5023
5024                 /* TODO: conditionally cork; it may hurt latency if we cork without
5025                    much to send */
5026                 if (!no_cork)
5027                         drbd_tcp_cork(tconn->meta.socket);
5028                 if (tconn_finish_peer_reqs(tconn)) {
5029                         conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5030                         goto reconnect;
5031                 }
5032                 /* but unconditionally uncork unless disabled */
5033                 if (!no_cork)
5034                         drbd_tcp_uncork(tconn->meta.socket);
5035
5036                 /* short circuit, recv_msg would return EINTR anyways. */
5037                 if (signal_pending(current))
5038                         continue;
5039
5040                 rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5041                 clear_bit(SIGNAL_ASENDER, &tconn->flags);
5042
5043                 flush_signals(current);
5044
5045                 /* Note:
5046                  * -EINTR        (on meta) we got a signal
5047                  * -EAGAIN       (on meta) rcvtimeo expired
5048                  * -ECONNRESET   other side closed the connection
5049                  * -ERESTARTSYS  (on data) we got a signal
5050                  * rv <  0       other than above: unexpected error!
5051                  * rv == expected: full header or command
5052                  * rv <  expected: "woken" by signal during receive
5053                  * rv == 0       : "connection shut down by peer"
5054                  */
5055                 if (likely(rv > 0)) {
5056                         received += rv;
5057                         buf      += rv;
5058                 } else if (rv == 0) {
5059                         conn_err(tconn, "meta connection shut down by peer.\n");
5060                         goto reconnect;
5061                 } else if (rv == -EAGAIN) {
5062                         /* If the data socket received something meanwhile,
5063                          * that is good enough: peer is still alive. */
5064                         if (time_after(tconn->last_received,
5065                                 jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5066                                 continue;
5067                         if (ping_timeout_active) {
5068                                 conn_err(tconn, "PingAck did not arrive in time.\n");
5069                                 goto reconnect;
5070                         }
5071                         set_bit(SEND_PING, &tconn->flags);
5072                         continue;
5073                 } else if (rv == -EINTR) {
5074                         continue;
5075                 } else {
5076                         conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5077                         goto reconnect;
5078                 }
5079
5080                 if (received == expect && cmd == NULL) {
5081                         if (decode_header(tconn, tconn->meta.rbuf, &pi))
5082                                 goto reconnect;
5083                         cmd = &asender_tbl[pi.cmd];
5084                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5085                                 conn_err(tconn, "unknown command %d on meta (l: %d)\n",
5086                                         pi.cmd, pi.size);
5087                                 goto disconnect;
5088                         }
5089                         expect = header_size + cmd->pkt_size;
5090                         if (pi.size != expect - header_size) {
5091                                 conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5092                                         pi.cmd, pi.size);
5093                                 goto reconnect;
5094                         }
5095                 }
5096                 if (received == expect) {
5097                         int err;
5098
5099                         err = cmd->fn(tconn, &pi);
5100                         if (err) {
5101                                 conn_err(tconn, "%pf failed\n", cmd->fn);
5102                                 goto reconnect;
5103                         }
5104
5105                         tconn->last_received = jiffies;
5106
5107                         if (cmd == &asender_tbl[P_PING_ACK]) {
5108                                 /* restore idle timeout */
5109                                 tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5110                                 ping_timeout_active = false;
5111                         }
5112
5113                         buf      = tconn->meta.rbuf;
5114                         received = 0;
5115                         expect   = header_size;
5116                         cmd      = NULL;
5117                 }
5118         }
5119
5120         if (0) {
5121 reconnect:
5122                 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5123         }
5124         if (0) {
5125 disconnect:
5126                 conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5127         }
5128         clear_bit(SIGNAL_ASENDER, &tconn->flags);
5129
5130         conn_info(tconn, "asender terminated\n");
5131
5132         return 0;
5133 }