/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
        enum drbd_packet cmd;
        unsigned int size;
        unsigned int vnr;
        void *data;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
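
/* For orientation, a sketch of the representation these helpers rely on.
 * page_chain_next() itself is defined in drbd_int.h; the copy below is an
 * illustration of the idea, guarded out, not the definition the build uses.
 */
#if 0
static inline struct page *page_chain_next(struct page *page)
{
        /* the otherwise unused page->private field carries the "next" link */
        return (struct page *)page_private(page);
}

/* linking two pages into a chain a -> b -> NULL
 * (allocation error handling omitted in this sketch) */
static void page_chain_sketch(void)
{
        struct page *a = alloc_page(GFP_KERNEL);
        struct page *b = alloc_page(GFP_KERNEL);

        set_page_private(a, (unsigned long)b);  /* a->private points to b */
        set_page_private(b, 0);                 /* 0 terminates the chain */
        BUG_ON(page_chain_next(a) != b);
        BUG_ON(page_chain_next(b) != NULL);
        __free_page(b);
        __free_page(a);
}
#endif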

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
                                       unsigned int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        unsigned int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_alloc_pages will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
                                           struct list_head *to_be_freed)
{
        struct drbd_peer_request *peer_req, *tmp;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first unfinished one, we
           can stop examining the list... */

        list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
                if (drbd_peer_req_has_active_page(peer_req))
                        break;
                list_move(&peer_req->w.list, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD peer device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
                              bool retry)
{
        struct drbd_device *device = peer_device->device;
        struct page *page = NULL;
        struct net_conf *nc;
        DEFINE_WAIT(wait);
        unsigned int mxb;

        rcu_read_lock();
        nc = rcu_dereference(peer_device->connection->net_conf);
        mxb = nc ? nc->max_buffers : 1000000;
        rcu_read_unlock();

        if (atomic_read(&device->pp_in_use) < mxb)
                page = __drbd_alloc_pages(device, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(device);

                if (atomic_read(&device->pp_in_use) < mxb) {
                        page = __drbd_alloc_pages(device, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        drbd_warn(device, "drbd_alloc_pages interrupted!\n");
                        break;
                }

                if (schedule_timeout(HZ/10) == 0)
                        mxb = UINT_MAX;
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &device->pp_in_use);
        return page;
}
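
/* Typical lifecycle, for reference: chains allocated here are returned via
 * drbd_free_pages() (usually indirectly, through drbd_free_peer_req() and
 * friends), which either links them back to the global drbd_pp_pool or gives
 * them back to the system.  A minimal sketch, guarded out; real callers use
 * actual request sizes instead of the 4096 assumed here:
 */
#if 0
{
        unsigned int nr_pages = (4096 + PAGE_SIZE - 1) >> PAGE_SHIFT;
        struct page *page;

        page = drbd_alloc_pages(peer_device, nr_pages, true /* retry */);
        if (page) {
                /* ... fill the chain from the socket, submit, complete ... */
                drbd_free_pages(peer_device->device, page, 0 /* not net_ee */);
        }
}
#endif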

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * Is also used from inside another spin_lock_irq(&resource->req_lock);
 * Either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
        int i;

        if (page == NULL)
                return;

        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
                    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;
        struct page *page = NULL;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
                return NULL;

        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!peer_req) {
                if (!(gfp_mask & __GFP_NOWARN))
                        drbd_err(device, "%s: allocation failed\n", __func__);
                return NULL;
        }

        if (has_payload && data_size) {
                page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
                if (!page)
                        goto fail;
        }

        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
        peer_req->i.local = false;
        peer_req->i.waiting = false;

        peer_req->epoch = NULL;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
        atomic_set(&peer_req->pending_bios, 0);
        peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
         */
        peer_req->block_id = id;

        return peer_req;

 fail:
        mempool_free(peer_req, drbd_ee_mempool);
        return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_peer_request *peer_req, *t;
        int count = 0;
        int is_net = list == &device->net_ee;

        spin_lock_irq(&device->resource->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                __drbd_free_peer_req(device, peer_req, is_net);
                count++;
        }
        return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_peer_request *peer_req, *t;
        int err = 0;

        spin_lock_irq(&device->resource->req_lock);
        reclaim_finished_net_peer_reqs(device, &reclaimed);
        list_splice_init(&device->done_ee, &work_list);
        spin_unlock_irq(&device->resource->req_lock);

        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
                drbd_free_net_peer_req(device, peer_req);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_superseded.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
                int err2;

                /* list_del not necessary, next/prev members not touched */
                err2 = peer_req->w.cb(&peer_req->w, !!err);
                if (!err)
                        err = err2;
                drbd_free_peer_req(device, peer_req);
        }
        wake_up(&device->ee_wait);

        return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
                                     struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&device->resource->req_lock);
                io_schedule();
                finish_wait(&device->ee_wait, &wait);
                spin_lock_irq(&device->resource->req_lock);
        }
}

static void drbd_wait_ee_list_empty(struct drbd_device *device,
                                    struct list_head *head)
{
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, head);
        spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
        int rv;

        rv = drbd_recv_short(connection->data.socket, buf, size, 0);

        if (rv < 0) {
                if (rv == -ECONNRESET)
                        drbd_info(connection, "sock was reset by peer\n");
                else if (rv != -ERESTARTSYS)
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
        } else if (rv == 0) {
                if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                        long t;
                        rcu_read_lock();
                        t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                        rcu_read_unlock();

                        t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

                        if (t)
                                goto out;
                }
                drbd_info(connection, "sock was shut down by peer\n");
        }

        if (rv != size)
                conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
        return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv(connection, buf, size);
        if (err != size) {
                if (err >= 0)
                        err = -EIO;
        } else
                err = 0;
        return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
        int err;

        err = drbd_recv_all(connection, buf, size);
        if (err && !signal_pending(current))
                drbd_warn(connection, "short read (expected size %d)\n", (int)size);
        return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
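
/* For reference: this is roughly the in-kernel equivalent of a userspace
 *   setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 * issued before connect()/listen().  The SOCK_SNDBUF_LOCK/SOCK_RCVBUF_LOCK
 * bits keep the kernel's buffer auto-tuning from overriding the configured
 * sizes later.  (One difference, to the best of our knowledge: the
 * setsockopt() path doubles the requested value for bookkeeping overhead;
 * this open-coded version applies the value as-is.)
 */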

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        struct sockaddr_in6 peer_in6;
        struct net_conf *nc;
        int err, peer_addr_len, my_addr_len;
        int sndbuf_size, rcvbuf_size, connect_int;
        int disconnect_on_error = 1;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = connect_int * HZ;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         *  for the outgoing connections.
         *  This is needed for multihomed hosts and to be
         *  able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         *  a free one dynamically.
         */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }

        return sock;
}

struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;
        struct completion door_bell;
        void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
        struct accept_wait_data *ad = sk->sk_user_data;
        void (*state_change)(struct sock *sk);

        state_change = ad->original_sk_state_change;
        if (sk->sk_state == TCP_ESTABLISHED)
                complete(&ad->door_bell);
        state_change(sk);
}

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int err, sndbuf_size, rcvbuf_size, my_addr_len;
        struct sockaddr_in6 my_addr;
        struct socket *s_listen;
        struct net_conf *nc;
        const char *what;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return -EIO;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
        memcpy(&my_addr, &connection->my_addr, my_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
        if (err < 0)
                goto out;

        ad->s_listen = s_listen;
        write_lock_bh(&s_listen->sk->sk_callback_lock);
        ad->original_sk_state_change = s_listen->sk->sk_state_change;
        s_listen->sk->sk_state_change = drbd_incoming_connection;
        s_listen->sk->sk_user_data = ad;
        write_unlock_bh(&s_listen->sk->sk_callback_lock);

        what = "listen";
        err = s_listen->ops->listen(s_listen, 5);
        if (err < 0)
                goto out;

        return 0;
out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "%s failed, err = %d\n", what, err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
        write_lock_bh(&sk->sk_callback_lock);
        sk->sk_state_change = ad->original_sk_state_change;
        sk->sk_user_data = NULL;
        write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
        int timeo, connect_int, err = 0;
        struct socket *s_estab = NULL;
        struct net_conf *nc;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        connect_int = nc->connect_int;
        rcu_read_unlock();

        timeo = connect_int * HZ;
        /* 28.5% random jitter */
        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
        if (err <= 0)
                return NULL;

        err = kernel_accept(ad->s_listen, &s_estab, 0);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        drbd_err(connection, "accept failed, err = %d\n", err);
                        conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
                }
        }

        if (s_estab)
                unregister_state_change(s_estab->sk, ad);

        return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
                             enum drbd_packet cmd)
{
        if (!conn_prepare_command(connection, sock))
                return -EIO;
        return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
        int err;

        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
                        err = -EIO;
                return err;
        }
        err = decode_header(connection, connection->data.rbuf, &pi);
        if (err)
                return err;
        return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return false;

        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return true;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return false;
        }
}
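
/* How the probe above classifies the socket, summarized from the recvmsg()
 * semantics of MSG_DONTWAIT | MSG_PEEK (a reading aid, not new behavior):
 *
 *   rr > 0          data queued (peeked, not consumed)  -> alive
 *   rr == -EAGAIN   no data, connection still open      -> alive
 *   rr == 0         orderly shutdown by the peer        -> dead, release
 *   rr < 0 (other)  error on the connection             -> dead, release
 */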
/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        int err;

        atomic_set(&device->packet_seq, 0);
        device->peer_seq = 0;

        device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
                &peer_device->connection->cstate_mutex :
                &device->own_state_mutex;

        err = drbd_send_sync_param(peer_device);
        if (!err)
                err = drbd_send_sizes(peer_device, 0, 0);
        if (!err)
                err = drbd_send_uuids(peer_device);
        if (!err)
                err = drbd_send_current_state(peer_device);
        clear_bit(USE_DEGR_WFC_T, &device->flags);
        clear_bit(RESIZE_PENDING, &device->flags);
        atomic_set(&device->ap_in_flight, 0);
        mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
        return err;
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks a different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h, ok;
        bool discard_my_data;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock.socket && msock.socket) {
                        rcu_read_lock();
                        nc = rcu_dereference(connection->net_conf);
                        timeout = nc->ping_timeo * HZ / 10;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeout);
                        ok = drbd_socket_okay(&sock.socket);
                        ok = drbd_socket_okay(&msock.socket) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        int fp = receive_first_packet(connection, s);
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = drbd_socket_okay(&sock.socket);
                ok = drbd_socket_okay(&msock.socket) && ok;
        } while (!ok);

        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        /* Prevent a race between resync-handshake and
         * being promoted to Primary.
         *
         * Grab and release the state mutex, so we know that any current
         * drbd_set_role() is finished, and any incoming drbd_set_role
         * will see the STATE_SENT flag, and wait for it to be cleared.
         */
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_lock(peer_device->device->state_mutex);

        set_bit(STATE_SENT, &connection->flags);

        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_unlock(peer_device->device->state_mutex);

        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->asender);

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}
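
/* For context: the receiver thread retries conn_connect() until it gets a
 * definitive answer.  A condensed sketch of the caller loop, abbreviated
 * from memory (see drbdd_init() for the authoritative version):
 */
#if 0
do {
        h = conn_connect(connection);
        if (h == 0)     /* did not work out; back off, then retry */
                schedule_timeout_interruptible(HZ);
        if (h == -1)    /* protocol mismatch: give up, go standalone */
                conn_request_state(connection, NS(conn, C_STANDALONE), CS_HARD);
} while (h == 0);
#endif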

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
        unsigned int header_size = drbd_header_size(connection);

        if (header_size == sizeof(struct p_header100) &&
            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
                struct p_header100 *h = header;
                if (h->pad != 0) {
                        drbd_err(connection, "Header padding is not zero\n");
                        return -EINVAL;
                }
                pi->vnr = be16_to_cpu(h->volume);
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
        } else if (header_size == sizeof(struct p_header95) &&
                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
                struct p_header95 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be32_to_cpu(h->length);
                pi->vnr = 0;
        } else if (header_size == sizeof(struct p_header80) &&
                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
                struct p_header80 *h = header;
                pi->cmd = be16_to_cpu(h->command);
                pi->size = be16_to_cpu(h->length);
                pi->vnr = 0;
        } else {
                drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
                         be32_to_cpu(*(__be32 *)header),
                         connection->agreed_pro_version);
                return -EINVAL;
        }
        pi->data = header + header_size;
        return 0;
}
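
/* The three on-the-wire header layouts decode_header() distinguishes,
 * summarized from the fields it dereferences (see drbd_protocol.h for the
 * authoritative declarations; all fields are big-endian):
 *
 *   struct p_header80  { u32 magic; u16 command; u16 length; };    8 bytes
 *   struct p_header95  { u16 magic; u16 command; u32 length; };    8 bytes
 *   struct p_header100 { u32 magic; u16 volume;  u16 command;
 *                        u32 length; u32 pad; };                  16 bytes
 *
 * drbd_header_size() picks one based on the agreed protocol version, so the
 * magic comparison above is a consistency check, not format auto-detection;
 * only protocol 100 headers carry a volume number, the others imply vnr 0.
 */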

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
        void *buffer = connection->data.rbuf;
        int err;

        err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
        if (err)
                return err;

        err = decode_header(connection, buffer, pi);
        connection->last_received = jiffies;

        return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
        int rv;
        struct drbd_peer_device *peer_device;
        int vnr;

        if (connection->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;

                        if (!get_ldev(device))
                                continue;
                        kref_get(&device->kref);
                        rcu_read_unlock();

                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
                                drbd_bump_write_ordering(connection, WO_drain_io);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);

                        rcu_read_lock();
                        if (rv)
                                break;
                }
                rcu_read_unlock();
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @connection: DRBD connection.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
{
        struct disk_conf *dc;
        struct drbd_peer_device *peer_device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = connection->write_ordering;
        wo = min(pwo, wo);
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;

                if (!get_ldev_if_state(device, D_ATTACHING))
                        continue;
                dc = rcu_dereference(device->ldev->disk_conf);

                if (wo == WO_bdev_flush && !dc->disk_flushes)
                        wo = WO_drain_io;
                if (wo == WO_drain_io && !dc->disk_drain)
                        wo = WO_none;
                put_ldev(device);
        }
        rcu_read_unlock();
        connection->write_ordering = wo;
        if (pwo != connection->write_ordering || wo == WO_bdev_flush)
                drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
}
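
/* Note the invariant encoded by "wo = min(pwo, wo)" above: the enum values
 * are ordered WO_none < WO_drain_io < WO_bdev_flush, so the method can only
 * ever be downgraded (flush -> drain -> none), never upgraded back, for the
 * lifetime of the connection. */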

/**
 * drbd_submit_peer_request()
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                             struct drbd_peer_request *peer_req,
                             const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned ds = peer_req->i.size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
                /* wait for all pending IO completions, before we start
                 * zeroing things out. */
                conn_wait_active_ee_empty(first_peer_device(device)->connection);
                if (blkdev_issue_zeroout(device->ldev->backing_bdev,
                        sector, ds >> 9, GFP_NOIO))
                        peer_req->flags |= EE_WAS_ERROR;
                drbd_endio_write_sec_final(peer_req);
                return 0;
        }

        /* Discards don't have any payload.
         * But the scsi layer still expects a bio_vec it can use internally,
         * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
        if (peer_req->flags & EE_IS_TRIM)
                nr_pages = 1;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_iter.bi_sector = sector;
        bio->bi_bdev = device->ldev->backing_bdev;
        bio->bi_rw = rw;
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        if (rw & REQ_DISCARD) {
                bio->bi_iter.bi_size = ds;
                goto submit;
        }

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* A single page must always be possible!
                         * But in case it fails anyways,
                         * we deal with it, and complain (below). */
                        if (bio->bi_vcnt == 0) {
                                drbd_err(device,
                                        "bio_add_page failed for len=%u, "
                                        "bi_vcnt=0 (bi_sector=%llu)\n",
                                        len, (uint64_t)bio->bi_iter.bi_sector);
                                err = -ENOSPC;
                                goto fail;
                        }
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(device, ds == 0);
submit:
        D_ASSERT(device, page == NULL);

        atomic_set(&peer_req->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(device, fault_type, bio);
        } while (bios);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}
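
/* Worked example of the splitting above, assuming 4 KiB pages: a 32 KiB
 * write (ds = 32768) arrives as a chain of 8 pages, and bio_alloc(GFP_NOIO, 8)
 * reserves 8 bio_vecs.  If the backing device accepts all of them, n_bios
 * stays 1 and sector advances by 4096 >> 9 = 8 sectors per page.  Should
 * bio_add_page() refuse a page (lower-level restriction at this offset), the
 * code jumps back to next_bio: and the remaining pages go into a second bio;
 * pending_bios is set to the total before submission, so the completion path
 * can tell when the last bio has finished. */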
1418
1419 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1420                                              struct drbd_peer_request *peer_req)
1421 {
1422         struct drbd_interval *i = &peer_req->i;
1423
1424         drbd_remove_interval(&device->write_requests, i);
1425         drbd_clear_interval(i);
1426
1427         /* Wake up any processes waiting for this peer request to complete.  */
1428         if (i->waiting)
1429                 wake_up(&device->misc_wait);
1430 }
1431
1432 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1433 {
1434         struct drbd_peer_device *peer_device;
1435         int vnr;
1436
1437         rcu_read_lock();
1438         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1439                 struct drbd_device *device = peer_device->device;
1440
1441                 kref_get(&device->kref);
1442                 rcu_read_unlock();
1443                 drbd_wait_ee_list_empty(device, &device->active_ee);
1444                 kref_put(&device->kref, drbd_destroy_device);
1445                 rcu_read_lock();
1446         }
1447         rcu_read_unlock();
1448 }
1449
1450 static struct drbd_peer_device *
1451 conn_peer_device(struct drbd_connection *connection, int volume_number)
1452 {
1453         return idr_find(&connection->peer_devices, volume_number);
1454 }
1455
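/* A P_BARRIER packet closes the peer's current write epoch.  Depending
 * on the configured write ordering we either start a new epoch right
 * away (WO_none), or first wait for all active peer requests and flush
 * the backing device (WO_bdev_flush, WO_drain_io), so that the
 * P_BARRIER_ACK only goes out once the epoch's writes are stable. */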
1456 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1457 {
1458         int rv;
1459         struct p_barrier *p = pi->data;
1460         struct drbd_epoch *epoch;
1461
1462         /* FIXME these are unacked on connection,
1463          * not a specific (peer)device.
1464          */
1465         connection->current_epoch->barrier_nr = p->barrier;
1466         connection->current_epoch->connection = connection;
1467         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1468
1469         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1470          * the activity log, which means it would not be resynced in case the
1471          * R_PRIMARY crashes now.
1472          * Therefore we must send the barrier_ack after the barrier request was
1473          * completed. */
1474         switch (connection->write_ordering) {
1475         case WO_none:
1476                 if (rv == FE_RECYCLED)
1477                         return 0;
1478
1479                 /* receiver context, in the writeout path of the other node.
1480                  * avoid potential distributed deadlock */
1481                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1482                 if (epoch)
1483                         break;
1484                 else
1485                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1486                         /* Fall through */
1487
1488         case WO_bdev_flush:
1489         case WO_drain_io:
1490                 conn_wait_active_ee_empty(connection);
1491                 drbd_flush(connection);
1492
1493                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1494                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1495                         if (epoch)
1496                                 break;
1497                 }
1498
1499                 return 0;
1500         default:
1501                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1502                 return -EIO;
1503         }
1504
1505         epoch->flags = 0;
1506         atomic_set(&epoch->epoch_size, 0);
1507         atomic_set(&epoch->active, 0);
1508
1509         spin_lock(&connection->epoch_lock);
1510         if (atomic_read(&connection->current_epoch->epoch_size)) {
1511                 list_add(&epoch->list, &connection->current_epoch->list);
1512                 connection->current_epoch = epoch;
1513                 connection->epochs++;
1514         } else {
1515                 /* The current_epoch got recycled while we allocated this one... */
1516                 kfree(epoch);
1517         }
1518         spin_unlock(&connection->epoch_lock);
1519
1520         return 0;
1521 }
1522
1523 /* used from receive_RSDataReply (recv_resync_read)
1524  * and from receive_Data */
1525 static struct drbd_peer_request *
1526 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1527               struct packet_info *pi) __must_hold(local)
1528 {
1529         struct drbd_device *device = peer_device->device;
1530         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1531         struct drbd_peer_request *peer_req;
1532         struct page *page;
1533         int dgs, ds, err;
1534         int data_size = pi->size;
1535         void *dig_in = peer_device->connection->int_dig_in;
1536         void *dig_vv = peer_device->connection->int_dig_vv;
1537         unsigned long *data;
1538         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1539
1540         dgs = 0;
1541         if (!trim && peer_device->connection->peer_integrity_tfm) {
1542                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1543                 /*
1544                  * FIXME: Receive the incoming digest into the receive buffer
1545                  *        here, together with its struct p_data?
1546                  */
1547                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1548                 if (err)
1549                         return NULL;
1550                 data_size -= dgs;
1551         }
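        /* At this point, with integrity checking enabled, the wire layout
         * consumed so far is [p_data header][dgs digest bytes][payload];
         * pi->size covered digest plus payload, so data_size now holds
         * the payload size only. */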
1552
1553         if (trim) {
1554                 D_ASSERT(peer_device, data_size == 0);
1555                 data_size = be32_to_cpu(trim->size);
1556         }
1557
1558         if (!expect(IS_ALIGNED(data_size, 512)))
1559                 return NULL;
1560         /* prepare for larger trim requests. */
1561         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1562                 return NULL;
1563
1564         /* even though we trust our peer,
1565          * we sometimes have to double check. */
1566         if (sector + (data_size>>9) > capacity) {
1567                 drbd_err(device, "request from peer beyond end of local disk: "
1568                         "capacity: %llus < sector: %llus + size: %u\n",
1569                         (unsigned long long)capacity,
1570                         (unsigned long long)sector, data_size);
1571                 return NULL;
1572         }
1573
1574         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1575          * "criss-cross" setup, that might cause write-out on some other DRBD,
1576          * which in turn might block on the other node at this very place.  */
1577         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1578         if (!peer_req)
1579                 return NULL;
1580
1581         if (trim)
1582                 return peer_req;
1583
1584         ds = data_size;
1585         page = peer_req->pages;
1586         page_chain_for_each(page) {
1587                 unsigned len = min_t(int, ds, PAGE_SIZE);
1588                 data = kmap(page);
1589                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1590                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1591                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1592                         data[0] = data[0] ^ (unsigned long)-1;
1593                 }
1594                 kunmap(page);
1595                 if (err) {
1596                         drbd_free_peer_req(device, peer_req);
1597                         return NULL;
1598                 }
1599                 ds -= len;
1600         }
1601
1602         if (dgs) {
1603                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1604                 if (memcmp(dig_in, dig_vv, dgs)) {
1605                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1606                                 (unsigned long long)sector, data_size);
1607                         drbd_free_peer_req(device, peer_req);
1608                         return NULL;
1609                 }
1610         }
1611         device->recv_cnt += data_size>>9;
1612         return peer_req;
1613 }
1614
1615 /* drbd_drain_block() just takes a data block
1616  * out of the socket input buffer, and discards it.
1617  */
1618 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1619 {
1620         struct page *page;
1621         int err = 0;
1622         void *data;
1623
1624         if (!data_size)
1625                 return 0;
1626
1627         page = drbd_alloc_pages(peer_device, 1, 1);
1628
1629         data = kmap(page);
1630         while (data_size) {
1631                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1632
1633                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1634                 if (err)
1635                         break;
1636                 data_size -= len;
1637         }
1638         kunmap(page);
1639         drbd_free_pages(peer_device->device, page, 0);
1640         return err;
1641 }
1642
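/* Receive a disk-less read reply: the data goes straight into the
 * pages of the original request's master bio, and is verified against
 * the peer's integrity digest if one was negotiated. */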
1643 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1644                            sector_t sector, int data_size)
1645 {
1646         struct bio_vec bvec;
1647         struct bvec_iter iter;
1648         struct bio *bio;
1649         int dgs, err, expect;
1650         void *dig_in = peer_device->connection->int_dig_in;
1651         void *dig_vv = peer_device->connection->int_dig_vv;
1652
1653         dgs = 0;
1654         if (peer_device->connection->peer_integrity_tfm) {
1655                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1656                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1657                 if (err)
1658                         return err;
1659                 data_size -= dgs;
1660         }
1661
1662         /* optimistically update recv_cnt.  if receiving fails below,
1663          * we disconnect anyway, and counters will be reset. */
1664         peer_device->device->recv_cnt += data_size>>9;
1665
1666         bio = req->master_bio;
1667         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1668
1669         bio_for_each_segment(bvec, bio, iter) {
1670                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1671                 expect = min_t(int, data_size, bvec.bv_len);
1672                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1673                 kunmap(bvec.bv_page);
1674                 if (err)
1675                         return err;
1676                 data_size -= expect;
1677         }
1678
1679         if (dgs) {
1680                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1681                 if (memcmp(dig_in, dig_vv, dgs)) {
1682                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1683                         return -EINVAL;
1684                 }
1685         }
1686
1687         D_ASSERT(peer_device->device, data_size == 0);
1688         return 0;
1689 }
1690
1691 /*
1692  * e_end_resync_block() is called in asender context via
1693  * drbd_finish_peer_reqs().
1694  */
1695 static int e_end_resync_block(struct drbd_work *w, int unused)
1696 {
1697         struct drbd_peer_request *peer_req =
1698                 container_of(w, struct drbd_peer_request, w);
1699         struct drbd_peer_device *peer_device = peer_req->peer_device;
1700         struct drbd_device *device = peer_device->device;
1701         sector_t sector = peer_req->i.sector;
1702         int err;
1703
1704         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1705
1706         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1707                 drbd_set_in_sync(device, sector, peer_req->i.size);
1708                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1709         } else {
1710                 /* Record failure to sync */
1711                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1712
1713                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1714         }
1715         dec_unacked(device);
1716
1717         return err;
1718 }
1719
1720 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1721                             struct packet_info *pi) __releases(local)
1722 {
1723         struct drbd_device *device = peer_device->device;
1724         struct drbd_peer_request *peer_req;
1725
1726         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1727         if (!peer_req)
1728                 goto fail;
1729
1730         dec_rs_pending(device);
1731
1732         inc_unacked(device);
1733         /* corresponding dec_unacked() in e_end_resync_block()
1734          * or in _drbd_clear_done_ee, respectively */
1735
1736         peer_req->w.cb = e_end_resync_block;
1737
1738         spin_lock_irq(&device->resource->req_lock);
1739         list_add(&peer_req->w.list, &device->sync_ee);
1740         spin_unlock_irq(&device->resource->req_lock);
1741
1742         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1743         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1744                 return 0;
1745
1746         /* don't care for the reason here */
1747         drbd_err(device, "submit failed, triggering re-connect\n");
1748         spin_lock_irq(&device->resource->req_lock);
1749         list_del(&peer_req->w.list);
1750         spin_unlock_irq(&device->resource->req_lock);
1751
1752         drbd_free_peer_req(device, peer_req);
1753 fail:
1754         put_ldev(device);
1755         return -EIO;
1756 }
1757
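/* The block_id echoed back by the peer is the pointer value of our own
 * struct drbd_request.  We must not trust it blindly, so before using
 * it, drbd_contains_interval() verifies that this exact interval is
 * really registered in the given tree at that sector. */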
1758 static struct drbd_request *
1759 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1760              sector_t sector, bool missing_ok, const char *func)
1761 {
1762         struct drbd_request *req;
1763
1764         /* Request object according to our peer */
1765         req = (struct drbd_request *)(unsigned long)id;
1766         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1767                 return req;
1768         if (!missing_ok) {
1769                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1770                         (unsigned long)id, (unsigned long long)sector);
1771         }
1772         return NULL;
1773 }
1774
1775 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1776 {
1777         struct drbd_peer_device *peer_device;
1778         struct drbd_device *device;
1779         struct drbd_request *req;
1780         sector_t sector;
1781         int err;
1782         struct p_data *p = pi->data;
1783
1784         peer_device = conn_peer_device(connection, pi->vnr);
1785         if (!peer_device)
1786                 return -EIO;
1787         device = peer_device->device;
1788
1789         sector = be64_to_cpu(p->sector);
1790
1791         spin_lock_irq(&device->resource->req_lock);
1792         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1793         spin_unlock_irq(&device->resource->req_lock);
1794         if (unlikely(!req))
1795                 return -EIO;
1796
1797         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1798          * special casing it there for the various failure cases.
1799          * still no race with drbd_fail_pending_reads */
1800         err = recv_dless_read(peer_device, req, sector, pi->size);
1801         if (!err)
1802                 req_mod(req, DATA_RECEIVED);
1803         /* else: nothing. handled from drbd_disconnect...
1804          * I don't think we may complete this just yet
1805          * in case we are "on-disconnect: freeze" */
1806
1807         return err;
1808 }
1809
1810 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1811 {
1812         struct drbd_peer_device *peer_device;
1813         struct drbd_device *device;
1814         sector_t sector;
1815         int err;
1816         struct p_data *p = pi->data;
1817
1818         peer_device = conn_peer_device(connection, pi->vnr);
1819         if (!peer_device)
1820                 return -EIO;
1821         device = peer_device->device;
1822
1823         sector = be64_to_cpu(p->sector);
1824         D_ASSERT(device, p->block_id == ID_SYNCER);
1825
1826         if (get_ldev(device)) {
1827                 /* data is submitted to disk within recv_resync_read.
1828                  * corresponding put_ldev done below on error,
1829                  * or in drbd_peer_request_endio. */
1830                 err = recv_resync_read(peer_device, sector, pi);
1831         } else {
1832                 if (__ratelimit(&drbd_ratelimit_state))
1833                         drbd_err(device, "Can not write resync data to local disk.\n");
1834
1835                 err = drbd_drain_block(peer_device, pi->size);
1836
1837                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1838         }
1839
1840         atomic_add(pi->size >> 9, &device->rs_sect_in);
1841
1842         return err;
1843 }
1844
1845 static void restart_conflicting_writes(struct drbd_device *device,
1846                                        sector_t sector, int size)
1847 {
1848         struct drbd_interval *i;
1849         struct drbd_request *req;
1850
1851         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1852                 if (!i->local)
1853                         continue;
1854                 req = container_of(i, struct drbd_request, i);
1855                 if (req->rq_state & RQ_LOCAL_PENDING ||
1856                     !(req->rq_state & RQ_POSTPONED))
1857                         continue;
1858                 /* as it is RQ_POSTPONED, this will cause it to
1859                  * be queued on the retry workqueue. */
1860                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1861         }
1862 }
1863
1864 /*
1865  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1866  */
1867 static int e_end_block(struct drbd_work *w, int cancel)
1868 {
1869         struct drbd_peer_request *peer_req =
1870                 container_of(w, struct drbd_peer_request, w);
1871         struct drbd_peer_device *peer_device = peer_req->peer_device;
1872         struct drbd_device *device = peer_device->device;
1873         sector_t sector = peer_req->i.sector;
1874         int err = 0, pcmd;
1875
1876         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1877                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1878                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1879                                 device->state.conn <= C_PAUSED_SYNC_T &&
1880                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1881                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1882                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1883                         if (pcmd == P_RS_WRITE_ACK)
1884                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1885                 } else {
1886                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1887                         /* we expect it to be marked out of sync anyway...
1888                          * maybe assert this?  */
1889                 }
1890                 dec_unacked(device);
1891         }
1892         /* we delete from the conflict detection hash _after_ we sent out the
1893          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1894         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1895                 spin_lock_irq(&device->resource->req_lock);
1896                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1897                 drbd_remove_epoch_entry_interval(device, peer_req);
1898                 if (peer_req->flags & EE_RESTART_REQUESTS)
1899                         restart_conflicting_writes(device, sector, peer_req->i.size);
1900                 spin_unlock_irq(&device->resource->req_lock);
1901         } else
1902                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1903
1904         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1905
1906         return err;
1907 }
1908
1909 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1910 {
1911         struct drbd_peer_request *peer_req =
1912                 container_of(w, struct drbd_peer_request, w);
1913         struct drbd_peer_device *peer_device = peer_req->peer_device;
1914         int err;
1915
1916         err = drbd_send_ack(peer_device, ack, peer_req);
1917         dec_unacked(peer_device->device);
1918
1919         return err;
1920 }
1921
1922 static int e_send_superseded(struct drbd_work *w, int unused)
1923 {
1924         return e_send_ack(w, P_SUPERSEDED);
1925 }
1926
1927 static int e_send_retry_write(struct drbd_work *w, int unused)
1928 {
1929         struct drbd_peer_request *peer_req =
1930                 container_of(w, struct drbd_peer_request, w);
1931         struct drbd_connection *connection = peer_req->peer_device->connection;
1932
1933         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1934                              P_RETRY_WRITE : P_SUPERSEDED);
1935 }
1936
1937 static bool seq_greater(u32 a, u32 b)
1938 {
1939         /*
1940          * We assume 32-bit wrap-around here.
1941          * For 24-bit wrap-around, we would have to shift:
1942          *  a <<= 8; b <<= 8;
1943          */
1944         return (s32)a - (s32)b > 0;
1945 }
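/* Worked example: seq_greater(1, 0xffffffff) is true, because
 * (s32)1 - (s32)0xffffffff == 1 - (-1) == 2 > 0; a sequence number
 * just past the 32-bit wrap still compares as newer. */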
1946
1947 static u32 seq_max(u32 a, u32 b)
1948 {
1949         return seq_greater(a, b) ? a : b;
1950 }
1951
1952 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1953 {
1954         struct drbd_device *device = peer_device->device;
1955         unsigned int newest_peer_seq;
1956
1957         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1958                 spin_lock(&device->peer_seq_lock);
1959                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1960                 device->peer_seq = newest_peer_seq;
1961                 spin_unlock(&device->peer_seq_lock);
1962                 /* wake up only if we actually changed device->peer_seq */
1963                 if (peer_seq == newest_peer_seq)
1964                         wake_up(&device->seq_wait);
1965         }
1966 }
1967
1968 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1969 {
1970         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1971 }
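/* Example: two 4 KiB requests (l1 == l2 == 4096, i.e. 8 sectors each)
 * at sectors 8 and 16 do not overlap: the first covers [8, 16), the
 * second [16, 24), and 8 + 8 <= 16 makes the predicate false. */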
1972
1973 /* maybe change sync_ee into interval trees as well? */
1974 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1975 {
1976         struct drbd_peer_request *rs_req;
1977         bool rv = false;
1978
1979         spin_lock_irq(&device->resource->req_lock);
1980         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1981                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1982                              rs_req->i.sector, rs_req->i.size)) {
1983                         rv = true;
1984                         break;
1985                 }
1986         }
1987         spin_unlock_irq(&device->resource->req_lock);
1988
1989         return rv;
1990 }
1991
1992 /* Called from receive_Data.
1993  * Synchronize packets on sock with packets on msock.
1994  *
1995  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1996  * packet traveling on msock, they are still processed in the order they have
1997  * been sent.
1998  *
1999  * Note: we don't care for Ack packets overtaking P_DATA packets.
2000  *
2001  * In case peer_seq is larger than device->peer_seq, there are
2002  * outstanding packets on the msock. We wait for them to arrive.
2003  * In case we are the logically next packet, we update device->peer_seq
2004  * ourselves. Correctly handles 32bit wrap around.
2005  *
2006  * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
2007  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2008  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2009  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2010  *
2011  * returns 0 if we may process the packet,
2012  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2013 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2014 {
2015         struct drbd_device *device = peer_device->device;
2016         DEFINE_WAIT(wait);
2017         long timeout;
2018         int ret = 0, tp;
2019
2020         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2021                 return 0;
2022
2023         spin_lock(&device->peer_seq_lock);
2024         for (;;) {
2025                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2026                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2027                         break;
2028                 }
2029
2030                 if (signal_pending(current)) {
2031                         ret = -ERESTARTSYS;
2032                         break;
2033                 }
2034
2035                 rcu_read_lock();
2036                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2037                 rcu_read_unlock();
2038
2039                 if (!tp)
2040                         break;
2041
2042                 /* Only need to wait if two_primaries is enabled */
2043                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2044                 spin_unlock(&device->peer_seq_lock);
2045                 rcu_read_lock();
2046                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2047                 rcu_read_unlock();
2048                 timeout = schedule_timeout(timeout);
2049                 spin_lock(&device->peer_seq_lock);
2050                 if (!timeout) {
2051                         ret = -ETIMEDOUT;
2052                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2053                         break;
2054                 }
2055         }
2056         spin_unlock(&device->peer_seq_lock);
2057         finish_wait(&device->seq_wait, &wait);
2058         return ret;
2059 }
2060
2061 /* see also bio_flags_to_wire()
2062  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2063  * flags and back. We may replicate to other kernel versions. */
2064 static unsigned long wire_flags_to_bio(u32 dpf)
2065 {
2066         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2067                 (dpf & DP_FUA ? REQ_FUA : 0) |
2068                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2069                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2070 }
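/* Illustrative round trip (a sketch, not a statement about versions
 * beyond the flags listed above): a peer that packed REQ_FUA|REQ_FLUSH
 * into DP_FUA|DP_FLUSH via bio_flags_to_wire() gets REQ_FUA|REQ_FLUSH
 * back here, regardless of how either kernel encodes bio flags
 * internally. */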
2071
2072 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2073                                     unsigned int size)
2074 {
2075         struct drbd_interval *i;
2076
2077     repeat:
2078         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2079                 struct drbd_request *req;
2080                 struct bio_and_error m;
2081
2082                 if (!i->local)
2083                         continue;
2084                 req = container_of(i, struct drbd_request, i);
2085                 if (!(req->rq_state & RQ_POSTPONED))
2086                         continue;
2087                 req->rq_state &= ~RQ_POSTPONED;
2088                 __req_mod(req, NEG_ACKED, &m);
2089                 spin_unlock_irq(&device->resource->req_lock);
2090                 if (m.bio)
2091                         complete_master_bio(device, &m);
2092                 spin_lock_irq(&device->resource->req_lock);
2093                 goto repeat;
2094         }
2095 }
2096
2097 static int handle_write_conflicts(struct drbd_device *device,
2098                                   struct drbd_peer_request *peer_req)
2099 {
2100         struct drbd_connection *connection = peer_req->peer_device->connection;
2101         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2102         sector_t sector = peer_req->i.sector;
2103         const unsigned int size = peer_req->i.size;
2104         struct drbd_interval *i;
2105         bool equal;
2106         int err;
2107
2108         /*
2109          * Inserting the peer request into the write_requests tree will prevent
2110          * new conflicting local requests from being added.
2111          */
2112         drbd_insert_interval(&device->write_requests, &peer_req->i);
2113
2114     repeat:
2115         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2116                 if (i == &peer_req->i)
2117                         continue;
2118
2119                 if (!i->local) {
2120                         /*
2121                          * Our peer has sent a conflicting remote request; this
2122                          * should not happen in a two-node setup.  Wait for the
2123                          * earlier peer request to complete.
2124                          */
2125                         err = drbd_wait_misc(device, i);
2126                         if (err)
2127                                 goto out;
2128                         goto repeat;
2129                 }
2130
2131                 equal = i->sector == sector && i->size == size;
2132                 if (resolve_conflicts) {
2133                         /*
2134                          * If the peer request is fully contained within the
2135                          * overlapping request, it can be considered overwritten
2136                          * and thus superseded; otherwise, it will be retried
2137                          * once all overlapping requests have completed.
2138                          */
2139                         bool superseded = i->sector <= sector && i->sector +
2140                                        (i->size >> 9) >= sector + (size >> 9);
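                        /* Containment example: a local write i covering
                         * sectors [0, 16) fully contains a peer write
                         * covering [8, 16), so the peer's request counts
                         * as overwritten by ours and is superseded. */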
2141
2142                         if (!equal)
2143                                 drbd_alert(device, "Concurrent writes detected: "
2144                                                "local=%llus +%u, remote=%llus +%u, "
2145                                                "assuming %s came first\n",
2146                                           (unsigned long long)i->sector, i->size,
2147                                           (unsigned long long)sector, size,
2148                                           superseded ? "local" : "remote");
2149
2150                         inc_unacked(device);
2151                         peer_req->w.cb = superseded ? e_send_superseded :
2152                                                    e_send_retry_write;
2153                         list_add_tail(&peer_req->w.list, &device->done_ee);
2154                         wake_asender(connection);
2155
2156                         err = -ENOENT;
2157                         goto out;
2158                 } else {
2159                         struct drbd_request *req =
2160                                 container_of(i, struct drbd_request, i);
2161
2162                         if (!equal)
2163                                 drbd_alert(device, "Concurrent writes detected: "
2164                                                "local=%llus +%u, remote=%llus +%u\n",
2165                                           (unsigned long long)i->sector, i->size,
2166                                           (unsigned long long)sector, size);
2167
2168                         if (req->rq_state & RQ_LOCAL_PENDING ||
2169                             !(req->rq_state & RQ_POSTPONED)) {
2170                                 /*
2171                                  * Wait for the node with the discard flag to
2172                                  * decide if this request has been superseded
2173                                  * or needs to be retried.
2174                                  * Requests that have been superseded will
2175                                  * disappear from the write_requests tree.
2176                                  *
2177                                  * In addition, wait for the conflicting
2178                                  * request to finish locally before submitting
2179                                  * the conflicting peer request.
2180                                  */
2181                                 err = drbd_wait_misc(device, &req->i);
2182                                 if (err) {
2183                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2184                                         fail_postponed_requests(device, sector, size);
2185                                         goto out;
2186                                 }
2187                                 goto repeat;
2188                         }
2189                         /*
2190                          * Remember to restart the conflicting requests after
2191                          * the new peer request has completed.
2192                          */
2193                         peer_req->flags |= EE_RESTART_REQUESTS;
2194                 }
2195         }
2196         err = 0;
2197
2198     out:
2199         if (err)
2200                 drbd_remove_epoch_entry_interval(device, peer_req);
2201         return err;
2202 }
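/* Note for callers: -ENOENT above is not an error in the usual sense.
 * It signals that the peer request was superseded (or queued for a
 * retry ack) and must not be submitted; receive_Data() maps it to
 * success after dropping its local disk reference. */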
2203
2204 /* mirrored write */
2205 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2206 {
2207         struct drbd_peer_device *peer_device;
2208         struct drbd_device *device;
2209         sector_t sector;
2210         struct drbd_peer_request *peer_req;
2211         struct p_data *p = pi->data;
2212         u32 peer_seq = be32_to_cpu(p->seq_num);
2213         int rw = WRITE;
2214         u32 dp_flags;
2215         int err, tp;
2216
2217         peer_device = conn_peer_device(connection, pi->vnr);
2218         if (!peer_device)
2219                 return -EIO;
2220         device = peer_device->device;
2221
2222         if (!get_ldev(device)) {
2223                 int err2;
2224
2225                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2226                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2227                 atomic_inc(&connection->current_epoch->epoch_size);
2228                 err2 = drbd_drain_block(peer_device, pi->size);
2229                 if (!err)
2230                         err = err2;
2231                 return err;
2232         }
2233
2234         /*
2235          * Corresponding put_ldev done either below (on various errors), or in
2236          * drbd_peer_request_endio, if we successfully submit the data at the
2237          * end of this function.
2238          */
2239
2240         sector = be64_to_cpu(p->sector);
2241         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2242         if (!peer_req) {
2243                 put_ldev(device);
2244                 return -EIO;
2245         }
2246
2247         peer_req->w.cb = e_end_block;
2248
2249         dp_flags = be32_to_cpu(p->dp_flags);
2250         rw |= wire_flags_to_bio(dp_flags);
2251         if (pi->cmd == P_TRIM) {
2252                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2253                 peer_req->flags |= EE_IS_TRIM;
2254                 if (!blk_queue_discard(q))
2255                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2256                 D_ASSERT(peer_device, peer_req->i.size > 0);
2257                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2258                 D_ASSERT(peer_device, peer_req->pages == NULL);
2259         } else if (peer_req->pages == NULL) {
2260                 D_ASSERT(device, peer_req->i.size == 0);
2261                 D_ASSERT(device, dp_flags & DP_FLUSH);
2262         }
2263
2264         if (dp_flags & DP_MAY_SET_IN_SYNC)
2265                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2266
2267         spin_lock(&connection->epoch_lock);
2268         peer_req->epoch = connection->current_epoch;
2269         atomic_inc(&peer_req->epoch->epoch_size);
2270         atomic_inc(&peer_req->epoch->active);
2271         spin_unlock(&connection->epoch_lock);
2272
2273         rcu_read_lock();
2274         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2275         rcu_read_unlock();
2276         if (tp) {
2277                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2278                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2279                 if (err)
2280                         goto out_interrupted;
2281                 spin_lock_irq(&device->resource->req_lock);
2282                 err = handle_write_conflicts(device, peer_req);
2283                 if (err) {
2284                         spin_unlock_irq(&device->resource->req_lock);
2285                         if (err == -ENOENT) {
2286                                 put_ldev(device);
2287                                 return 0;
2288                         }
2289                         goto out_interrupted;
2290                 }
2291         } else {
2292                 update_peer_seq(peer_device, peer_seq);
2293                 spin_lock_irq(&device->resource->req_lock);
2294         }
2295         /* if we use the zeroout fallback code, we process synchronously,
2296          * and wait for all pending requests, that is, for active_ee to
2297          * become empty, in drbd_submit_peer_request();
2298          * better not add ourselves here. */
2299         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2300                 list_add(&peer_req->w.list, &device->active_ee);
2301         spin_unlock_irq(&device->resource->req_lock);
2302
2303         if (device->state.conn == C_SYNC_TARGET)
2304                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2305
2306         if (peer_device->connection->agreed_pro_version < 100) {
2307                 rcu_read_lock();
2308                 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2309                 case DRBD_PROT_C:
2310                         dp_flags |= DP_SEND_WRITE_ACK;
2311                         break;
2312                 case DRBD_PROT_B:
2313                         dp_flags |= DP_SEND_RECEIVE_ACK;
2314                         break;
2315                 }
2316                 rcu_read_unlock();
2317         }
2318
2319         if (dp_flags & DP_SEND_WRITE_ACK) {
2320                 peer_req->flags |= EE_SEND_WRITE_ACK;
2321                 inc_unacked(device);
2322                 /* corresponding dec_unacked() in e_end_block()
2323          * or in _drbd_clear_done_ee, respectively */
2324         }
2325
2326         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2327                 /* I really don't like it that the receiver thread
2328                  * sends on the msock, but anyway */
2329                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2330         }
2331
2332         if (device->state.pdsk < D_INCONSISTENT) {
2333                 /* In case we have the only disk of the cluster: mark it out of sync for the peer. */
2334                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2335                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2336                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2337                 drbd_al_begin_io(device, &peer_req->i, true);
2338         }
2339
2340         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2341         if (!err)
2342                 return 0;
2343
2344         /* don't care for the reason here */
2345         drbd_err(device, "submit failed, triggering re-connect\n");
2346         spin_lock_irq(&device->resource->req_lock);
2347         list_del(&peer_req->w.list);
2348         drbd_remove_epoch_entry_interval(device, peer_req);
2349         spin_unlock_irq(&device->resource->req_lock);
2350         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2351                 drbd_al_complete_io(device, &peer_req->i);
2352
2353 out_interrupted:
2354         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2355         put_ldev(device);
2356         drbd_free_peer_req(device, peer_req);
2357         return err;
2358 }
2359
2360 /* We may throttle resync, if the lower device seems to be busy,
2361  * and current sync rate is above c_min_rate.
2362  *
2363  * To decide whether or not the lower device is busy, we use a scheme similar
2364  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2365  * (more than 64 sectors) of activity we cannot account for with our own resync
2366  * activity, it obviously is "busy".
2367  *
2368  * The current sync rate used here uses only the most recent two step marks,
2369  * to have a short time average so we can react faster.
2370  */
2371 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2372 {
2373         struct lc_element *tmp;
2374         bool throttle = true;
2375
2376         if (!drbd_rs_c_min_rate_throttle(device))
2377                 return false;
2378
2379         spin_lock_irq(&device->al_lock);
2380         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2381         if (tmp) {
2382                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2383                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2384                         throttle = false;
2385                 /* Do not slow down if app IO is already waiting for this extent */
2386         }
2387         spin_unlock_irq(&device->al_lock);
2388
2389         return throttle;
2390 }
2391
2392 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2393 {
2394         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2395         unsigned long db, dt, dbdt;
2396         unsigned int c_min_rate;
2397         int curr_events;
2398
2399         rcu_read_lock();
2400         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2401         rcu_read_unlock();
2402
2403         /* feature disabled? */
2404         if (c_min_rate == 0)
2405                 return false;
2406
2407         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2408                       (int)part_stat_read(&disk->part0, sectors[1]) -
2409                         atomic_read(&device->rs_sect_ev);
2410         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2411                 unsigned long rs_left;
2412                 int i;
2413
2414                 device->rs_last_events = curr_events;
2415
2416                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2417                  * approx. */
2418                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2419
2420                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2421                         rs_left = device->ov_left;
2422                 else
2423                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2424
2425                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2426                 if (!dt)
2427                         dt++;
2428                 db = device->rs_mark_left[i] - rs_left;
2429                 dbdt = Bit2KB(db/dt);
2430
2431                 if (dbdt > c_min_rate)
2432                         return true;
2433         }
2434         return false;
2435 }
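/* Rough illustration of the estimate above, assuming the usual 4 KiB
 * of resync data per bitmap bit: with the two most recent sync marks
 * dt == 3 seconds apart and db == 3072 bits cleared in between,
 * dbdt == Bit2KB(3072 / 3) == 4096 KiB/s, so we throttle whenever
 * c_min_rate is configured below 4096. */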
2436
2437 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2438 {
2439         struct drbd_peer_device *peer_device;
2440         struct drbd_device *device;
2441         sector_t sector;
2442         sector_t capacity;
2443         struct drbd_peer_request *peer_req;
2444         struct digest_info *di = NULL;
2445         int size, verb;
2446         unsigned int fault_type;
2447         struct p_block_req *p = pi->data;
2448
2449         peer_device = conn_peer_device(connection, pi->vnr);
2450         if (!peer_device)
2451                 return -EIO;
2452         device = peer_device->device;
2453         capacity = drbd_get_capacity(device->this_bdev);
2454
2455         sector = be64_to_cpu(p->sector);
2456         size   = be32_to_cpu(p->blksize);
2457
2458         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2459                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2460                                 (unsigned long long)sector, size);
2461                 return -EINVAL;
2462         }
2463         if (sector + (size>>9) > capacity) {
2464                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2465                                 (unsigned long long)sector, size);
2466                 return -EINVAL;
2467         }
2468
2469         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2470                 verb = 1;
2471                 switch (pi->cmd) {
2472                 case P_DATA_REQUEST:
2473                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2474                         break;
2475                 case P_RS_DATA_REQUEST:
2476                 case P_CSUM_RS_REQUEST:
2477                 case P_OV_REQUEST:
2478                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2479                         break;
2480                 case P_OV_REPLY:
2481                         verb = 0;
2482                         dec_rs_pending(device);
2483                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2484                         break;
2485                 default:
2486                         BUG();
2487                 }
2488                 if (verb && __ratelimit(&drbd_ratelimit_state))
2489                         drbd_err(device, "Can not satisfy peer's read request, "
2490                             "no local data.\n");
2491
2492                 /* drain the payload, if any */
2493                 return drbd_drain_block(peer_device, pi->size);
2494         }
2495
2496         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2497          * "criss-cross" setup, that might cause write-out on some other DRBD,
2498          * which in turn might block on the other node at this very place.  */
2499         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2500                         true /* has real payload */, GFP_NOIO);
2501         if (!peer_req) {
2502                 put_ldev(device);
2503                 return -ENOMEM;
2504         }
2505
2506         switch (pi->cmd) {
2507         case P_DATA_REQUEST:
2508                 peer_req->w.cb = w_e_end_data_req;
2509                 fault_type = DRBD_FAULT_DT_RD;
2510                 /* application IO, don't drbd_rs_begin_io */
2511                 goto submit;
2512
2513         case P_RS_DATA_REQUEST:
2514                 peer_req->w.cb = w_e_end_rsdata_req;
2515                 fault_type = DRBD_FAULT_RS_RD;
2516                 /* used in the sector offset progress display */
2517                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2518                 break;
2519
2520         case P_OV_REPLY:
2521         case P_CSUM_RS_REQUEST:
2522                 fault_type = DRBD_FAULT_RS_RD;
2523                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2524                 if (!di)
2525                         goto out_free_e;
2526
2527                 di->digest_size = pi->size;
2528                 di->digest = (((char *)di)+sizeof(struct digest_info));
2529
2530                 peer_req->digest = di;
2531                 peer_req->flags |= EE_HAS_DIGEST;
2532
2533                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2534                         goto out_free_e;
2535
2536                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2537                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2538                         peer_req->w.cb = w_e_end_csum_rs_req;
2539                         /* used in the sector offset progress display */
2540                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2541                 } else if (pi->cmd == P_OV_REPLY) {
2542                         /* track progress, we may need to throttle */
2543                         atomic_add(size >> 9, &device->rs_sect_in);
2544                         peer_req->w.cb = w_e_end_ov_reply;
2545                         dec_rs_pending(device);
2546                         /* drbd_rs_begin_io done when we sent this request,
2547                          * but accounting still needs to be done. */
2548                         goto submit_for_resync;
2549                 }
2550                 break;
2551
2552         case P_OV_REQUEST:
2553                 if (device->ov_start_sector == ~(sector_t)0 &&
2554                     peer_device->connection->agreed_pro_version >= 90) {
2555                         unsigned long now = jiffies;
2556                         int i;
2557                         device->ov_start_sector = sector;
2558                         device->ov_position = sector;
2559                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2560                         device->rs_total = device->ov_left;
2561                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2562                                 device->rs_mark_left[i] = device->ov_left;
2563                                 device->rs_mark_time[i] = now;
2564                         }
2565                         drbd_info(device, "Online Verify start sector: %llu\n",
2566                                         (unsigned long long)sector);
2567                 }
2568                 peer_req->w.cb = w_e_end_ov_req;
2569                 fault_type = DRBD_FAULT_RS_RD;
2570                 break;
2571
2572         default:
2573                 BUG();
2574         }
2575
2576         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2577          * wrt the receiver, but it is not as straightforward as it may seem.
2578          * Various places in the resync start and stop logic assume resync
2579          * requests are processed in order, requeuing this on the worker thread
2580          * introduces a bunch of new code for synchronization between threads.
2581          *
2582          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2583          * "forever", throttling after drbd_rs_begin_io will lock that extent
2584          * for application writes for the same time.  For now, just throttle
2585          * here, where the rest of the code expects the receiver to sleep for
2586          * a while, anyway.
2587          */
2588
2589         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2590          * this defers syncer requests for some time, before letting at least
2591          * one request through.  The resync controller on the receiving side
2592          * will adapt to the incoming rate accordingly.
2593          *
2594          * We cannot throttle here if remote is Primary/SyncTarget:
2595          * we would also throttle its application reads.
2596          * In that case, throttling is done on the SyncTarget only.
2597          */
2598         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2599                 schedule_timeout_uninterruptible(HZ/10);
2600         if (drbd_rs_begin_io(device, sector))
2601                 goto out_free_e;
2602
2603 submit_for_resync:
2604         atomic_add(size >> 9, &device->rs_sect_ev);
2605
2606 submit:
2607         inc_unacked(device);
2608         spin_lock_irq(&device->resource->req_lock);
2609         list_add_tail(&peer_req->w.list, &device->read_ee);
2610         spin_unlock_irq(&device->resource->req_lock);
2611
2612         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2613                 return 0;
2614
2615         /* don't care for the reason here */
2616         drbd_err(device, "submit failed, triggering re-connect\n");
2617         spin_lock_irq(&device->resource->req_lock);
2618         list_del(&peer_req->w.list);
2619         spin_unlock_irq(&device->resource->req_lock);
2620         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2621
2622 out_free_e:
2623         put_ldev(device);
2624         drbd_free_peer_req(device, peer_req);
2625         return -EIO;
2626 }
2627
2628 /**
2629  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2630  */
2631 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2632 {
2633         struct drbd_device *device = peer_device->device;
2634         int self, peer, rv = -100;
2635         unsigned long ch_self, ch_peer;
2636         enum drbd_after_sb_p after_sb_0p;
2637
2638         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2639         peer = device->p_uuid[UI_BITMAP] & 1;
2640
2641         ch_peer = device->p_uuid[UI_SIZE];
2642         ch_self = device->comm_bm_set;
2643
2644         rcu_read_lock();
2645         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2646         rcu_read_unlock();
2647         switch (after_sb_0p) {
2648         case ASB_CONSENSUS:
2649         case ASB_DISCARD_SECONDARY:
2650         case ASB_CALL_HELPER:
2651         case ASB_VIOLENTLY:
2652                 drbd_err(device, "Configuration error.\n");
2653                 break;
2654         case ASB_DISCONNECT:
2655                 break;
2656         case ASB_DISCARD_YOUNGER_PRI:
2657                 if (self == 0 && peer == 1) {
2658                         rv = -1;
2659                         break;
2660                 }
2661                 if (self == 1 && peer == 0) {
2662                         rv =  1;
2663                         break;
2664                 }
2665                 /* Else fall through to one of the other strategies... */
2666         case ASB_DISCARD_OLDER_PRI:
2667                 if (self == 0 && peer == 1) {
2668                         rv = 1;
2669                         break;
2670                 }
2671                 if (self == 1 && peer == 0) {
2672                         rv = -1;
2673                         break;
2674                 }
2675                 /* Else fall through to one of the other strategies... */
2676                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2677                      "Using discard-least-changes instead\n");
2678         case ASB_DISCARD_ZERO_CHG:
2679                 if (ch_peer == 0 && ch_self == 0) {
2680                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2681                                 ? -1 : 1;
2682                         break;
2683                 } else {
2684                         if (ch_peer == 0) { rv =  1; break; }
2685                         if (ch_self == 0) { rv = -1; break; }
2686                 }
2687                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2688                         break;
2689         case ASB_DISCARD_LEAST_CHG:
2690                 if      (ch_self < ch_peer)
2691                         rv = -1;
2692                 else if (ch_self > ch_peer)
2693                         rv =  1;
2694                 else /* ( ch_self == ch_peer ) */
2695                      /* Well, then use something else. */
2696                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2697                                 ? -1 : 1;
2698                 break;
2699         case ASB_DISCARD_LOCAL:
2700                 rv = -1;
2701                 break;
2702         case ASB_DISCARD_REMOTE:
2703                 rv =  1;
2704         }
2705
2706         return rv;
2707 }
2708
2709 /**
2710  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2711  */
2712 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2713 {
2714         struct drbd_device *device = peer_device->device;
2715         int hg, rv = -100;
2716         enum drbd_after_sb_p after_sb_1p;
2717
2718         rcu_read_lock();
2719         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2720         rcu_read_unlock();
2721         switch (after_sb_1p) {
2722         case ASB_DISCARD_YOUNGER_PRI:
2723         case ASB_DISCARD_OLDER_PRI:
2724         case ASB_DISCARD_LEAST_CHG:
2725         case ASB_DISCARD_LOCAL:
2726         case ASB_DISCARD_REMOTE:
2727         case ASB_DISCARD_ZERO_CHG:
2728                 drbd_err(device, "Configuration error.\n");
2729                 break;
2730         case ASB_DISCONNECT:
2731                 break;
2732         case ASB_CONSENSUS:
2733                 hg = drbd_asb_recover_0p(peer_device);
2734                 if (hg == -1 && device->state.role == R_SECONDARY)
2735                         rv = hg;
2736                 if (hg == 1  && device->state.role == R_PRIMARY)
2737                         rv = hg;
2738                 break;
2739         case ASB_VIOLENTLY:
2740                 rv = drbd_asb_recover_0p(peer_device);
2741                 break;
2742         case ASB_DISCARD_SECONDARY:
2743                 return device->state.role == R_PRIMARY ? 1 : -1;
2744         case ASB_CALL_HELPER:
2745                 hg = drbd_asb_recover_0p(peer_device);
2746                 if (hg == -1 && device->state.role == R_PRIMARY) {
2747                         enum drbd_state_rv rv2;
2748
2749                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2750                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2751                           * we do not need to wait for the after state change work either. */
2752                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2753                         if (rv2 != SS_SUCCESS) {
2754                                 drbd_khelper(device, "pri-lost-after-sb");
2755                         } else {
2756                                 drbd_warn(device, "Successfully gave up primary role.\n");
2757                                 rv = hg;
2758                         }
2759                 } else
2760                         rv = hg;
2761         }
2762
2763         return rv;
2764 }
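
/*
 * Worked example for the ASB_CONSENSUS branch above (illustration only):
 * if drbd_asb_recover_0p() votes "discard this node" (hg == -1) while this
 * node is Secondary, the vote is accepted, since only the secondary's data
 * is thrown away.  Were this node Primary instead, rv would stay at -100
 * and the split-brain would remain unresolved.
 */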
2765
2766 /**
2767  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2768  */
2769 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2770 {
2771         struct drbd_device *device = peer_device->device;
2772         int hg, rv = -100;
2773         enum drbd_after_sb_p after_sb_2p;
2774
2775         rcu_read_lock();
2776         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2777         rcu_read_unlock();
2778         switch (after_sb_2p) {
2779         case ASB_DISCARD_YOUNGER_PRI:
2780         case ASB_DISCARD_OLDER_PRI:
2781         case ASB_DISCARD_LEAST_CHG:
2782         case ASB_DISCARD_LOCAL:
2783         case ASB_DISCARD_REMOTE:
2784         case ASB_CONSENSUS:
2785         case ASB_DISCARD_SECONDARY:
2786         case ASB_DISCARD_ZERO_CHG:
2787                 drbd_err(device, "Configuration error.\n");
2788                 break;
2789         case ASB_VIOLENTLY:
2790                 rv = drbd_asb_recover_0p(peer_device);
2791                 break;
2792         case ASB_DISCONNECT:
2793                 break;
2794         case ASB_CALL_HELPER:
2795                 hg = drbd_asb_recover_0p(peer_device);
2796                 if (hg == -1) {
2797                         enum drbd_state_rv rv2;
2798
2799                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2800                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2801                           * we do not need to wait for the after state change work either. */
2802                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2803                         if (rv2 != SS_SUCCESS) {
2804                                 drbd_khelper(device, "pri-lost-after-sb");
2805                         } else {
2806                                 drbd_warn(device, "Successfully gave up primary role.\n");
2807                                 rv = hg;
2808                         }
2809                 } else
2810                         rv = hg;
2811         }
2812
2813         return rv;
2814 }
2815
2816 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2817                            u64 bits, u64 flags)
2818 {
2819         if (!uuid) {
2820                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2821                 return;
2822         }
2823         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2824              text,
2825              (unsigned long long)uuid[UI_CURRENT],
2826              (unsigned long long)uuid[UI_BITMAP],
2827              (unsigned long long)uuid[UI_HISTORY_START],
2828              (unsigned long long)uuid[UI_HISTORY_END],
2829              (unsigned long long)bits,
2830              (unsigned long long)flags);
2831 }
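
/*
 * Example output of drbd_uuid_dump() (values made up for illustration):
 *
 *   self 89AB12CD34EF5601:ABCDEF0123456789:1122334455667788:99AABBCCDDEEFF00 bits:42 flags:0
 *
 * i.e. current:bitmap:history-start:history-end UUIDs, followed by the
 * number of out-of-sync bits and the uuid flags.
 */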
2832
2833 /*
2834   100   after split brain try auto recover
2835     2   C_SYNC_SOURCE set BitMap
2836     1   C_SYNC_SOURCE use BitMap
2837     0   no Sync
2838    -1   C_SYNC_TARGET use BitMap
2839    -2   C_SYNC_TARGET set BitMap
2840  -100   after split brain, disconnect
2841 -1000   unrelated data
2842 -1091   requires proto 91
2843 -1096   requires proto 96
2844  */
2845 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2846 {
2847         u64 self, peer;
2848         int i, j;
2849
2850         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2851         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2852
2853         *rule_nr = 10;
2854         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2855                 return 0;
2856
2857         *rule_nr = 20;
2858         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2859              peer != UUID_JUST_CREATED)
2860                 return -2;
2861
2862         *rule_nr = 30;
2863         if (self != UUID_JUST_CREATED &&
2864             (peer == UUID_JUST_CREATED || peer == (u64)0))
2865                 return 2;
2866
2867         if (self == peer) {
2868                 int rct, dc; /* roles at crash time */
2869
2870                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2871
2872                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2873                                 return -1091;
2874
2875                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2876                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2877                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2878                                 drbd_uuid_move_history(device);
2879                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2880                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2881
2882                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2883                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2884                                 *rule_nr = 34;
2885                         } else {
2886                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2887                                 *rule_nr = 36;
2888                         }
2889
2890                         return 1;
2891                 }
2892
2893                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2894
2895                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2896                                 return -1091;
2897
2898                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2899                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2900                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2901
2902                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2903                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2904                                 device->p_uuid[UI_BITMAP] = 0UL;
2905
2906                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2907                                 *rule_nr = 35;
2908                         } else {
2909                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2910                                 *rule_nr = 37;
2911                         }
2912
2913                         return -1;
2914                 }
2915
2916                 /* Common power [off|failure] */
2917                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2918                         (device->p_uuid[UI_FLAGS] & 2);
2919                 /* lowest bit is set when we were primary,
2920                  * next bit (weight 2) is set when peer was primary */
2921                 *rule_nr = 40;
2922
2923                 switch (rct) {
2924                 case 0: /* !self_pri && !peer_pri */ return 0;
2925                 case 1: /*  self_pri && !peer_pri */ return 1;
2926                 case 2: /* !self_pri &&  peer_pri */ return -1;
2927                 case 3: /*  self_pri &&  peer_pri */
2928                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2929                         return dc ? -1 : 1;
2930                 }
2931         }
2932
2933         *rule_nr = 50;
2934         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2935         if (self == peer)
2936                 return -1;
2937
2938         *rule_nr = 51;
2939         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2940         if (self == peer) {
2941                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2942                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2943                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2944                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2945                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2946                            the peer made to its UUIDs when it last started a resync as sync source. */
2947
2948                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2949                                 return -1091;
2950
2951                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2952                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2953
2954                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2955                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2956
2957                         return -1;
2958                 }
2959         }
2960
2961         *rule_nr = 60;
2962         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2963         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2964                 peer = device->p_uuid[i] & ~((u64)1);
2965                 if (self == peer)
2966                         return -2;
2967         }
2968
2969         *rule_nr = 70;
2970         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2971         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2972         if (self == peer)
2973                 return 1;
2974
2975         *rule_nr = 71;
2976         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2977         if (self == peer) {
2978                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2979                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2980                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2981                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2982                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2983                            we made to our UUIDs when we last started a resync as sync source. */
2984
2985                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2986                                 return -1091;
2987
2988                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2989                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2990
2991                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2992                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2993                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2994
2995                         return 1;
2996                 }
2997         }
2998
2999
3000         *rule_nr = 80;
3001         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3002         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3003                 self = device->ldev->md.uuid[i] & ~((u64)1);
3004                 if (self == peer)
3005                         return 2;
3006         }
3007
3008         *rule_nr = 90;
3009         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3010         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3011         if (self == peer && self != ((u64)0))
3012                 return 100;
3013
3014         *rule_nr = 100;
3015         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3016                 self = device->ldev->md.uuid[i] & ~((u64)1);
3017                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3018                         peer = device->p_uuid[j] & ~((u64)1);
3019                         if (self == peer)
3020                                 return -100;
3021                 }
3022         }
3023
3024         return -1000;
3025 }
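
#if 0   /* Illustrative sketch, not built: drbd_uuid_compare() above always
         * masks out bit 0 before comparing, because the lowest bit of a
         * UUID is reserved as a flag and is not part of the data-generation
         * identifier.  The helper name is hypothetical. */
static inline int uuid_equal_ignoring_flag_bit(u64 a, u64 b)
{
        return (a & ~((u64)1)) == (b & ~((u64)1));
}
#endif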
3026
3027 /* drbd_sync_handshake() returns the new conn state on success, or
3028    C_MASK on failure.
3029  */
3030 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3031                                            enum drbd_role peer_role,
3032                                            enum drbd_disk_state peer_disk) __must_hold(local)
3033 {
3034         struct drbd_device *device = peer_device->device;
3035         enum drbd_conns rv = C_MASK;
3036         enum drbd_disk_state mydisk;
3037         struct net_conf *nc;
3038         int hg, rule_nr, rr_conflict, tentative;
3039
3040         mydisk = device->state.disk;
3041         if (mydisk == D_NEGOTIATING)
3042                 mydisk = device->new_state_tmp.disk;
3043
3044         drbd_info(device, "drbd_sync_handshake:\n");
3045
3046         spin_lock_irq(&device->ldev->md.uuid_lock);
3047         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3048         drbd_uuid_dump(device, "peer", device->p_uuid,
3049                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3050
3051         hg = drbd_uuid_compare(device, &rule_nr);
3052         spin_unlock_irq(&device->ldev->md.uuid_lock);
3053
3054         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3055
3056         if (hg == -1000) {
3057                 drbd_alert(device, "Unrelated data, aborting!\n");
3058                 return C_MASK;
3059         }
3060         if (hg < -1000) {
3061                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3062                 return C_MASK;
3063         }
3064
3065         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3066             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3067                 int f = (hg == -100) || abs(hg) == 2;
3068                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3069                 if (f)
3070                         hg = hg*2;
3071                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3072                      hg > 0 ? "source" : "target");
3073         }
3074
3075         if (abs(hg) == 100)
3076                 drbd_khelper(device, "initial-split-brain");
3077
3078         rcu_read_lock();
3079         nc = rcu_dereference(peer_device->connection->net_conf);
3080
3081         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3082                 int pcount = (device->state.role == R_PRIMARY)
3083                            + (peer_role == R_PRIMARY);
3084                 int forced = (hg == -100);
3085
3086                 switch (pcount) {
3087                 case 0:
3088                         hg = drbd_asb_recover_0p(peer_device);
3089                         break;
3090                 case 1:
3091                         hg = drbd_asb_recover_1p(peer_device);
3092                         break;
3093                 case 2:
3094                         hg = drbd_asb_recover_2p(peer_device);
3095                         break;
3096                 }
3097                 if (abs(hg) < 100) {
3098                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3099                              "automatically solved. Sync from %s node\n",
3100                              pcount, (hg < 0) ? "peer" : "this");
3101                         if (forced) {
3102                                 drbd_warn(device, "Doing a full sync, since"
3103                                      " UUIDs were ambiguous.\n");
3104                                 hg = hg*2;
3105                         }
3106                 }
3107         }
3108
3109         if (hg == -100) {
3110                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3111                         hg = -1;
3112                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3113                         hg = 1;
3114
3115                 if (abs(hg) < 100)
3116                         drbd_warn(device, "Split-Brain detected, manually solved. "
3117                              "Sync from %s node\n",
3118                              (hg < 0) ? "peer" : "this");
3119         }
3120         rr_conflict = nc->rr_conflict;
3121         tentative = nc->tentative;
3122         rcu_read_unlock();
3123
3124         if (hg == -100) {
3125                 /* FIXME this log message is not correct if we end up here
3126                  * after an attempted attach on a diskless node.
3127                  * We just refuse to attach -- well, we drop the "connection"
3128                  * to that disk, in a way... */
3129                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3130                 drbd_khelper(device, "split-brain");
3131                 return C_MASK;
3132         }
3133
3134         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3135                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3136                 return C_MASK;
3137         }
3138
3139         if (hg < 0 && /* by intention we do not use mydisk here. */
3140             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3141                 switch (rr_conflict) {
3142                 case ASB_CALL_HELPER:
3143                         drbd_khelper(device, "pri-lost");
3144                         /* fall through */
3145                 case ASB_DISCONNECT:
3146                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3147                         return C_MASK;
3148                 case ASB_VIOLENTLY:
3149                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3150                              " assumption\n");
3151                 }
3152         }
3153
3154         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3155                 if (hg == 0)
3156                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3157                 else
3158                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3159                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3160                                  abs(hg) >= 2 ? "full" : "bit-map based");
3161                 return C_MASK;
3162         }
3163
3164         if (abs(hg) >= 2) {
3165                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3166                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3167                                         BM_LOCKED_SET_ALLOWED))
3168                         return C_MASK;
3169         }
3170
3171         if (hg > 0) { /* become sync source. */
3172                 rv = C_WF_BITMAP_S;
3173         } else if (hg < 0) { /* become sync target */
3174                 rv = C_WF_BITMAP_T;
3175         } else {
3176                 rv = C_CONNECTED;
3177                 if (drbd_bm_total_weight(device)) {
3178                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3179                              drbd_bm_total_weight(device));
3180                 }
3181         }
3182
3183         return rv;
3184 }
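
/*
 * Summary of how the verdict hg is consumed above (illustration):
 *
 *   hg == 0      ->  C_CONNECTED, no resync
 *   |hg| == 1    ->  bitmap-based resync (C_WF_BITMAP_S or C_WF_BITMAP_T)
 *   |hg| == 2    ->  full resync, the whole bitmap is set first
 *   hg == -100   ->  split-brain still unresolved, connection dropped (C_MASK)
 */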
3185
3186 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3187 {
3188         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3189         if (peer == ASB_DISCARD_REMOTE)
3190                 return ASB_DISCARD_LOCAL;
3191
3192         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3193         if (peer == ASB_DISCARD_LOCAL)
3194                 return ASB_DISCARD_REMOTE;
3195
3196         /* everything else is valid if they are equal on both sides. */
3197         return peer;
3198 }
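
/*
 * Example: the peer is configured with ASB_DISCARD_REMOTE.  Seen from this
 * node, the peer's "remote" is us, so a matching local configuration must
 * read ASB_DISCARD_LOCAL.  convert_after_sb() performs exactly this
 * mirroring before receive_protocol() compares the two settings.
 */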
3199
3200 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3201 {
3202         struct p_protocol *p = pi->data;
3203         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3204         int p_proto, p_discard_my_data, p_two_primaries, cf;
3205         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3206         char integrity_alg[SHARED_SECRET_MAX] = "";
3207         struct crypto_hash *peer_integrity_tfm = NULL;
3208         void *int_dig_in = NULL, *int_dig_vv = NULL;
3209
3210         p_proto         = be32_to_cpu(p->protocol);
3211         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3212         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3213         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3214         p_two_primaries = be32_to_cpu(p->two_primaries);
3215         cf              = be32_to_cpu(p->conn_flags);
3216         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3217
3218         if (connection->agreed_pro_version >= 87) {
3219                 int err;
3220
3221                 if (pi->size > sizeof(integrity_alg))
3222                         return -EIO;
3223                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3224                 if (err)
3225                         return err;
3226                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3227         }
3228
3229         if (pi->cmd != P_PROTOCOL_UPDATE) {
3230                 clear_bit(CONN_DRY_RUN, &connection->flags);
3231
3232                 if (cf & CF_DRY_RUN)
3233                         set_bit(CONN_DRY_RUN, &connection->flags);
3234
3235                 rcu_read_lock();
3236                 nc = rcu_dereference(connection->net_conf);
3237
3238                 if (p_proto != nc->wire_protocol) {
3239                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3240                         goto disconnect_rcu_unlock;
3241                 }
3242
3243                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3244                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3245                         goto disconnect_rcu_unlock;
3246                 }
3247
3248                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3249                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3250                         goto disconnect_rcu_unlock;
3251                 }
3252
3253                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3254                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3255                         goto disconnect_rcu_unlock;
3256                 }
3257
3258                 if (p_discard_my_data && nc->discard_my_data) {
3259                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3260                         goto disconnect_rcu_unlock;
3261                 }
3262
3263                 if (p_two_primaries != nc->two_primaries) {
3264                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3265                         goto disconnect_rcu_unlock;
3266                 }
3267
3268                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3269                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3270                         goto disconnect_rcu_unlock;
3271                 }
3272
3273                 rcu_read_unlock();
3274         }
3275
3276         if (integrity_alg[0]) {
3277                 int hash_size;
3278
3279                 /*
3280                  * We can only change the peer data integrity algorithm
3281                  * here.  Changing our own data integrity algorithm
3282                  * requires that we send a P_PROTOCOL_UPDATE packet at
3283                  * the same time; otherwise, the peer has no way to
3284                  * tell between which packets the algorithm should
3285                  * change.
3286                  */
3287
3288                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3289                 if (!peer_integrity_tfm) {
3290                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3291                                  integrity_alg);
3292                         goto disconnect;
3293                 }
3294
3295                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3296                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3297                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3298                 if (!(int_dig_in && int_dig_vv)) {
3299                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3300                         goto disconnect;
3301                 }
3302         }
3303
3304         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3305         if (!new_net_conf) {
3306                 drbd_err(connection, "Allocation of new net_conf failed\n");
3307                 goto disconnect;
3308         }
3309
3310         mutex_lock(&connection->data.mutex);
3311         mutex_lock(&connection->resource->conf_update);
3312         old_net_conf = connection->net_conf;
3313         *new_net_conf = *old_net_conf;
3314
3315         new_net_conf->wire_protocol = p_proto;
3316         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3317         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3318         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3319         new_net_conf->two_primaries = p_two_primaries;
3320
3321         rcu_assign_pointer(connection->net_conf, new_net_conf);
3322         mutex_unlock(&connection->resource->conf_update);
3323         mutex_unlock(&connection->data.mutex);
3324
3325         crypto_free_hash(connection->peer_integrity_tfm);
3326         kfree(connection->int_dig_in);
3327         kfree(connection->int_dig_vv);
3328         connection->peer_integrity_tfm = peer_integrity_tfm;
3329         connection->int_dig_in = int_dig_in;
3330         connection->int_dig_vv = int_dig_vv;
3331
3332         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3333                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3334                           integrity_alg[0] ? integrity_alg : "(none)");
3335
3336         synchronize_rcu();
3337         kfree(old_net_conf);
3338         return 0;
3339
3340 disconnect_rcu_unlock:
3341         rcu_read_unlock();
3342 disconnect:
3343         crypto_free_hash(peer_integrity_tfm);
3344         kfree(int_dig_in);
3345         kfree(int_dig_vv);
3346         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3347         return -EIO;
3348 }
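
/*
 * The net_conf update above follows the usual RCU copy-update pattern
 * (simplified sketch):
 *
 *      new = kmalloc(sizeof(*new), GFP_KERNEL);
 *      *new = *old;                                    // copy
 *      new->field = ...;                               // modify the copy
 *      rcu_assign_pointer(connection->net_conf, new);  // publish
 *      synchronize_rcu();                              // wait out readers of *old
 *      kfree(old);
 *
 * Readers under rcu_read_lock() always see either the complete old or the
 * complete new configuration, never a half-updated one.
 */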
3349
3350 /* helper function
3351  * input: alg name, feature name
3352  * return: NULL if the alg name was "",
3353  *         ERR_PTR(error) if something went wrong,
3354  *         or the crypto hash ptr if allocation worked out ok. */
3355 static
3356 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3357                 const char *alg, const char *name)
3358 {
3359         struct crypto_hash *tfm;
3360
3361         if (!alg[0])
3362                 return NULL;
3363
3364         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3365         if (IS_ERR(tfm)) {
3366                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3367                         alg, name, PTR_ERR(tfm));
3368                 return tfm;
3369         }
3370         return tfm;
3371 }
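
#if 0   /* Illustrative sketch, not built: the calling convention of
         * drbd_crypto_alloc_digest_safe().  All three outcomes (NULL,
         * ERR_PTR, valid tfm) must be handled; the function name below is
         * hypothetical. */
static int example_alloc_verify_tfm(struct drbd_device *device, const char *alg)
{
        struct crypto_hash *tfm;

        tfm = drbd_crypto_alloc_digest_safe(device, alg, "verify-alg");
        if (tfm == NULL)
                return 0;               /* empty alg name: feature disabled */
        if (IS_ERR(tfm))
                return PTR_ERR(tfm);    /* failure was already logged */
        crypto_free_hash(tfm);          /* a real caller would keep it */
        return 0;
}
#endif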
3372
3373 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3374 {
3375         void *buffer = connection->data.rbuf;
3376         int size = pi->size;
3377
3378         while (size) {
3379                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3380                 s = drbd_recv(connection, buffer, s);
3381                 if (s <= 0) {
3382                         if (s < 0)
3383                                 return s;
3384                         break;
3385                 }
3386                 size -= s;
3387         }
3388         if (size)
3389                 return -EIO;
3390         return 0;
3391 }
3392
3393 /*
3394  * config_unknown_volume  -  device configuration command for unknown volume
3395  *
3396  * When a device is added to an existing connection, the node on which the
3397  * device is added first will send configuration commands to its peer but the
3398  * peer will not know about the device yet.  It will warn and ignore these
3399  * commands.  Once the device is added on the second node, the second node will
3400  * send the same device configuration commands, but in the other direction.
3401  *
3402  * (We can also end up here if drbd is misconfigured.)
3403  */
3404 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3405 {
3406         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3407                   cmdname(pi->cmd), pi->vnr);
3408         return ignore_remaining_packet(connection, pi);
3409 }
3410
3411 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3412 {
3413         struct drbd_peer_device *peer_device;
3414         struct drbd_device *device;
3415         struct p_rs_param_95 *p;
3416         unsigned int header_size, data_size, exp_max_sz;
3417         struct crypto_hash *verify_tfm = NULL;
3418         struct crypto_hash *csums_tfm = NULL;
3419         struct net_conf *old_net_conf, *new_net_conf = NULL;
3420         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3421         const int apv = connection->agreed_pro_version;
3422         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3423         int fifo_size = 0;
3424         int err;
3425
3426         peer_device = conn_peer_device(connection, pi->vnr);
3427         if (!peer_device)
3428                 return config_unknown_volume(connection, pi);
3429         device = peer_device->device;
3430
3431         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3432                     : apv == 88 ? sizeof(struct p_rs_param)
3433                                         + SHARED_SECRET_MAX
3434                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3435                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3436
3437         if (pi->size > exp_max_sz) {
3438                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3439                     pi->size, exp_max_sz);
3440                 return -EIO;
3441         }
3442
3443         if (apv <= 88) {
3444                 header_size = sizeof(struct p_rs_param);
3445                 data_size = pi->size - header_size;
3446         } else if (apv <= 94) {
3447                 header_size = sizeof(struct p_rs_param_89);
3448                 data_size = pi->size - header_size;
3449                 D_ASSERT(device, data_size == 0);
3450         } else {
3451                 header_size = sizeof(struct p_rs_param_95);
3452                 data_size = pi->size - header_size;
3453                 D_ASSERT(device, data_size == 0);
3454         }
3455
3456         /* initialize verify_alg and csums_alg */
3457         p = pi->data;
3458         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3459
3460         err = drbd_recv_all(peer_device->connection, p, header_size);
3461         if (err)
3462                 return err;
3463
3464         mutex_lock(&connection->resource->conf_update);
3465         old_net_conf = peer_device->connection->net_conf;
3466         if (get_ldev(device)) {
3467                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3468                 if (!new_disk_conf) {
3469                         put_ldev(device);
3470                         mutex_unlock(&connection->resource->conf_update);
3471                         drbd_err(device, "Allocation of new disk_conf failed\n");
3472                         return -ENOMEM;
3473                 }
3474
3475                 old_disk_conf = device->ldev->disk_conf;
3476                 *new_disk_conf = *old_disk_conf;
3477
3478                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3479         }
3480
3481         if (apv >= 88) {
3482                 if (apv == 88) {
3483                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3484                                 drbd_err(device, "verify-alg of wrong size, "
3485                                         "peer wants %u, accepting only up to %u bytes\n",
3486                                         data_size, SHARED_SECRET_MAX);
3487                                 err = -EIO;
3488                                 goto reconnect;
3489                         }
3490
3491                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3492                         if (err)
3493                                 goto reconnect;
3494                         /* we expect NUL terminated string */
3495                         /* but just in case someone tries to be evil */
3496                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3497                         p->verify_alg[data_size-1] = 0;
3498
3499                 } else /* apv >= 89 */ {
3500                         /* we still expect NUL terminated strings */
3501                         /* but just in case someone tries to be evil */
3502                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3503                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3504                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3505                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3506                 }
3507
3508                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3509                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3510                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3511                                     old_net_conf->verify_alg, p->verify_alg);
3512                                 goto disconnect;
3513                         }
3514                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3515                                         p->verify_alg, "verify-alg");
3516                         if (IS_ERR(verify_tfm)) {
3517                                 verify_tfm = NULL;
3518                                 goto disconnect;
3519                         }
3520                 }
3521
3522                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3523                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3524                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3525                                     old_net_conf->csums_alg, p->csums_alg);
3526                                 goto disconnect;
3527                         }
3528                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3529                                         p->csums_alg, "csums-alg");
3530                         if (IS_ERR(csums_tfm)) {
3531                                 csums_tfm = NULL;
3532                                 goto disconnect;
3533                         }
3534                 }
3535
3536                 if (apv > 94 && new_disk_conf) {
3537                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3538                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3539                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3540                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3541
3542                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3543                         if (fifo_size != device->rs_plan_s->size) {
3544                                 new_plan = fifo_alloc(fifo_size);
3545                                 if (!new_plan) {
3546                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3547                                         put_ldev(device);
3548                                         goto disconnect;
3549                                 }
3550                         }
3551                 }
3552
3553                 if (verify_tfm || csums_tfm) {
3554                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3555                         if (!new_net_conf) {
3556                                 drbd_err(device, "Allocation of new net_conf failed\n");
3557                                 goto disconnect;
3558                         }
3559
3560                         *new_net_conf = *old_net_conf;
3561
3562                         if (verify_tfm) {
3563                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3564                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3565                                 crypto_free_hash(peer_device->connection->verify_tfm);
3566                                 peer_device->connection->verify_tfm = verify_tfm;
3567                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3568                         }
3569                         if (csums_tfm) {
3570                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3571                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3572                                 crypto_free_hash(peer_device->connection->csums_tfm);
3573                                 peer_device->connection->csums_tfm = csums_tfm;
3574                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3575                         }
3576                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3577                 }
3578         }
3579
3580         if (new_disk_conf) {
3581                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3582                 put_ldev(device);
3583         }
3584
3585         if (new_plan) {
3586                 old_plan = device->rs_plan_s;
3587                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3588         }
3589
3590         mutex_unlock(&connection->resource->conf_update);
3591         synchronize_rcu();
3592         if (new_net_conf)
3593                 kfree(old_net_conf);
3594         kfree(old_disk_conf);
3595         kfree(old_plan);
3596
3597         return 0;
3598
3599 reconnect:
3600         if (new_disk_conf) {
3601                 put_ldev(device);
3602                 kfree(new_disk_conf);
3603         }
3604         mutex_unlock(&connection->resource->conf_update);
3605         return -EIO;
3606
3607 disconnect:
3608         kfree(new_plan);
3609         if (new_disk_conf) {
3610                 put_ldev(device);
3611                 kfree(new_disk_conf);
3612         }
3613         mutex_unlock(&connection->resource->conf_update);
3614         /* Free the tfms again if they were allocated but not (yet)
3615          * assigned to the connection. This is reached with a valid
3616          * csums_tfm e.g. when the fifo or net_conf allocation failed. */
3617         crypto_free_hash(csums_tfm);
3618         crypto_free_hash(verify_tfm);
3619         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3620         return -EIO;
3621 }
3622
3623 /* warn if the arguments differ by more than 12.5% */
3624 static void warn_if_differ_considerably(struct drbd_device *device,
3625         const char *s, sector_t a, sector_t b)
3626 {
3627         sector_t d;
3628         if (a == 0 || b == 0)
3629                 return;
3630         d = (a > b) ? (a - b) : (b - a);
3631         if (d > (a>>3) || d > (b>>3))
3632                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3633                      (unsigned long long)a, (unsigned long long)b);
3634 }
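
/*
 * Worked example: a = 1000 and b = 870 sectors give d = 130, which exceeds
 * a>>3 = 125 (12.5% of a), so the warning is printed.  With b = 900 the
 * difference of 100 stays below both 1000>>3 = 125 and 900>>3 = 112, and
 * nothing is logged.
 */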
3635
3636 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3637 {
3638         struct drbd_peer_device *peer_device;
3639         struct drbd_device *device;
3640         struct p_sizes *p = pi->data;
3641         enum determine_dev_size dd = DS_UNCHANGED;
3642         sector_t p_size, p_usize, my_usize;
3643         int ldsc = 0; /* local disk size changed */
3644         enum dds_flags ddsf;
3645
3646         peer_device = conn_peer_device(connection, pi->vnr);
3647         if (!peer_device)
3648                 return config_unknown_volume(connection, pi);
3649         device = peer_device->device;
3650
3651         p_size = be64_to_cpu(p->d_size);
3652         p_usize = be64_to_cpu(p->u_size);
3653
3654         /* just store the peer's disk size for now.
3655          * we still need to figure out whether we accept that. */
3656         device->p_size = p_size;
3657
3658         if (get_ldev(device)) {
3659                 rcu_read_lock();
3660                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3661                 rcu_read_unlock();
3662
3663                 warn_if_differ_considerably(device, "lower level device sizes",
3664                            p_size, drbd_get_max_capacity(device->ldev));
3665                 warn_if_differ_considerably(device, "user requested size",
3666                                             p_usize, my_usize);
3667
3668                 /* if this is the first connect, or an otherwise expected
3669                  * param exchange, choose the minimum */
3670                 if (device->state.conn == C_WF_REPORT_PARAMS)
3671                         p_usize = min_not_zero(my_usize, p_usize);
3672
3673                 /* Never shrink a device with usable data during connect.
3674                    But allow online shrinking if we are connected. */
3675                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3676                     drbd_get_capacity(device->this_bdev) &&
3677                     device->state.disk >= D_OUTDATED &&
3678                     device->state.conn < C_CONNECTED) {
3679                         drbd_err(device, "The peer's disk size is too small!\n");
3680                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3681                         put_ldev(device);
3682                         return -EIO;
3683                 }
3684
3685                 if (my_usize != p_usize) {
3686                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3687
3688                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3689                         if (!new_disk_conf) {
3690                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3691                                 put_ldev(device);
3692                                 return -ENOMEM;
3693                         }
3694
3695                         mutex_lock(&connection->resource->conf_update);
3696                         old_disk_conf = device->ldev->disk_conf;
3697                         *new_disk_conf = *old_disk_conf;
3698                         new_disk_conf->disk_size = p_usize;
3699
3700                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3701                         mutex_unlock(&connection->resource->conf_update);
3702                         synchronize_rcu();
3703                         kfree(old_disk_conf);
3704
3705                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3706                                  (unsigned long)p_usize);
3707                 }
3708
3709                 put_ldev(device);
3710         }
3711
3712         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3713         drbd_reconsider_max_bio_size(device);
3714         /* Keep the call to drbd_reconsider_max_bio_size() before
3715            drbd_determine_dev_size(). If we cleared QUEUE_FLAG_DISCARD from
3716            our queue in drbd_reconsider_max_bio_size(), we can be sure that
3717            after drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3718
3719         ddsf = be16_to_cpu(p->dds_flags);
3720         if (get_ldev(device)) {
3721                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3722                 put_ldev(device);
3723                 if (dd == DS_ERROR)
3724                         return -EIO;
3725                 drbd_md_sync(device);
3726         } else {
3727                 /* I am diskless, need to accept the peer's size. */
3728                 drbd_set_my_capacity(device, p_size);
3729         }
3730
3731         if (get_ldev(device)) {
3732                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3733                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3734                         ldsc = 1;
3735                 }
3736
3737                 put_ldev(device);
3738         }
3739
3740         if (device->state.conn > C_WF_REPORT_PARAMS) {
3741                 if (be64_to_cpu(p->c_size) !=
3742                     drbd_get_capacity(device->this_bdev) || ldsc) {
3743                         /* we have different sizes, probably peer
3744                          * needs to know my new size... */
3745                         drbd_send_sizes(peer_device, 0, ddsf);
3746                 }
3747                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3748                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3749                         if (device->state.pdsk >= D_INCONSISTENT &&
3750                             device->state.disk >= D_INCONSISTENT) {
3751                                 if (ddsf & DDSF_NO_RESYNC)
3752                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3753                                 else
3754                                         resync_after_online_grow(device);
3755                         } else
3756                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3757                 }
3758         }
3759
3760         return 0;
3761 }
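
/*
 * Note on the size negotiation above: during the initial handshake
 * (C_WF_REPORT_PARAMS) the requested user size is taken as
 * min_not_zero(my_usize, p_usize), i.e. the smaller of the two non-zero
 * values.  Example: my_usize = 0 (no limit) and p_usize = 1048576 sectors
 * yields 1048576; my_usize = 2097152 and p_usize = 1048576 also yields
 * 1048576.
 */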
3762
3763 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3764 {
3765         struct drbd_peer_device *peer_device;
3766         struct drbd_device *device;
3767         struct p_uuids *p = pi->data;
3768         u64 *p_uuid;
3769         int i, updated_uuids = 0;
3770
3771         peer_device = conn_peer_device(connection, pi->vnr);
3772         if (!peer_device)
3773                 return config_unknown_volume(connection, pi);
3774         device = peer_device->device;
3775
3776         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3777         if (!p_uuid) {
3778                 drbd_err(device, "kmalloc of p_uuid failed\n");
3779                 return -ENOMEM;
3780         }
3781
3782         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3783                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3784
3785         kfree(device->p_uuid);
3786         device->p_uuid = p_uuid;
3787
3788         if (device->state.conn < C_CONNECTED &&
3789             device->state.disk < D_INCONSISTENT &&
3790             device->state.role == R_PRIMARY &&
3791             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3792                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3793                     (unsigned long long)device->ed_uuid);
3794                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3795                 return -EIO;
3796         }
3797
3798         if (get_ldev(device)) {
3799                 int skip_initial_sync =
3800                         device->state.conn == C_CONNECTED &&
3801                         peer_device->connection->agreed_pro_version >= 90 &&
3802                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3803                         (p_uuid[UI_FLAGS] & 8);
3804                 if (skip_initial_sync) {
3805                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3806                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3807                                         "clear_n_write from receive_uuids",
3808                                         BM_LOCKED_TEST_ALLOWED);
3809                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3810                         _drbd_uuid_set(device, UI_BITMAP, 0);
3811                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3812                                         CS_VERBOSE, NULL);
3813                         drbd_md_sync(device);
3814                         updated_uuids = 1;
3815                 }
3816                 put_ldev(device);
3817         } else if (device->state.disk < D_INCONSISTENT &&
3818                    device->state.role == R_PRIMARY) {
3819                 /* I am a diskless primary, the peer just created a new current UUID
3820                    for me. */
3821                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3822         }
3823
3824         /* Before we test the disk state, we should wait until a possibly
3825            ongoing cluster-wide state change has finished. That is important if
3826            we are primary and are detaching from our disk. We need to see the
3827            new disk state... */
3828         mutex_lock(device->state_mutex);
3829         mutex_unlock(device->state_mutex);
3830         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3831                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3832
3833         if (updated_uuids)
3834                 drbd_print_uuids(device, "receiver updated UUIDs to");
3835
3836         return 0;
3837 }
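
/*
 * Note on the mutex_lock()/mutex_unlock() pair above: taking and
 * immediately releasing device->state_mutex is used purely as a barrier.
 * It protects nothing by itself; it merely blocks until a state change
 * that currently holds the mutex has completed, so that the disk state
 * tested afterwards is the post-change one.
 */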
3838
3839 /**
3840  * convert_state() - Converts the peer's view of the cluster state to our point of view
3841  * @ps:         The state as seen by the peer.
3842  */
3843 static union drbd_state convert_state(union drbd_state ps)
3844 {
3845         union drbd_state ms;
3846
3847         static enum drbd_conns c_tab[] = {
3848                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3849                 [C_CONNECTED] = C_CONNECTED,
3850
3851                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3852                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3853                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3854                 [C_VERIFY_S]       = C_VERIFY_T,
3855                 [C_MASK]   = C_MASK,
3856         };
3857
3858         ms.i = ps.i;
3859
3860         ms.conn = c_tab[ps.conn];
3861         ms.peer = ps.role;
3862         ms.role = ps.peer;
3863         ms.pdsk = ps.disk;
3864         ms.disk = ps.pdsk;
3865         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3866
3867         return ms;
3868 }
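
/*
 * Example: the peer reports { role = Primary, peer = Secondary,
 * disk = UpToDate, pdsk = Inconsistent }.  Swapping the role/peer and
 * disk/pdsk pairs yields our point of view:
 * { role = Secondary, peer = Primary, disk = Inconsistent, pdsk = UpToDate }.
 */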
3869
3870 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3871 {
3872         struct drbd_peer_device *peer_device;
3873         struct drbd_device *device;
3874         struct p_req_state *p = pi->data;
3875         union drbd_state mask, val;
3876         enum drbd_state_rv rv;
3877
3878         peer_device = conn_peer_device(connection, pi->vnr);
3879         if (!peer_device)
3880                 return -EIO;
3881         device = peer_device->device;
3882
3883         mask.i = be32_to_cpu(p->mask);
3884         val.i = be32_to_cpu(p->val);
3885
3886         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3887             mutex_is_locked(device->state_mutex)) {
3888                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3889                 return 0;
3890         }
3891
3892         mask = convert_state(mask);
3893         val = convert_state(val);
3894
3895         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3896         drbd_send_sr_reply(peer_device, rv);
3897
3898         drbd_md_sync(device);
3899
3900         return 0;
3901 }
3902
3903 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3904 {
3905         struct p_req_state *p = pi->data;
3906         union drbd_state mask, val;
3907         enum drbd_state_rv rv;
3908
3909         mask.i = be32_to_cpu(p->mask);
3910         val.i = be32_to_cpu(p->val);
3911
3912         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3913             mutex_is_locked(&connection->cstate_mutex)) {
3914                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3915                 return 0;
3916         }
3917
3918         mask = convert_state(mask);
3919         val = convert_state(val);
3920
3921         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3922         conn_send_sr_reply(connection, rv);
3923
3924         return 0;
3925 }
3926
3927 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3928 {
3929         struct drbd_peer_device *peer_device;
3930         struct drbd_device *device;
3931         struct p_state *p = pi->data;
3932         union drbd_state os, ns, peer_state;
3933         enum drbd_disk_state real_peer_disk;
3934         enum chg_state_flags cs_flags;
3935         int rv;
3936
3937         peer_device = conn_peer_device(connection, pi->vnr);
3938         if (!peer_device)
3939                 return config_unknown_volume(connection, pi);
3940         device = peer_device->device;
3941
3942         peer_state.i = be32_to_cpu(p->state);
3943
3944         real_peer_disk = peer_state.disk;
3945         if (peer_state.disk == D_NEGOTIATING) {
3946                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3947                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3948         }
3949
3950         spin_lock_irq(&device->resource->req_lock);
3951  retry:
3952         os = ns = drbd_read_state(device);
3953         spin_unlock_irq(&device->resource->req_lock);
3954
3955         /* If some other part of the code (asender thread, timeout)
3956          * already decided to close the connection again,
3957          * we must not "re-establish" it here. */
3958         if (os.conn <= C_TEAR_DOWN)
3959                 return -ECONNRESET;
3960
3961         /* If this is the "end of sync" confirmation, usually the peer disk
3962          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3963          * set) resync started in PausedSyncT, or if the timing of pause-/
3964          * unpause-sync events has been "just right", the peer disk may
3965          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3966          */
3967         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3968             real_peer_disk == D_UP_TO_DATE &&
3969             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3970                 /* If we are (becoming) SyncSource, but peer is still in sync
3971                  * preparation, ignore its uptodate-ness to avoid flapping, it
3972                  * will change to inconsistent once the peer reaches active
3973                  * syncing states.
3974                  * It may have changed syncer-paused flags, however, so we
3975                  * cannot ignore this completely. */
3976                 if (peer_state.conn > C_CONNECTED &&
3977                     peer_state.conn < C_SYNC_SOURCE)
3978                         real_peer_disk = D_INCONSISTENT;
3979
3980                 /* if peer_state changes to connected at the same time,
3981                  * it explicitly notifies us that it finished resync.
3982                  * Maybe we should finish it up, too? */
3983                 else if (os.conn >= C_SYNC_SOURCE &&
3984                          peer_state.conn == C_CONNECTED) {
3985                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3986                                 drbd_resync_finished(device);
3987                         return 0;
3988                 }
3989         }
3990
3991         /* explicit verify finished notification, stop sector reached. */
3992         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3993             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3994                 ov_out_of_sync_print(device);
3995                 drbd_resync_finished(device);
3996                 return 0;
3997         }
3998
3999         /* peer says his disk is inconsistent, while we think it is uptodate,
4000          * and this happens while the peer still thinks we have a sync going on,
4001          * but we think we are already done with the sync.
4002          * We ignore this to avoid flapping pdsk.
4003          * This should not happen if the peer is a recent version of drbd. */
4004         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4005             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4006                 real_peer_disk = D_UP_TO_DATE;
4007
4008         if (ns.conn == C_WF_REPORT_PARAMS)
4009                 ns.conn = C_CONNECTED;
4010
4011         if (peer_state.conn == C_AHEAD)
4012                 ns.conn = C_BEHIND;
4013
4014         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4015             get_ldev_if_state(device, D_NEGOTIATING)) {
4016                 int cr; /* consider resync */
4017
4018                 /* if we established a new connection */
4019                 cr  = (os.conn < C_CONNECTED);
4020                 /* if we had an established connection
4021                  * and one of the nodes newly attaches a disk */
4022                 cr |= (os.conn == C_CONNECTED &&
4023                        (peer_state.disk == D_NEGOTIATING ||
4024                         os.disk == D_NEGOTIATING));
4025                 /* if we have both been inconsistent, and the peer has been
4026                  * forced to be UpToDate with --overwrite-data */
4027                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4028                 /* if we had been plain connected, and the admin requested to
4029                  * start a sync by "invalidate" or "invalidate-remote" */
4030                 cr |= (os.conn == C_CONNECTED &&
4031                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4032                                  peer_state.conn <= C_WF_BITMAP_T));
4033
4034                 if (cr)
4035                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4036
4037                 put_ldev(device);
4038                 if (ns.conn == C_MASK) {
4039                         ns.conn = C_CONNECTED;
4040                         if (device->state.disk == D_NEGOTIATING) {
4041                                 drbd_force_state(device, NS(disk, D_FAILED));
4042                         } else if (peer_state.disk == D_NEGOTIATING) {
4043                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4044                                 peer_state.disk = D_DISKLESS;
4045                                 real_peer_disk = D_DISKLESS;
4046                         } else {
4047                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4048                                         return -EIO;
4049                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4050                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4051                                 return -EIO;
4052                         }
4053                 }
4054         }
4055
4056         spin_lock_irq(&device->resource->req_lock);
4057         if (os.i != drbd_read_state(device).i)
4058                 goto retry;
4059         clear_bit(CONSIDER_RESYNC, &device->flags);
4060         ns.peer = peer_state.role;
4061         ns.pdsk = real_peer_disk;
4062         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4063         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4064                 ns.disk = device->new_state_tmp.disk;
4065         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4066         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4067             test_bit(NEW_CUR_UUID, &device->flags)) {
4068                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4069                    for temporary network outages! */
4070                 spin_unlock_irq(&device->resource->req_lock);
4071                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4072                 tl_clear(peer_device->connection);
4073                 drbd_uuid_new_current(device);
4074                 clear_bit(NEW_CUR_UUID, &device->flags);
4075                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4076                 return -EIO;
4077         }
4078         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4079         ns = drbd_read_state(device);
4080         spin_unlock_irq(&device->resource->req_lock);
4081
4082         if (rv < SS_SUCCESS) {
4083                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4084                 return -EIO;
4085         }
4086
4087         if (os.conn > C_WF_REPORT_PARAMS) {
4088                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4089                     peer_state.disk != D_NEGOTIATING) {
4090                         /* we want resync, peer has not yet decided to sync... */
4091                         /* Nowadays only used when forcing a node into primary role and
4092                            setting its disk to UpToDate with that */
4093                         drbd_send_uuids(peer_device);
4094                         drbd_send_current_state(peer_device);
4095                 }
4096         }
4097
4098         clear_bit(DISCARD_MY_DATA, &device->flags);
4099
4100         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4101
4102         return 0;
4103 }
4104
4105 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4106 {
4107         struct drbd_peer_device *peer_device;
4108         struct drbd_device *device;
4109         struct p_rs_uuid *p = pi->data;
4110
4111         peer_device = conn_peer_device(connection, pi->vnr);
4112         if (!peer_device)
4113                 return -EIO;
4114         device = peer_device->device;
4115
4116         wait_event(device->misc_wait,
4117                    device->state.conn == C_WF_SYNC_UUID ||
4118                    device->state.conn == C_BEHIND ||
4119                    device->state.conn < C_CONNECTED ||
4120                    device->state.disk < D_NEGOTIATING);
4121
4122         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4123
4124         /* Here the _drbd_uuid_ functions are right, current should
4125            _not_ be rotated into the history */
4126         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4127                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4128                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4129
4130                 drbd_print_uuids(device, "updated sync uuid");
4131                 drbd_start_resync(device, C_SYNC_TARGET);
4132
4133                 put_ldev(device);
4134         } else
4135                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4136
4137         return 0;
4138 }
4139
4140 /**
4141  * receive_bitmap_plain
4142  *
4143  * Return 0 when done, 1 when another iteration is needed, and a negative error
4144  * code upon failure.
4145  */
4146 static int
4147 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4148                      unsigned long *p, struct bm_xfer_ctx *c)
4149 {
4150         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4151                                  drbd_header_size(peer_device->connection);
4152         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4153                                        c->bm_words - c->word_offset);
4154         unsigned int want = num_words * sizeof(*p);
4155         int err;
4156
4157         if (want != size) {
4158                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4159                 return -EIO;
4160         }
4161         if (want == 0)
4162                 return 0;
4163         err = drbd_recv_all(peer_device->connection, p, want);
4164         if (err)
4165                 return err;
4166
4167         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4168
4169         c->word_offset += num_words;
4170         c->bit_offset = c->word_offset * BITS_PER_LONG;
4171         if (c->bit_offset > c->bm_bits)
4172                 c->bit_offset = c->bm_bits;
4173
4174         return 1;
4175 }
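
/*
 * A worked example of the sizing logic above, assuming a 4096 byte
 * DRBD_SOCKET_BUFFER_SIZE, an 8 byte header and 8 byte longs (the actual
 * values vary with protocol version and architecture): data_size = 4088,
 * so at most 511 words fit per packet.  The sender chunks the bitmap the
 * same way, which is why any other "size" is a protocol error above.
 */
#if 0	/* illustrative only */
	unsigned int data_size = 4096 - 8;		/* 4088         */
	unsigned int num_words = data_size / 8;		/* 511          */
	unsigned int want      = num_words * 8;		/* 4088 == size */
#endif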
4176
4177 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4178 {
4179         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4180 }
4181
4182 static int dcbp_get_start(struct p_compressed_bm *p)
4183 {
4184         return (p->encoding & 0x80) != 0;
4185 }
4186
4187 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4188 {
4189         return (p->encoding >> 4) & 0x7;
4190 }
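
/*
 * The three accessors above unpack the single encoding byte of
 * struct p_compressed_bm.  Its layout, as implied by the masks and
 * shifts used above:
 *
 *	bit  7    : start toggle, the first RLE run describes set bits
 *	bits 6..4 : number of pad bits at the end of the bit stream
 *	bits 3..0 : bitmap code, e.g. RLE_VLI_Bits
 */
#if 0	/* illustrative only */
	struct p_compressed_bm p = { .encoding = 0x92 };
	/* dcbp_get_start(&p)    == 1
	 * dcbp_get_pad_bits(&p) == 1
	 * dcbp_get_code(&p)     == 2 == RLE_VLI_Bits */
#endif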
4191
4192 /**
4193  * recv_bm_rle_bits
4194  *
4195  * Return 0 when done, 1 when another iteration is needed, and a negative error
4196  * code upon failure.
4197  */
4198 static int
4199 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4200                  struct p_compressed_bm *p,
4201                  struct bm_xfer_ctx *c,
4202                  unsigned int len)
4203 {
4204         struct bitstream bs;
4205         u64 look_ahead;
4206         u64 rl;
4207         u64 tmp;
4208         unsigned long s = c->bit_offset;
4209         unsigned long e;
4210         int toggle = dcbp_get_start(p);
4211         int have;
4212         int bits;
4213
4214         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4215
4216         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4217         if (bits < 0)
4218                 return -EIO;
4219
4220         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4221                 bits = vli_decode_bits(&rl, look_ahead);
4222                 if (bits <= 0)
4223                         return -EIO;
4224
4225                 if (toggle) {
4226                         e = s + rl - 1;
4227                         if (e >= c->bm_bits) {
4228                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4229                                 return -EIO;
4230                         }
4231                         _drbd_bm_set_bits(peer_device->device, s, e);
4232                 }
4233
4234                 if (have < bits) {
4235                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4236                                 have, bits, look_ahead,
4237                                 (unsigned int)(bs.cur.b - p->code),
4238                                 (unsigned int)bs.buf_len);
4239                         return -EIO;
4240                 }
4241                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4242                 if (likely(bits < 64))
4243                         look_ahead >>= bits;
4244                 else
4245                         look_ahead = 0;
4246                 have -= bits;
4247
4248                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4249                 if (bits < 0)
4250                         return -EIO;
4251                 look_ahead |= tmp << have;
4252                 have += bits;
4253         }
4254
4255         c->bit_offset = s;
4256         bm_xfer_ctx_bit_to_word_offset(c);
4257
4258         return (s != c->bm_bits);
4259 }
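
/*
 * A decoding walk-through of the loop above, with invented values.  The
 * VLI stream is just a sequence of run lengths; runs alternate between
 * clear and set bits, and dcbp_get_start() says whether the very first
 * run describes set bits.
 */
#if 0	/* runs {5, 3, 7}, start toggle 0, c->bit_offset 0 -- not compiled */
	/* run 1: toggle == 0 -> bits 0..4  stay clear	(s: 0 -> 5)  */
	/* run 2: toggle == 1 -> bits 5..7  are set	(s: 5 -> 8)  */
	/* run 3: toggle == 0 -> bits 8..14 stay clear	(s: 8 -> 15) */
#endif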
4260
4261 /**
4262  * decode_bitmap_c
4263  *
4264  * Return 0 when done, 1 when another iteration is needed, and a negative error
4265  * code upon failure.
4266  */
4267 static int
4268 decode_bitmap_c(struct drbd_peer_device *peer_device,
4269                 struct p_compressed_bm *p,
4270                 struct bm_xfer_ctx *c,
4271                 unsigned int len)
4272 {
4273         if (dcbp_get_code(p) == RLE_VLI_Bits)
4274                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4275
4276         /* other variants had been implemented for evaluation,
4277          * but have been dropped as this one turned out to be "best"
4278          * during all our tests. */
4279
4280         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4281         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4282         return -EIO;
4283 }
4284
4285 void INFO_bm_xfer_stats(struct drbd_device *device,
4286                 const char *direction, struct bm_xfer_ctx *c)
4287 {
4288         /* what would it take to transfer it "plaintext" */
4289         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4290         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4291         unsigned int plain =
4292                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4293                 c->bm_words * sizeof(unsigned long);
4294         unsigned int total = c->bytes[0] + c->bytes[1];
4295         unsigned int r;
4296
4297         /* total can not be zero. but just in case: */
4298         if (total == 0)
4299                 return;
4300
4301         /* don't report if not compressed */
4302         if (total >= plain)
4303                 return;
4304
4305         /* total < plain. check for overflow, still */
4306         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4307                                     : (1000 * total / plain);
4308
4309         if (r > 1000)
4310                 r = 1000;
4311
4312         r = 1000 - r;
4313         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4314              "total %u; compression: %u.%u%%\n",
4315                         direction,
4316                         c->bytes[1], c->packets[1],
4317                         c->bytes[0], c->packets[0],
4318                         total, r/10, r % 10);
4319 }
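
/*
 * A worked example of the ratio computed above (numbers invented): with
 * plain == 100000 bytes and total == 2500 bytes, the transfer is reported
 * as "compression: 97.5%".
 */
#if 0	/* illustrative only */
	unsigned int r = 1000 * 2500 / 100000;	/* 25              */
	r = 1000 - r;				/* 975 -> "97.5%"  */
#endif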
4320
4321 /* Since we are processing the bitfield from lower addresses to higher,
4322    it does not matter whether we process it in 32 bit or 64 bit chunks,
4323    as long as it is little endian. (Understand it as a byte stream,
4324    beginning with the lowest byte...) If we used big endian, we would
4325    need to process it from the highest address to the lowest in order
4326    to be agnostic to the 32 vs 64 bit issue.
4327
4328    returns 0 on failure, 1 if we successfully received it. */
4329 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4330 {
4331         struct drbd_peer_device *peer_device;
4332         struct drbd_device *device;
4333         struct bm_xfer_ctx c;
4334         int err;
4335
4336         peer_device = conn_peer_device(connection, pi->vnr);
4337         if (!peer_device)
4338                 return -EIO;
4339         device = peer_device->device;
4340
4341         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4342         /* you are supposed to send additional out-of-sync information
4343          * if you actually set bits during this phase */
4344
4345         c = (struct bm_xfer_ctx) {
4346                 .bm_bits = drbd_bm_bits(device),
4347                 .bm_words = drbd_bm_words(device),
4348         };
4349
4350         for (;;) {
4351                 if (pi->cmd == P_BITMAP)
4352                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4353                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4354                         /* MAYBE: sanity check that we speak proto >= 90,
4355                          * and the feature is enabled! */
4356                         struct p_compressed_bm *p = pi->data;
4357
4358                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4359                                 drbd_err(device, "ReportCBitmap packet too large\n");
4360                                 err = -EIO;
4361                                 goto out;
4362                         }
4363                         if (pi->size <= sizeof(*p)) {
4364                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4365                                 err = -EIO;
4366                                 goto out;
4367                         }
4368                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4369                         if (err)
4370                                 goto out;
4371                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4372                 } else {
4373                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4374                         err = -EIO;
4375                         goto out;
4376                 }
4377
4378                 c.packets[pi->cmd == P_BITMAP]++;
4379                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4380
4381                 if (err <= 0) {
4382                         if (err < 0)
4383                                 goto out;
4384                         break;
4385                 }
4386                 err = drbd_recv_header(peer_device->connection, pi);
4387                 if (err)
4388                         goto out;
4389         }
4390
4391         INFO_bm_xfer_stats(device, "receive", &c);
4392
4393         if (device->state.conn == C_WF_BITMAP_T) {
4394                 enum drbd_state_rv rv;
4395
4396                 err = drbd_send_bitmap(device);
4397                 if (err)
4398                         goto out;
4399                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4400                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4401                 D_ASSERT(device, rv == SS_SUCCESS);
4402         } else if (device->state.conn != C_WF_BITMAP_S) {
4403                 /* admin may have requested C_DISCONNECTING,
4404                  * other threads may have noticed network errors */
4405                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4406                     drbd_conn_str(device->state.conn));
4407         }
4408         err = 0;
4409
4410  out:
4411         drbd_bm_unlock(device);
4412         if (!err && device->state.conn == C_WF_BITMAP_S)
4413                 drbd_start_resync(device, C_SYNC_SOURCE);
4414         return err;
4415 }
4416
4417 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4418 {
4419         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4420                  pi->cmd, pi->size);
4421
4422         return ignore_remaining_packet(connection, pi);
4423 }
4424
4425 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4426 {
4427         /* Make sure we've acked all the TCP data associated
4428          * with the data requests being unplugged */
4429         drbd_tcp_quickack(connection->data.socket);
4430
4431         return 0;
4432 }
4433
4434 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4435 {
4436         struct drbd_peer_device *peer_device;
4437         struct drbd_device *device;
4438         struct p_block_desc *p = pi->data;
4439
4440         peer_device = conn_peer_device(connection, pi->vnr);
4441         if (!peer_device)
4442                 return -EIO;
4443         device = peer_device->device;
4444
4445         switch (device->state.conn) {
4446         case C_WF_SYNC_UUID:
4447         case C_WF_BITMAP_T:
4448         case C_BEHIND:
4449                 break;
4450         default:
4451                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4452                                 drbd_conn_str(device->state.conn));
4453         }
4454
4455         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4456
4457         return 0;
4458 }
4459
4460 struct data_cmd {
4461         int expect_payload;
4462         size_t pkt_size;
4463         int (*fn)(struct drbd_connection *, struct packet_info *);
4464 };
4465
4466 static struct data_cmd drbd_cmd_handler[] = {
4467         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4468         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4469         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4470         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4471         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4472         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4473         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4474         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4475         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4476         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4477         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4478         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4479         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4480         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4481         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4482         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4483         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4484         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4485         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4486         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4487         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4488         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4489         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4490         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4491         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4492 };
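
/*
 * How drbdd() below consumes this table, sketched for an incoming P_DATA
 * packet: pkt_size bytes of sub-header are read into pi.data up front,
 * and expect_payload == 1 allows the remaining pi.size payload bytes,
 * which the handler itself reads from the socket.
 */
#if 0	/* illustrative only */
	struct data_cmd *cmd = &drbd_cmd_handler[P_DATA];
	/* cmd->pkt_size       == sizeof(struct p_data) */
	/* cmd->expect_payload == 1                     */
	/* cmd->fn             == receive_Data          */
#endif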
4493
4494 static void drbdd(struct drbd_connection *connection)
4495 {
4496         struct packet_info pi;
4497         size_t shs; /* sub header size */
4498         int err;
4499
4500         while (get_t_state(&connection->receiver) == RUNNING) {
4501                 struct data_cmd *cmd;
4502
4503                 drbd_thread_current_set_cpu(&connection->receiver);
4504                 if (drbd_recv_header(connection, &pi))
4505                         goto err_out;
4506
4507                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) ||
4508                              !drbd_cmd_handler[pi.cmd].fn)) {
4509                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4510                                  cmdname(pi.cmd), pi.cmd);
4511                         goto err_out;
4512                 }
4513                 /* index only after the bounds check above */
4514                 cmd = &drbd_cmd_handler[pi.cmd];
                shs = cmd->pkt_size;
4515                 if (pi.size > shs && !cmd->expect_payload) {
4516                         drbd_err(connection, "No payload expected %s l:%d\n",
4517                                  cmdname(pi.cmd), pi.size);
4518                         goto err_out;
4519                 }
4520
4521                 if (shs) {
4522                         err = drbd_recv_all_warn(connection, pi.data, shs);
4523                         if (err)
4524                                 goto err_out;
4525                         pi.size -= shs;
4526                 }
4527
4528                 err = cmd->fn(connection, &pi);
4529                 if (err) {
4530                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4531                                  cmdname(pi.cmd), err, pi.size);
4532                         goto err_out;
4533                 }
4534         }
4535         return;
4536
4537     err_out:
4538         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4539 }
4540
4541 static void conn_disconnect(struct drbd_connection *connection)
4542 {
4543         struct drbd_peer_device *peer_device;
4544         enum drbd_conns oc;
4545         int vnr;
4546
4547         if (connection->cstate == C_STANDALONE)
4548                 return;
4549
4550         /* We are about to start the cleanup after connection loss.
4551          * Make sure drbd_make_request knows about that.
4552          * Usually we should be in some network failure state already,
4553          * but just in case we are not, we fix it up here.
4554          */
4555         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4556
4557         /* asender does not clean up anything. it must not interfere, either */
4558         drbd_thread_stop(&connection->asender);
4559         drbd_free_sock(connection);
4560
4561         rcu_read_lock();
4562         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4563                 struct drbd_device *device = peer_device->device;
4564                 kref_get(&device->kref);
4565                 rcu_read_unlock();
4566                 drbd_disconnected(peer_device);
4567                 kref_put(&device->kref, drbd_destroy_device);
4568                 rcu_read_lock();
4569         }
4570         rcu_read_unlock();
4571
4572         if (!list_empty(&connection->current_epoch->list))
4573                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4574         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4575         atomic_set(&connection->current_epoch->epoch_size, 0);
4576         connection->send.seen_any_write_yet = false;
4577
4578         drbd_info(connection, "Connection closed\n");
4579
4580         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4581                 conn_try_outdate_peer_async(connection);
4582
4583         spin_lock_irq(&connection->resource->req_lock);
4584         oc = connection->cstate;
4585         if (oc >= C_UNCONNECTED)
4586                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4587
4588         spin_unlock_irq(&connection->resource->req_lock);
4589
4590         if (oc == C_DISCONNECTING)
4591                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4592 }
4593
4594 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4595 {
4596         struct drbd_device *device = peer_device->device;
4597         unsigned int i;
4598
4599         /* wait for current activity to cease. */
4600         spin_lock_irq(&device->resource->req_lock);
4601         _drbd_wait_ee_list_empty(device, &device->active_ee);
4602         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4603         _drbd_wait_ee_list_empty(device, &device->read_ee);
4604         spin_unlock_irq(&device->resource->req_lock);
4605
4606         /* We do not have data structures that would allow us to
4607          * get the rs_pending_cnt down to 0 again.
4608          *  * On C_SYNC_TARGET we do not have any data structures describing
4609          *    the pending RSDataRequest's we have sent.
4610          *  * On C_SYNC_SOURCE there is no data structure that tracks
4611          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4612          *  And no, it is not the sum of the reference counts in the
4613          *  resync_LRU. The resync_LRU tracks the whole operation including
4614          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4615          *  on the fly. */
4616         drbd_rs_cancel_all(device);
4617         device->rs_total = 0;
4618         device->rs_failed = 0;
4619         atomic_set(&device->rs_pending_cnt, 0);
4620         wake_up(&device->misc_wait);
4621
4622         del_timer_sync(&device->resync_timer);
4623         resync_timer_fn((unsigned long)device);
4624
4625         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4626          * w_make_resync_request etc. which may still be on the worker queue
4627          * to be "canceled" */
4628         drbd_flush_workqueue(&peer_device->connection->sender_work);
4629
4630         drbd_finish_peer_reqs(device);
4631
4632         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4633            might have queued more work. The flush before drbd_finish_peer_reqs() is
4634            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4635         drbd_flush_workqueue(&peer_device->connection->sender_work);
4636
4637         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4638          * again via drbd_try_clear_on_disk_bm(). */
4639         drbd_rs_cancel_all(device);
4640
4641         kfree(device->p_uuid);
4642         device->p_uuid = NULL;
4643
4644         if (!drbd_suspended(device))
4645                 tl_clear(peer_device->connection);
4646
4647         drbd_md_sync(device);
4648
4649         /* serialize with bitmap writeout triggered by the state change,
4650          * if any. */
4651         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4652
4653         /* tcp_close and release of sendpage pages can be deferred.  I don't
4654          * want to use SO_LINGER, because apparently it can be deferred for
4655          * more than 20 seconds (longest time I checked).
4656          *
4657          * Actually we don't care for exactly when the network stack does its
4658          * put_page(), but release our reference on these pages right here.
4659          */
4660         i = drbd_free_peer_reqs(device, &device->net_ee);
4661         if (i)
4662                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4663         i = atomic_read(&device->pp_in_use_by_net);
4664         if (i)
4665                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4666         i = atomic_read(&device->pp_in_use);
4667         if (i)
4668                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4669
4670         D_ASSERT(device, list_empty(&device->read_ee));
4671         D_ASSERT(device, list_empty(&device->active_ee));
4672         D_ASSERT(device, list_empty(&device->sync_ee));
4673         D_ASSERT(device, list_empty(&device->done_ee));
4674
4675         return 0;
4676 }
4677
4678 /*
4679  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4680  * we can agree on is stored in agreed_pro_version.
4681  *
4682  * feature flags and the reserved array should be enough room for future
4683  * enhancements of the handshake protocol, and possible plugins...
4684  *
4685  * for now, they are expected to be zero, but ignored.
4686  */
4687 static int drbd_send_features(struct drbd_connection *connection)
4688 {
4689         struct drbd_socket *sock;
4690         struct p_connection_features *p;
4691
4692         sock = &connection->data;
4693         p = conn_prepare_command(connection, sock);
4694         if (!p)
4695                 return -EIO;
4696         memset(p, 0, sizeof(*p));
4697         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4698         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4699         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4700         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4701 }
4702
4703 /*
4704  * return values:
4705  *   1 yes, we have a valid connection
4706  *   0 oops, did not work out, please try again
4707  *  -1 peer talks different language,
4708  *     no point in trying again, please go standalone.
4709  */
4710 static int drbd_do_features(struct drbd_connection *connection)
4711 {
4712         /* ASSERT current == connection->receiver ... */
4713         struct p_connection_features *p;
4714         const int expect = sizeof(struct p_connection_features);
4715         struct packet_info pi;
4716         int err;
4717
4718         err = drbd_send_features(connection);
4719         if (err)
4720                 return 0;
4721
4722         err = drbd_recv_header(connection, &pi);
4723         if (err)
4724                 return 0;
4725
4726         if (pi.cmd != P_CONNECTION_FEATURES) {
4727                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4728                          cmdname(pi.cmd), pi.cmd);
4729                 return -1;
4730         }
4731
4732         if (pi.size != expect) {
4733                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4734                      expect, pi.size);
4735                 return -1;
4736         }
4737
4738         p = pi.data;
4739         err = drbd_recv_all_warn(connection, p, expect);
4740         if (err)
4741                 return 0;
4742
4743         p->protocol_min = be32_to_cpu(p->protocol_min);
4744         p->protocol_max = be32_to_cpu(p->protocol_max);
4745         if (p->protocol_max == 0)
4746                 p->protocol_max = p->protocol_min;
4747
4748         if (PRO_VERSION_MAX < p->protocol_min ||
4749             PRO_VERSION_MIN > p->protocol_max)
4750                 goto incompat;
4751
4752         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4753         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4754
4755         drbd_info(connection, "Handshake successful: "
4756              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4757
4758         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4759                   connection->agreed_features & FF_TRIM ? " " : " not ");
4760
4761         return 1;
4762
4763  incompat:
4764         drbd_err(connection, "incompatible DRBD dialects: "
4765             "I support %d-%d, peer supports %d-%d\n",
4766             PRO_VERSION_MIN, PRO_VERSION_MAX,
4767             p->protocol_min, p->protocol_max);
4768         return -1;
4769 }
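
/*
 * The negotiation above in numbers (example version ranges, not taken
 * from this tree): if we support protocols 86..101 and the peer announces
 * 90..96, the ranges overlap and the handshake agrees on
 * min(101, 96) == 96.  A disjoint peer range, say 80..85, takes the
 * "incompat" path instead.
 */
#if 0	/* illustrative only */
	int agreed = min_t(int, 101 /* my max */, 96 /* peer max */);
	/* agreed == 96; feature bits are intersected the same way:
	 * agreed_features = PRO_FEATURES & the peer's feature_flags */
#endif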
4770
4771 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4772 static int drbd_do_auth(struct drbd_connection *connection)
4773 {
4774         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4775         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4776         return -1;
4777 }
4778 #else
4779 #define CHALLENGE_LEN 64
4780
4781 /* Return value:
4782         1 - auth succeeded,
4783         0 - failed, try again (network error),
4784         -1 - auth failed, don't try again.
4785 */
4786
4787 static int drbd_do_auth(struct drbd_connection *connection)
4788 {
4789         struct drbd_socket *sock;
4790         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4791         struct scatterlist sg;
4792         char *response = NULL;
4793         char *right_response = NULL;
4794         char *peers_ch = NULL;
4795         unsigned int key_len;
4796         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4797         unsigned int resp_size;
4798         struct hash_desc desc;
4799         struct packet_info pi;
4800         struct net_conf *nc;
4801         int err, rv;
4802
4803         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4804
4805         rcu_read_lock();
4806         nc = rcu_dereference(connection->net_conf);
4807         key_len = strlen(nc->shared_secret);
4808         memcpy(secret, nc->shared_secret, key_len);
4809         rcu_read_unlock();
4810
4811         desc.tfm = connection->cram_hmac_tfm;
4812         desc.flags = 0;
4813
4814         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4815         if (rv) {
4816                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4817                 rv = -1;
4818                 goto fail;
4819         }
4820
4821         get_random_bytes(my_challenge, CHALLENGE_LEN);
4822
4823         sock = &connection->data;
4824         if (!conn_prepare_command(connection, sock)) {
4825                 rv = 0;
4826                 goto fail;
4827         }
4828         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4829                                 my_challenge, CHALLENGE_LEN);
4830         if (!rv)
4831                 goto fail;
4832
4833         err = drbd_recv_header(connection, &pi);
4834         if (err) {
4835                 rv = 0;
4836                 goto fail;
4837         }
4838
4839         if (pi.cmd != P_AUTH_CHALLENGE) {
4840                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4841                          cmdname(pi.cmd), pi.cmd);
4842                 rv = 0;
4843                 goto fail;
4844         }
4845
4846         if (pi.size > CHALLENGE_LEN * 2) {
4847                 drbd_err(connection, "AuthChallenge payload too big.\n");
4848                 rv = -1;
4849                 goto fail;
4850         }
4851
4852         if (pi.size < CHALLENGE_LEN) {
4853                 drbd_err(connection, "AuthChallenge payload too small.\n");
4854                 rv = -1;
4855                 goto fail;
4856         }
4857
4858         peers_ch = kmalloc(pi.size, GFP_NOIO);
4859         if (peers_ch == NULL) {
4860                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4861                 rv = -1;
4862                 goto fail;
4863         }
4864
4865         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4866         if (err) {
4867                 rv = 0;
4868                 goto fail;
4869         }
4870
4871         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4872                 drbd_err(connection, "Peer presented the same challenge!\n");
4873                 rv = -1;
4874                 goto fail;
4875         }
4876
4877         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4878         response = kmalloc(resp_size, GFP_NOIO);
4879         if (response == NULL) {
4880                 drbd_err(connection, "kmalloc of response failed\n");
4881                 rv = -1;
4882                 goto fail;
4883         }
4884
4885         sg_init_table(&sg, 1);
4886         sg_set_buf(&sg, peers_ch, pi.size);
4887
4888         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4889         if (rv) {
4890                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4891                 rv = -1;
4892                 goto fail;
4893         }
4894
4895         if (!conn_prepare_command(connection, sock)) {
4896                 rv = 0;
4897                 goto fail;
4898         }
4899         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4900                                 response, resp_size);
4901         if (!rv)
4902                 goto fail;
4903
4904         err = drbd_recv_header(connection, &pi);
4905         if (err) {
4906                 rv = 0;
4907                 goto fail;
4908         }
4909
4910         if (pi.cmd != P_AUTH_RESPONSE) {
4911                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4912                          cmdname(pi.cmd), pi.cmd);
4913                 rv = 0;
4914                 goto fail;
4915         }
4916
4917         if (pi.size != resp_size) {
4918                 drbd_err(connection, "AuthResponse payload of wrong size\n");
4919                 rv = 0;
4920                 goto fail;
4921         }
4922
4923         err = drbd_recv_all_warn(connection, response, resp_size);
4924         if (err) {
4925                 rv = 0;
4926                 goto fail;
4927         }
4928
4929         right_response = kmalloc(resp_size, GFP_NOIO);
4930         if (right_response == NULL) {
4931                 drbd_err(connection, "kmalloc of right_response failed\n");
4932                 rv = -1;
4933                 goto fail;
4934         }
4935
4936         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4937
4938         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4939         if (rv) {
4940                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4941                 rv = -1;
4942                 goto fail;
4943         }
4944
4945         rv = !memcmp(response, right_response, resp_size);
4946
4947         if (rv)
4948                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4949                      resp_size);
4950         else
4951                 rv = -1;
4952
4953  fail:
4954         kfree(peers_ch);
4955         kfree(response);
4956         kfree(right_response);
4957
4958         return rv;
4959 }
4960 #endif
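
/*
 * The CRAM-HMAC exchange above at a glance.  Both sides know the shared
 * secret out of band; only challenges and HMAC digests cross the wire:
 *
 *	1. send my random challenge                (P_AUTH_CHALLENGE)
 *	2. receive the peer's challenge; a reflected copy of my own
 *	   challenge fails hard (rv == -1)
 *	3. send HMAC(secret, peers_ch)             (P_AUTH_RESPONSE)
 *	4. receive the peer's response and memcmp() it against the
 *	   locally computed HMAC(secret, my_challenge) ("right_response")
 */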
4961
4962 int drbd_receiver(struct drbd_thread *thi)
4963 {
4964         struct drbd_connection *connection = thi->connection;
4965         int h;
4966
4967         drbd_info(connection, "receiver (re)started\n");
4968
4969         do {
4970                 h = conn_connect(connection);
4971                 if (h == 0) {
4972                         conn_disconnect(connection);
4973                         schedule_timeout_interruptible(HZ);
4974                 }
4975                 if (h == -1) {
4976                         drbd_warn(connection, "Discarding network configuration.\n");
4977                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4978                 }
4979         } while (h == 0);
4980
4981         if (h > 0)
4982                 drbdd(connection);
4983
4984         conn_disconnect(connection);
4985
4986         drbd_info(connection, "receiver terminated\n");
4987         return 0;
4988 }
4989
4990 /* ********* acknowledge sender ******** */
4991
4992 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4993 {
4994         struct p_req_state_reply *p = pi->data;
4995         int retcode = be32_to_cpu(p->retcode);
4996
4997         if (retcode >= SS_SUCCESS) {
4998                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4999         } else {
5000                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5001                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5002                          drbd_set_st_err_str(retcode), retcode);
5003         }
5004         wake_up(&connection->ping_wait);
5005
5006         return 0;
5007 }
5008
5009 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5010 {
5011         struct drbd_peer_device *peer_device;
5012         struct drbd_device *device;
5013         struct p_req_state_reply *p = pi->data;
5014         int retcode = be32_to_cpu(p->retcode);
5015
5016         peer_device = conn_peer_device(connection, pi->vnr);
5017         if (!peer_device)
5018                 return -EIO;
5019         device = peer_device->device;
5020
5021         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5022                 D_ASSERT(device, connection->agreed_pro_version < 100);
5023                 return got_conn_RqSReply(connection, pi);
5024         }
5025
5026         if (retcode >= SS_SUCCESS) {
5027                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5028         } else {
5029                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5030                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5031                         drbd_set_st_err_str(retcode), retcode);
5032         }
5033         wake_up(&device->state_wait);
5034
5035         return 0;
5036 }
5037
5038 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5039 {
5040         return drbd_send_ping_ack(connection);
5041
5042 }
5043
5044 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5045 {
5046         /* restore idle timeout */
5047         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5048         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5049                 wake_up(&connection->ping_wait);
5050
5051         return 0;
5052 }
5053
5054 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5055 {
5056         struct drbd_peer_device *peer_device;
5057         struct drbd_device *device;
5058         struct p_block_ack *p = pi->data;
5059         sector_t sector = be64_to_cpu(p->sector);
5060         int blksize = be32_to_cpu(p->blksize);
5061
5062         peer_device = conn_peer_device(connection, pi->vnr);
5063         if (!peer_device)
5064                 return -EIO;
5065         device = peer_device->device;
5066
5067         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5068
5069         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5070
5071         if (get_ldev(device)) {
5072                 drbd_rs_complete_io(device, sector);
5073                 drbd_set_in_sync(device, sector, blksize);
5074                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5075                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5076                 put_ldev(device);
5077         }
5078         dec_rs_pending(device);
5079         atomic_add(blksize >> 9, &device->rs_sect_in);
5080
5081         return 0;
5082 }
5083
5084 static int
5085 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5086                               struct rb_root *root, const char *func,
5087                               enum drbd_req_event what, bool missing_ok)
5088 {
5089         struct drbd_request *req;
5090         struct bio_and_error m;
5091
5092         spin_lock_irq(&device->resource->req_lock);
5093         req = find_request(device, root, id, sector, missing_ok, func);
5094         if (unlikely(!req)) {
5095                 spin_unlock_irq(&device->resource->req_lock);
5096                 return -EIO;
5097         }
5098         __req_mod(req, what, &m);
5099         spin_unlock_irq(&device->resource->req_lock);
5100
5101         if (m.bio)
5102                 complete_master_bio(device, &m);
5103         return 0;
5104 }
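
/*
 * The helper above is the common tail of the got_*Ack handlers below.
 * block_id round-trips through the peer unchanged, so together with the
 * sector it identifies the original request in the interval tree, and
 * "what" drives the request state machine via __req_mod().  Sketch of a
 * typical call (mirrors got_BlockAck() below):
 */
#if 0	/* illustrative only */
	validate_req_change_req_state(device, p->block_id, sector,
				      &device->write_requests, __func__,
				      WRITE_ACKED_BY_PEER, false);
#endif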
5105
5106 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5107 {
5108         struct drbd_peer_device *peer_device;
5109         struct drbd_device *device;
5110         struct p_block_ack *p = pi->data;
5111         sector_t sector = be64_to_cpu(p->sector);
5112         int blksize = be32_to_cpu(p->blksize);
5113         enum drbd_req_event what;
5114
5115         peer_device = conn_peer_device(connection, pi->vnr);
5116         if (!peer_device)
5117                 return -EIO;
5118         device = peer_device->device;
5119
5120         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5121
5122         if (p->block_id == ID_SYNCER) {
5123                 drbd_set_in_sync(device, sector, blksize);
5124                 dec_rs_pending(device);
5125                 return 0;
5126         }
5127         switch (pi->cmd) {
5128         case P_RS_WRITE_ACK:
5129                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5130                 break;
5131         case P_WRITE_ACK:
5132                 what = WRITE_ACKED_BY_PEER;
5133                 break;
5134         case P_RECV_ACK:
5135                 what = RECV_ACKED_BY_PEER;
5136                 break;
5137         case P_SUPERSEDED:
5138                 what = CONFLICT_RESOLVED;
5139                 break;
5140         case P_RETRY_WRITE:
5141                 what = POSTPONE_WRITE;
5142                 break;
5143         default:
5144                 BUG();
5145         }
5146
5147         return validate_req_change_req_state(device, p->block_id, sector,
5148                                              &device->write_requests, __func__,
5149                                              what, false);
5150 }
5151
5152 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5153 {
5154         struct drbd_peer_device *peer_device;
5155         struct drbd_device *device;
5156         struct p_block_ack *p = pi->data;
5157         sector_t sector = be64_to_cpu(p->sector);
5158         int size = be32_to_cpu(p->blksize);
5159         int err;
5160
5161         peer_device = conn_peer_device(connection, pi->vnr);
5162         if (!peer_device)
5163                 return -EIO;
5164         device = peer_device->device;
5165
5166         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5167
5168         if (p->block_id == ID_SYNCER) {
5169                 dec_rs_pending(device);
5170                 drbd_rs_failed_io(device, sector, size);
5171                 return 0;
5172         }
5173
5174         err = validate_req_change_req_state(device, p->block_id, sector,
5175                                             &device->write_requests, __func__,
5176                                             NEG_ACKED, true);
5177         if (err) {
5178                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5179                    The master bio might already be completed, therefore the
5180                    request is no longer in the collision hash. */
5181                 /* In Protocol B we might already have got a P_RECV_ACK
5182                    but then get a P_NEG_ACK afterwards. */
5183                 drbd_set_out_of_sync(device, sector, size);
5184         }
5185         return 0;
5186 }
5187
5188 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5189 {
5190         struct drbd_peer_device *peer_device;
5191         struct drbd_device *device;
5192         struct p_block_ack *p = pi->data;
5193         sector_t sector = be64_to_cpu(p->sector);
5194
5195         peer_device = conn_peer_device(connection, pi->vnr);
5196         if (!peer_device)
5197                 return -EIO;
5198         device = peer_device->device;
5199
5200         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5201
5202         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5203             (unsigned long long)sector, be32_to_cpu(p->blksize));
5204
5205         return validate_req_change_req_state(device, p->block_id, sector,
5206                                              &device->read_requests, __func__,
5207                                              NEG_ACKED, false);
5208 }
5209
5210 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5211 {
5212         struct drbd_peer_device *peer_device;
5213         struct drbd_device *device;
5214         sector_t sector;
5215         int size;
5216         struct p_block_ack *p = pi->data;
5217
5218         peer_device = conn_peer_device(connection, pi->vnr);
5219         if (!peer_device)
5220                 return -EIO;
5221         device = peer_device->device;
5222
5223         sector = be64_to_cpu(p->sector);
5224         size = be32_to_cpu(p->blksize);
5225
5226         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227
5228         dec_rs_pending(device);
5229
5230         if (get_ldev_if_state(device, D_FAILED)) {
5231                 drbd_rs_complete_io(device, sector);
5232                 switch (pi->cmd) {
5233                 case P_NEG_RS_DREPLY:
5234                         drbd_rs_failed_io(device, sector, size); /* fall through */
5235                 case P_RS_CANCEL:
5236                         break;
5237                 default:
5238                         BUG();
5239                 }
5240                 put_ldev(device);
5241         }
5242
5243         return 0;
5244 }
5245
5246 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5247 {
5248         struct p_barrier_ack *p = pi->data;
5249         struct drbd_peer_device *peer_device;
5250         int vnr;
5251
5252         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5253
5254         rcu_read_lock();
5255         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5256                 struct drbd_device *device = peer_device->device;
5257
5258                 if (device->state.conn == C_AHEAD &&
5259                     atomic_read(&device->ap_in_flight) == 0 &&
5260                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5261                         device->start_resync_timer.expires = jiffies + HZ;
5262                         add_timer(&device->start_resync_timer);
5263                 }
5264         }
5265         rcu_read_unlock();
5266
5267         return 0;
5268 }
5269
static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
{
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
        struct p_block_ack *p = pi->data;
        struct drbd_device_work *dw;
        sector_t sector;
        int size;

        peer_device = conn_peer_device(connection, pi->vnr);
        if (!peer_device)
                return -EIO;
        device = peer_device->device;

        sector = be64_to_cpu(p->sector);
        size = be32_to_cpu(p->blksize);

        update_peer_seq(peer_device, be32_to_cpu(p->seq_num));

        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
                drbd_ov_out_of_sync_found(device, sector, size);
        else
                ov_out_of_sync_print(device);

        if (!get_ldev(device))
                return 0;

        drbd_rs_complete_io(device, sector);
        dec_rs_pending(device);

        --device->ov_left;

        /* let's advance progress step marks only for every other megabyte */
        if ((device->ov_left & 0x200) == 0x200)
                drbd_advance_rs_marks(device, device->ov_left);

        if (device->ov_left == 0) {
                dw = kmalloc(sizeof(*dw), GFP_NOIO);
                if (dw) {
                        dw->w.cb = w_ov_finished;
                        dw->device = device;
                        drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
                } else {
                        drbd_err(device, "kmalloc(dw) failed.\n");
                        ov_out_of_sync_print(device);
                        drbd_resync_finished(device);
                }
        }
        put_ldev(device);
        return 0;
}

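/* P_DELAY_PROBE (see asender_tbl below) is received and deliberately
 * ignored; the payload is consumed without any action. */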
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        return 0;
}

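/* Drain the done_ee lists of all volumes: keep calling
 * drbd_finish_peer_reqs() until no completed peer request is left,
 * or return non-zero on the first failure. */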
static int connection_finish_peer_reqs(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        int vnr, not_empty = 0;

        do {
                clear_bit(SIGNAL_ASENDER, &connection->flags);
                flush_signals(current);

                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;
                        kref_get(&device->kref);
                        rcu_read_unlock();
                        if (drbd_finish_peer_reqs(device)) {
                                kref_put(&device->kref, drbd_destroy_device);
                                return 1;
                        }
                        kref_put(&device->kref, drbd_destroy_device);
                        rcu_read_lock();
                }
                set_bit(SIGNAL_ASENDER, &connection->flags);

                spin_lock_irq(&connection->resource->req_lock);
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;
                        not_empty = !list_empty(&device->done_ee);
                        if (not_empty)
                                break;
                }
                spin_unlock_irq(&connection->resource->req_lock);
                rcu_read_unlock();
        } while (not_empty);

        return 0;
}

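/* Dispatch table for packets on the meta-data socket: pkt_size is the
 * expected payload size (not counting the header), fn the handler for
 * that packet type. */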
struct asender_cmd {
        size_t pkt_size;
        int (*fn)(struct drbd_connection *connection, struct packet_info *);
};

static struct asender_cmd asender_tbl[] = {
        [P_PING]            = { 0, got_Ping },
        [P_PING_ACK]        = { 0, got_PingAck },
        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
        [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
        [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
};

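/* The asender thread: receives on the meta-data socket, dispatches acks
 * and other meta packets via asender_tbl, keeps the peer alive with
 * pings, and escalates to C_NETWORK_FAILURE or C_DISCONNECTING on
 * errors. */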
int drbd_asender(struct drbd_thread *thi)
{
        struct drbd_connection *connection = thi->connection;
        struct asender_cmd *cmd = NULL;
        struct packet_info pi;
        int rv;
        void *buf    = connection->meta.rbuf;
        int received = 0;
        unsigned int header_size = drbd_header_size(connection);
        int expect   = header_size;
        bool ping_timeout_active = false;
        struct net_conf *nc;
        int ping_timeo, tcp_cork, ping_int;
        struct sched_param param = { .sched_priority = 2 };

        rv = sched_setscheduler(current, SCHED_RR, &param);
        if (rv < 0)
                drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);

        while (get_t_state(thi) == RUNNING) {
                drbd_thread_current_set_cpu(thi);

                rcu_read_lock();
                nc = rcu_dereference(connection->net_conf);
                ping_timeo = nc->ping_timeo;
                tcp_cork = nc->tcp_cork;
                ping_int = nc->ping_int;
                rcu_read_unlock();

                if (test_and_clear_bit(SEND_PING, &connection->flags)) {
                        if (drbd_send_ping(connection)) {
                                drbd_err(connection, "drbd_send_ping has failed\n");
                                goto reconnect;
                        }
                        connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
                        ping_timeout_active = true;
                }

                /* TODO: conditionally cork; it may hurt latency if we cork without
                   much to send */
                if (tcp_cork)
                        drbd_tcp_cork(connection->meta.socket);
                if (connection_finish_peer_reqs(connection)) {
                        drbd_err(connection, "connection_finish_peer_reqs() failed\n");
                        goto reconnect;
                }
                /* but always uncork again, unless corking is disabled */
                if (tcp_cork)
                        drbd_tcp_uncork(connection->meta.socket);

                /* short circuit, recv_msg would return EINTR anyway. */
                if (signal_pending(current))
                        continue;

                rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
                clear_bit(SIGNAL_ASENDER, &connection->flags);

                flush_signals(current);

                /* Note:
                 * -EINTR        (on meta) we got a signal
                 * -EAGAIN       (on meta) rcvtimeo expired
                 * -ECONNRESET   other side closed the connection
                 * -ERESTARTSYS  (on data) we got a signal
                 * rv <  0       other than above: unexpected error!
                 * rv == expected: full header or command
                 * rv <  expected: "woken" by signal during receive
                 * rv == 0       : "connection shut down by peer"
                 */
                if (likely(rv > 0)) {
                        received += rv;
                        buf      += rv;
                } else if (rv == 0) {
                        if (test_bit(DISCONNECT_SENT, &connection->flags)) {
                                long t;
                                rcu_read_lock();
                                t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
                                rcu_read_unlock();

                                t = wait_event_timeout(connection->ping_wait,
                                                       connection->cstate < C_WF_REPORT_PARAMS,
                                                       t);
                                if (t)
                                        break;
                        }
                        drbd_err(connection, "meta connection shut down by peer.\n");
                        goto reconnect;
                } else if (rv == -EAGAIN) {
                        /* If the data socket received something meanwhile,
                         * that is good enough: peer is still alive. */
                        if (time_after(connection->last_received,
                                jiffies - connection->meta.socket->sk->sk_rcvtimeo))
                                continue;
                        if (ping_timeout_active) {
                                drbd_err(connection, "PingAck did not arrive in time.\n");
                                goto reconnect;
                        }
                        set_bit(SEND_PING, &connection->flags);
                        continue;
                } else if (rv == -EINTR) {
                        continue;
                } else {
                        drbd_err(connection, "sock_recvmsg returned %d\n", rv);
                        goto reconnect;
                }

                if (received == expect && cmd == NULL) {
                        if (decode_header(connection, connection->meta.rbuf, &pi))
                                goto reconnect;
                        /* validate pi.cmd before using it to index the table */
                        if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
                                drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
                                         cmdname(pi.cmd), pi.cmd);
                                goto disconnect;
                        }
                        cmd = &asender_tbl[pi.cmd];
                        expect = header_size + cmd->pkt_size;
                        if (pi.size != expect - header_size) {
                                drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
                                        pi.cmd, pi.size);
                                goto reconnect;
                        }
                }
                if (received == expect) {
                        int err;

                        err = cmd->fn(connection, &pi);
                        if (err) {
                                drbd_err(connection, "%pf failed\n", cmd->fn);
                                goto reconnect;
                        }

                        connection->last_received = jiffies;

                        if (cmd == &asender_tbl[P_PING_ACK]) {
                                /* restore idle timeout */
                                connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
                                ping_timeout_active = false;
                        }

                        buf      = connection->meta.rbuf;
                        received = 0;
                        expect   = header_size;
                        cmd      = NULL;
                }
        }

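        /* The exit paths below are reachable only via goto. */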
        if (0) {
reconnect:
                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
                conn_md_sync(connection);
        }
        if (0) {
disconnect:
                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
        }
        clear_bit(SIGNAL_ASENDER, &connection->flags);

        drbd_info(connection, "asender terminated\n");

        return 0;
}