drbd: Change how the initial packets are called
[firefly-linux-kernel-4.4.55.git] / drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in proc drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
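
/* Usage illustration (added note, not part of the original file): as the
 * comments above hint, these parameters can be set at module load time,
 * e.g. with example values
 *
 *	modprobe drbd minor_count=8 usermode_helper=/usr/local/sbin/drbdadm
 *
 * or, for a built-in driver, as drbd.minor_count=8 etc. on the kernel
 * command line. Parameters registered with a non-zero mode also show up
 * under /sys/module/drbd/parameters/ at runtime.
 */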
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123 DEFINE_MUTEX(drbd_cfg_mutex);
124
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache;       /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
133
134 /* I do not use a standard mempool, because:
135    1) I want to hand out the pre-allocated objects first.
136    2) I want to be able to interrupt sleeping allocation with a signal.
137    Note: This is a single linked list, the next pointer is the private
138          member of struct page.
139  */
140 struct page *drbd_pp_pool;
141 spinlock_t   drbd_pp_lock;
142 int          drbd_pp_vacant;
143 wait_queue_head_t drbd_pp_wait;
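
/* Minimal sketch (not in the original file) of how a page is popped off a
 * pool chained through the page private member, as described in the comment
 * above; example_pp_pop() is hypothetical, and the real allocation path
 * additionally falls back to alloc_page() and an interruptible wait on
 * drbd_pp_wait when the pool is empty:
 *
 *	static struct page *example_pp_pop(void)
 *	{
 *		struct page *page = NULL;
 *
 *		spin_lock(&drbd_pp_lock);
 *		if (drbd_pp_pool) {
 *			page = drbd_pp_pool;
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			set_page_private(page, 0);
 *			drbd_pp_vacant--;
 *		}
 *		spin_unlock(&drbd_pp_lock);
 *		return page;
 *	}
 */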
144
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
146
147 static const struct block_device_operations drbd_ops = {
148         .owner =   THIS_MODULE,
149         .open =    drbd_open,
150         .release = drbd_release,
151 };
152
153 static void bio_destructor_drbd(struct bio *bio)
154 {
155         bio_free(bio, drbd_md_io_bio_set);
156 }
157
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
159 {
160         struct bio *bio;
161
162         if (!drbd_md_io_bio_set)
163                 return bio_alloc(gfp_mask, 1);
164
165         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
166         if (!bio)
167                 return NULL;
168         bio->bi_destructor = bio_destructor_drbd;
169         return bio;
170 }
171
172 #ifdef __CHECKER__
173 /* When checking with sparse, and this is an inline function, sparse will
174    give tons of false positives. When this is a real function, sparse works.
175  */
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
177 {
178         int io_allowed;
179
180         atomic_inc(&mdev->local_cnt);
181         io_allowed = (mdev->state.disk >= mins);
182         if (!io_allowed) {
183                 if (atomic_dec_and_test(&mdev->local_cnt))
184                         wake_up(&mdev->misc_wait);
185         }
186         return io_allowed;
187 }
188
189 #endif
190
191 /**
192  * DOC: The transfer log
193  *
194  * The transfer log is a single linked list of &struct drbd_tl_epoch objects.
195  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196  * of the list. There is always at least one &struct drbd_tl_epoch object.
197  *
198  * Each &struct drbd_tl_epoch has a circular double linked list of requests
199  * attached.
200  */
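
/* Illustration (not from the original source): b->requests holds each
 * epoch's circular list of struct drbd_request. Walking all epochs oldest
 * to newest under req_lock, with a hypothetical visit_epoch() callback,
 * looks roughly like
 *
 *	struct drbd_tl_epoch *b;
 *
 *	spin_lock_irq(&tconn->req_lock);
 *	for (b = tconn->oldest_tle; b != NULL; b = b->next)
 *		visit_epoch(b);
 *	spin_unlock_irq(&tconn->req_lock);
 *
 * which mirrors what _tl_restart() below does over the live list.
 */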
201 static int tl_init(struct drbd_tconn *tconn)
202 {
203         struct drbd_tl_epoch *b;
204
205         /* during device minor initialization, we may well use GFP_KERNEL */
206         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
207         if (!b)
208                 return 0;
209         INIT_LIST_HEAD(&b->requests);
210         INIT_LIST_HEAD(&b->w.list);
211         b->next = NULL;
212         b->br_number = 4711;
213         b->n_writes = 0;
214         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215
216         tconn->oldest_tle = b;
217         tconn->newest_tle = b;
218         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
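
/* Note (added for clarity, not in the original): the "?:" above is the GNU
 * shorthand that skips barrier number 0 on wrap-around, i.e. it behaves like
 *
 *	new->br_number = newest_before->br_number + 1;
 *	if (new->br_number == 0)
 *		new->br_number = 1;
 */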
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruptions of
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violating write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_del_init(&b->requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle points already to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421 }
422
423
424 /**
425  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426  * @tconn:      DRBD connection.
427  *
428  * This is called after the connection to the peer was lost. The storage covered
429  * by the requests on the transfer log gets marked as out of sync. Called from the
430  * receiver thread and the worker thread.
431  */
432 void tl_clear(struct drbd_tconn *tconn)
433 {
434         struct drbd_conf *mdev;
435         struct list_head *le, *tle;
436         struct drbd_request *r;
437         int vnr;
438
439         spin_lock_irq(&tconn->req_lock);
440
441         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
442
443         /* we expect this list to be empty. */
444         if (!list_empty(&tconn->out_of_sequence_requests))
445                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
446
447         /* but just in case, clean it up anyways! */
448         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449                 r = list_entry(le, struct drbd_request, tl_requests);
450                 /* It would be nice to complete outside of spinlock.
451                  * But this is easier for now. */
452                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
453         }
454
455         /* ensure bit indicating barrier is required is clear */
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458
459         spin_unlock_irq(&tconn->req_lock);
460 }
461
462 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
463 {
464         spin_lock_irq(&tconn->req_lock);
465         _tl_restart(tconn, what);
466         spin_unlock_irq(&tconn->req_lock);
467 }
468
469 static int drbd_thread_setup(void *arg)
470 {
471         struct drbd_thread *thi = (struct drbd_thread *) arg;
472         struct drbd_tconn *tconn = thi->tconn;
473         unsigned long flags;
474         int retval;
475
476         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
477                  thi->name[0], thi->tconn->name);
478
479 restart:
480         retval = thi->function(thi);
481
482         spin_lock_irqsave(&thi->t_lock, flags);
483
484         /* if the receiver has been "EXITING", the last thing it did
485          * was set the conn state to "StandAlone",
486          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
487          * and receiver thread will be "started".
488          * drbd_thread_start needs to set "RESTARTING" in that case.
489          * t_state check and assignment needs to be within the same spinlock,
490          * so either thread_start sees EXITING, and can remap to RESTARTING,
491          * or thread_start sees NONE, and can proceed as normal.
492          */
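
        /*
         * Summarizing the above for illustration, the t_state transitions
         * involved are roughly:
         *
         *      NONE       --drbd_thread_start()----> RUNNING
         *      RUNNING    --_drbd_thread_stop()----> EXITING or RESTARTING
         *      EXITING    --drbd_thread_start()----> RESTARTING
         *      RESTARTING --(check below)----------> RUNNING, goto restart
         *      EXITING    --(no restart request)---> NONE, thread terminates
         */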
493
494         if (thi->t_state == RESTARTING) {
495                 conn_info(tconn, "Restarting %s thread\n", thi->name);
496                 thi->t_state = RUNNING;
497                 spin_unlock_irqrestore(&thi->t_lock, flags);
498                 goto restart;
499         }
500
501         thi->task = NULL;
502         thi->t_state = NONE;
503         smp_mb();
504         complete(&thi->stop);
505         spin_unlock_irqrestore(&thi->t_lock, flags);
506
507         conn_info(tconn, "Terminating %s\n", current->comm);
508
509         /* Release mod reference taken when thread was started */
510         module_put(THIS_MODULE);
511         return retval;
512 }
513
514 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
515                              int (*func) (struct drbd_thread *), char *name)
516 {
517         spin_lock_init(&thi->t_lock);
518         thi->task    = NULL;
519         thi->t_state = NONE;
520         thi->function = func;
521         thi->tconn = tconn;
522         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
523 }
524
525 int drbd_thread_start(struct drbd_thread *thi)
526 {
527         struct drbd_tconn *tconn = thi->tconn;
528         struct task_struct *nt;
529         unsigned long flags;
530
531         /* is used from state engine doing drbd_thread_stop_nowait,
532          * while holding the req lock irqsave */
533         spin_lock_irqsave(&thi->t_lock, flags);
534
535         switch (thi->t_state) {
536         case NONE:
537                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
538                          thi->name, current->comm, current->pid);
539
540                 /* Get ref on module for thread - this is released when thread exits */
541                 if (!try_module_get(THIS_MODULE)) {
542                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
543                         spin_unlock_irqrestore(&thi->t_lock, flags);
544                         return false;
545                 }
546
547                 init_completion(&thi->stop);
548                 thi->reset_cpu_mask = 1;
549                 thi->t_state = RUNNING;
550                 spin_unlock_irqrestore(&thi->t_lock, flags);
551                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
552
553                 nt = kthread_create(drbd_thread_setup, (void *) thi,
554                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
555
556                 if (IS_ERR(nt)) {
557                         conn_err(tconn, "Couldn't start thread\n");
558
559                         module_put(THIS_MODULE);
560                         return false;
561                 }
562                 spin_lock_irqsave(&thi->t_lock, flags);
563                 thi->task = nt;
564                 thi->t_state = RUNNING;
565                 spin_unlock_irqrestore(&thi->t_lock, flags);
566                 wake_up_process(nt);
567                 break;
568         case EXITING:
569                 thi->t_state = RESTARTING;
570                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
571                                 thi->name, current->comm, current->pid);
572                 /* fall through */
573         case RUNNING:
574         case RESTARTING:
575         default:
576                 spin_unlock_irqrestore(&thi->t_lock, flags);
577                 break;
578         }
579
580         return true;
581 }
582
583
584 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
585 {
586         unsigned long flags;
587
588         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
589
590         /* may be called from state engine, holding the req lock irqsave */
591         spin_lock_irqsave(&thi->t_lock, flags);
592
593         if (thi->t_state == NONE) {
594                 spin_unlock_irqrestore(&thi->t_lock, flags);
595                 if (restart)
596                         drbd_thread_start(thi);
597                 return;
598         }
599
600         if (thi->t_state != ns) {
601                 if (thi->task == NULL) {
602                         spin_unlock_irqrestore(&thi->t_lock, flags);
603                         return;
604                 }
605
606                 thi->t_state = ns;
607                 smp_mb();
608                 init_completion(&thi->stop);
609                 if (thi->task != current)
610                         force_sig(DRBD_SIGKILL, thi->task);
611         }
612
613         spin_unlock_irqrestore(&thi->t_lock, flags);
614
615         if (wait)
616                 wait_for_completion(&thi->stop);
617 }
618
619 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
620 {
621         struct drbd_thread *thi =
622                 task == tconn->receiver.task ? &tconn->receiver :
623                 task == tconn->asender.task  ? &tconn->asender :
624                 task == tconn->worker.task   ? &tconn->worker : NULL;
625
626         return thi;
627 }
628
629 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
630 {
631         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
632         return thi ? thi->name : task->comm;
633 }
634
635 int conn_lowest_minor(struct drbd_tconn *tconn)
636 {
637         int vnr = 0;
638         struct drbd_conf *mdev;
639
640         mdev = idr_get_next(&tconn->volumes, &vnr);
641         if (!mdev)
642                 return -1;
643         return mdev_to_minor(mdev);
644 }
645
646 #ifdef CONFIG_SMP
647 /**
648  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
649  * @tconn:      DRBD connection.
650  *
651  * Forces all threads of a device onto the same CPU. This is beneficial for
652  * DRBD's performance. May be overridden by the user's configuration.
653  */
654 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
655 {
656         int ord, cpu;
657
658         /* user override. */
659         if (cpumask_weight(tconn->cpu_mask))
660                 return;
661
662         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
663         for_each_online_cpu(cpu) {
664                 if (ord-- == 0) {
665                         cpumask_set_cpu(cpu, tconn->cpu_mask);
666                         return;
667                 }
668         }
669         /* should not be reached */
670         cpumask_setall(tconn->cpu_mask);
671 }
672
673 /**
674  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
675  * @mdev:       DRBD device.
676  * @thi:        drbd_thread object
677  *
678  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
679  * prematurely.
680  */
681 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
682 {
683         struct task_struct *p = current;
684
685         if (!thi->reset_cpu_mask)
686                 return;
687         thi->reset_cpu_mask = 0;
688         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
689 }
690 #endif
691
692 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
693 {
694         h->magic   = cpu_to_be32(DRBD_MAGIC);
695         h->command = cpu_to_be16(cmd);
696         h->length  = cpu_to_be16(size);
697 }
698
699 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
700 {
701         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
702         h->command = cpu_to_be16(cmd);
703         h->length  = cpu_to_be32(size);
704 }
705
706 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
707                             enum drbd_packet cmd, int size)
708 {
709         if (tconn->agreed_pro_version >= 95)
710                 prepare_header95(&h->h95, cmd, size);
711         else
712                 prepare_header80(&h->h80, cmd, size);
713 }
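
/* For clarity (added note): peers that agreed on protocol version 95 or
 * newer use struct p_header95 (16-bit magic, 32-bit length), older peers
 * the original struct p_header80 (32-bit magic, 16-bit length); both carry
 * a 16-bit command code, see the two helpers above.
 */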
714
715 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
716                            enum drbd_packet cmd, int size)
717 {
718         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
719 }
720
721 /* the appropriate socket mutex must be held already */
722 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
723                    enum drbd_packet cmd, struct p_header *h, size_t size,
724                    unsigned msg_flags)
725 {
726         int err;
727
728         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
729         err = drbd_send_all(tconn, sock->socket, h, size, msg_flags);
730         if (err && !signal_pending(current))
731                 conn_warn(tconn, "short send %s size=%d\n",
732                           cmdname(cmd), (int)size);
733         return err;
734 }
735
736 /* don't pass the socket. we may only look at it
737  * when we hold the appropriate socket mutex.
738  */
739 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
740                   enum drbd_packet cmd, struct p_header *h, size_t size)
741 {
742         int err = -EIO;
743
744         mutex_lock(&sock->mutex);
745         if (sock->socket)
746                 err = _conn_send_cmd(tconn, vnr, sock, cmd, h, size, 0);
747         mutex_unlock(&sock->mutex);
748         return err;
749 }
750
751 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
752                    size_t size)
753 {
754         struct p_header80 h;
755         int err;
756
757         prepare_header80(&h, cmd, size);
758         err = drbd_get_data_sock(tconn);
759         if (!err) {
760                 err = drbd_send_all(tconn, tconn->data.socket, &h, sizeof(h), 0);
761                 if (!err)
762                         err = drbd_send_all(tconn, tconn->data.socket, data, size, 0);
763                 drbd_put_data_sock(tconn);
764         }
765         return err;
766 }
767
768 int drbd_send_ping(struct drbd_tconn *tconn)
769 {
770         struct p_header h;
771         return !conn_send_cmd(tconn, 0, &tconn->meta, P_PING, &h, sizeof(h));
772 }
773
774 int drbd_send_ping_ack(struct drbd_tconn *tconn)
775 {
776         struct p_header h;
777         return !conn_send_cmd(tconn, 0, &tconn->meta, P_PING_ACK, &h, sizeof(h));
778 }
779
780 int drbd_send_sync_param(struct drbd_conf *mdev)
781 {
782         struct p_rs_param_95 *p;
783         struct drbd_socket *sock;
784         int size, err;
785         const int apv = mdev->tconn->agreed_pro_version;
786
787         size = apv <= 87 ? sizeof(struct p_rs_param)
788                 : apv == 88 ? sizeof(struct p_rs_param)
789                         + strlen(mdev->tconn->net_conf->verify_alg) + 1
790                 : apv <= 94 ? sizeof(struct p_rs_param_89)
791                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
792
793         mutex_lock(&mdev->tconn->data.mutex);
794         sock = &mdev->tconn->data;
795
796         if (likely(sock->socket != NULL)) {
797                 enum drbd_packet cmd =
798                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
799
800                 p = mdev->tconn->data.sbuf;
801
802                 /* initialize verify_alg and csums_alg */
803                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
804
805                 if (get_ldev(mdev)) {
806                         p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
807                         p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
808                         p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
809                         p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
810                         p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
811                         put_ldev(mdev);
812                 } else {
813                         p->rate = cpu_to_be32(DRBD_RATE_DEF);
814                         p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
815                         p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
816                         p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
817                         p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
818                 }
819
820                 if (apv >= 88)
821                         strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
822                 if (apv >= 89)
823                         strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
824
825                 err = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
826         } else
827                 err = -EIO;
828
829         mutex_unlock(&mdev->tconn->data.mutex);
830
831         return err;
832 }
833
834 int drbd_send_protocol(struct drbd_tconn *tconn)
835 {
836         struct p_protocol *p;
837         int size, cf, err;
838
839         size = sizeof(struct p_protocol);
840
841         if (tconn->agreed_pro_version >= 87)
842                 size += strlen(tconn->net_conf->integrity_alg) + 1;
843
844         /* we must not recurse into our own queue,
845          * as that is blocked during handshake */
846         p = kmalloc(size, GFP_NOIO);
847         if (p == NULL)
848                 return -ENOMEM;
849
850         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
851         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
852         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
853         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
854         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
855
856         cf = 0;
857         if (tconn->net_conf->want_lose)
858                 cf |= CF_WANT_LOSE;
859         if (tconn->net_conf->dry_run) {
860                 if (tconn->agreed_pro_version >= 92)
861                         cf |= CF_DRY_RUN;
862                 else {
863                         conn_err(tconn, "--dry-run is not supported by peer");
864                         kfree(p);
865                         return -EOPNOTSUPP;
866                 }
867         }
868         p->conn_flags    = cpu_to_be32(cf);
869
870         if (tconn->agreed_pro_version >= 87)
871                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
872
873         err = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
874         kfree(p);
875         return err;
876 }
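
/* Summary (derived from the code above): the conn_flags word currently
 * carries CF_WANT_LOSE when net_conf->want_lose is set, and CF_DRY_RUN for
 * a --dry-run connect, the latter only if the peer speaks protocol version
 * 92 or newer; against older peers drbd_send_protocol() fails with
 * -EOPNOTSUPP instead.
 */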
877
878 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
879 {
880         struct p_uuids p;
881         int i;
882
883         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
884                 return 0;
885
886         for (i = UI_CURRENT; i < UI_SIZE; i++)
887                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
888
889         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
890         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
891         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
892         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
893         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
894         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
895
896         put_ldev(mdev);
897
898         return drbd_send_cmd(mdev, &mdev->tconn->data, P_UUIDS, &p.head, sizeof(p));
899 }
900
901 int drbd_send_uuids(struct drbd_conf *mdev)
902 {
903         return _drbd_send_uuids(mdev, 0);
904 }
905
906 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
907 {
908         return _drbd_send_uuids(mdev, 8);
909 }
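
/* Derived from the code above, for reference: the UI_FLAGS word sent with
 * P_UUIDS uses bit 0 (1) for want_lose, bit 1 (2) for a crashed primary,
 * bit 2 (4) for a D_INCONSISTENT disk, and bit 3 (8), passed in by
 * drbd_send_uuids_skip_initial_sync() above, to signal that the initial
 * sync is to be skipped.
 */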
910
911 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
912 {
913         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
914                 u64 *uuid = mdev->ldev->md.uuid;
915                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
916                      text,
917                      (unsigned long long)uuid[UI_CURRENT],
918                      (unsigned long long)uuid[UI_BITMAP],
919                      (unsigned long long)uuid[UI_HISTORY_START],
920                      (unsigned long long)uuid[UI_HISTORY_END]);
921                 put_ldev(mdev);
922         } else {
923                 dev_info(DEV, "%s effective data uuid: %016llX\n",
924                                 text,
925                                 (unsigned long long)mdev->ed_uuid);
926         }
927 }
928
929 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
930 {
931         struct p_rs_uuid p;
932         u64 uuid;
933
934         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
935
936         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
937         drbd_uuid_set(mdev, UI_BITMAP, uuid);
938         drbd_print_uuids(mdev, "updated sync UUID");
939         drbd_md_sync(mdev);
940         p.uuid = cpu_to_be64(uuid);
941
942         drbd_send_cmd(mdev, &mdev->tconn->data, P_SYNC_UUID, &p.head, sizeof(p));
943 }
944
945 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
946 {
947         struct p_sizes p;
948         sector_t d_size, u_size;
949         int q_order_type, max_bio_size;
950
951         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
952                 D_ASSERT(mdev->ldev->backing_bdev);
953                 d_size = drbd_get_max_capacity(mdev->ldev);
954                 u_size = mdev->ldev->dc.disk_size;
955                 q_order_type = drbd_queue_order_type(mdev);
956                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
957                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
958                 put_ldev(mdev);
959         } else {
960                 d_size = 0;
961                 u_size = 0;
962                 q_order_type = QUEUE_ORDERED_NONE;
963                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
964         }
965
966         p.d_size = cpu_to_be64(d_size);
967         p.u_size = cpu_to_be64(u_size);
968         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
969         p.max_bio_size = cpu_to_be32(max_bio_size);
970         p.queue_order_type = cpu_to_be16(q_order_type);
971         p.dds_flags = cpu_to_be16(flags);
972
973         return drbd_send_cmd(mdev, &mdev->tconn->data, P_SIZES, &p.head, sizeof(p));
974 }
975
976 /**
977  * drbd_send_state() - Sends the drbd state to the peer
978  * @mdev:       DRBD device.
979  */
980 int drbd_send_state(struct drbd_conf *mdev)
981 {
982         struct drbd_socket *sock;
983         struct p_state p;
984         int err = -EIO;
985
986         mutex_lock(&mdev->tconn->data.mutex);
987
988         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
989         sock = &mdev->tconn->data;
990
991         if (likely(sock->socket != NULL))
992                 err = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
993
994         mutex_unlock(&mdev->tconn->data.mutex);
995
996         return err;
997 }
998
999 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
1000                          union drbd_state mask, union drbd_state val)
1001 {
1002         struct p_req_state p;
1003
1004         p.mask    = cpu_to_be32(mask.i);
1005         p.val     = cpu_to_be32(val.i);
1006
1007         return conn_send_cmd(tconn, vnr, &tconn->data, cmd, &p.head, sizeof(p));
1008 }
1009
1010 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1011 {
1012         struct p_req_state_reply p;
1013
1014         p.retcode    = cpu_to_be32(retcode);
1015
1016         drbd_send_cmd(mdev, &mdev->tconn->meta, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1017 }
1018
1019 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1020 {
1021         struct p_req_state_reply p;
1022         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1023
1024         p.retcode    = cpu_to_be32(retcode);
1025
1026         return !conn_send_cmd(tconn, 0, &tconn->meta, cmd, &p.head, sizeof(p));
1027 }
1028
1029 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1030 {
1031         BUG_ON(code & ~0xf);
1032         p->encoding = (p->encoding & ~0xf) | code;
1033 }
1034
1035 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1036 {
1037         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1038 }
1039
1040 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1041 {
1042         BUG_ON(n & ~0x7);
1043         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1044 }
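
/* Added for reference: the helpers above pack the single "encoding" byte in
 * struct p_compressed_bm as
 *
 *	bit  7    - value of the first run        (dcbp_set_start)
 *	bits 6..4 - number of trailing pad bits   (dcbp_set_pad_bits)
 *	bits 3..0 - enum drbd_bitmap_code         (dcbp_set_code)
 */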
1045
1046 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1047         struct p_compressed_bm *p,
1048         struct bm_xfer_ctx *c)
1049 {
1050         struct bitstream bs;
1051         unsigned long plain_bits;
1052         unsigned long tmp;
1053         unsigned long rl;
1054         unsigned len;
1055         unsigned toggle;
1056         int bits;
1057
1058         /* may we use this feature? */
1059         if ((mdev->tconn->net_conf->use_rle == 0) ||
1060                 (mdev->tconn->agreed_pro_version < 90))
1061                         return 0;
1062
1063         if (c->bit_offset >= c->bm_bits)
1064                 return 0; /* nothing to do. */
1065
1066         /* use at most this many bytes */
1067         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1068         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1069         /* plain bits covered in this code string */
1070         plain_bits = 0;
1071
1072         /* p->encoding & 0x80 stores whether the first run length is set.
1073          * bit offset is implicit.
1074          * start with toggle == 2 to be able to tell the first iteration */
1075         toggle = 2;
1076
1077         /* see how many plain bits we can stuff into one packet
1078          * using RLE and VLI. */
1079         do {
1080                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1081                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1082                 if (tmp == -1UL)
1083                         tmp = c->bm_bits;
1084                 rl = tmp - c->bit_offset;
1085
1086                 if (toggle == 2) { /* first iteration */
1087                         if (rl == 0) {
1088                                 /* the first checked bit was set,
1089                                  * store start value, */
1090                                 dcbp_set_start(p, 1);
1091                                 /* but skip encoding of zero run length */
1092                                 toggle = !toggle;
1093                                 continue;
1094                         }
1095                         dcbp_set_start(p, 0);
1096                 }
1097
1098                 /* paranoia: catch zero runlength.
1099                  * can only happen if bitmap is modified while we scan it. */
1100                 if (rl == 0) {
1101                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1102                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1103                         return -1;
1104                 }
1105
1106                 bits = vli_encode_bits(&bs, rl);
1107                 if (bits == -ENOBUFS) /* buffer full */
1108                         break;
1109                 if (bits <= 0) {
1110                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1111                         return 0;
1112                 }
1113
1114                 toggle = !toggle;
1115                 plain_bits += rl;
1116                 c->bit_offset = tmp;
1117         } while (c->bit_offset < c->bm_bits);
1118
1119         len = bs.cur.b - p->code + !!bs.cur.bit;
1120
1121         if (plain_bits < (len << 3)) {
1122                 /* incompressible with this method.
1123                  * we need to rewind both word and bit position. */
1124                 c->bit_offset -= plain_bits;
1125                 bm_xfer_ctx_bit_to_word_offset(c);
1126                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1127                 return 0;
1128         }
1129
1130         /* RLE + VLI was able to compress it just fine.
1131          * update c->word_offset. */
1132         bm_xfer_ctx_bit_to_word_offset(c);
1133
1134         /* store pad_bits */
1135         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1136
1137         return len;
1138 }
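
/* Worked example (illustration only): if the bitmap starts with the bits
 * 0 0 0 1 1 1 1 1 0 0 ..., the loop above records run lengths 3, 5, 2, ...
 * and calls dcbp_set_start(p, 0) because the first bit was clear.  Had the
 * first bit been set, the zero-length leading run would be skipped and the
 * start flag set to 1 instead.
 */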
1139
1140 /**
1141  * send_bitmap_rle_or_plain
1142  *
1143  * Return 0 when done, 1 when another iteration is needed, and a negative error
1144  * code upon failure.
1145  */
1146 static int
1147 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1148 {
1149         struct p_compressed_bm *p = mdev->tconn->data.sbuf;
1150         unsigned long num_words;
1151         int len, err;
1152
1153         len = fill_bitmap_rle_bits(mdev, p, c);
1154
1155         if (len < 0)
1156                 return -EIO;
1157
1158         if (len) {
1159                 dcbp_set_code(p, RLE_VLI_Bits);
1160                 err = _drbd_send_cmd(mdev, &mdev->tconn->data,
1161                                      P_COMPRESSED_BITMAP, &p->head,
1162                                      sizeof(*p) + len, 0);
1163
1164                 c->packets[0]++;
1165                 c->bytes[0] += sizeof(*p) + len;
1166
1167                 if (c->bit_offset >= c->bm_bits)
1168                         len = 0; /* DONE */
1169         } else {
1170                 /* was not compressible.
1171                  * send a buffer full of plain text bits instead. */
1172                 struct p_header *h = mdev->tconn->data.sbuf;
1173                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1174                 len = num_words * sizeof(long);
1175                 if (len)
1176                         drbd_bm_get_lel(mdev, c->word_offset, num_words,
1177                                         (unsigned long *)h->payload);
1178                 err = _drbd_send_cmd(mdev, &mdev->tconn->data, P_BITMAP,
1179                                      h, sizeof(struct p_header80) + len, 0);
1180                 c->word_offset += num_words;
1181                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1182
1183                 c->packets[1]++;
1184                 c->bytes[1] += sizeof(struct p_header80) + len;
1185
1186                 if (c->bit_offset > c->bm_bits)
1187                         c->bit_offset = c->bm_bits;
1188         }
1189         if (!err) {
1190                 if (len == 0) {
1191                         INFO_bm_xfer_stats(mdev, "send", c);
1192                         return 0;
1193                 } else
1194                         return 1;
1195         }
1196         return -EIO;
1197 }
1198
1199 /* See the comment at receive_bitmap() */
1200 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1201 {
1202         struct bm_xfer_ctx c;
1203         int err;
1204
1205         if (!expect(mdev->bitmap))
1206                 return false;
1207
1208         if (get_ldev(mdev)) {
1209                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1210                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1211                         drbd_bm_set_all(mdev);
1212                         if (drbd_bm_write(mdev)) {
1213                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1214                                  * but otherwise process as per normal - need to tell other
1215                                  * side that a full resync is required! */
1216                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1217                         } else {
1218                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1219                                 drbd_md_sync(mdev);
1220                         }
1221                 }
1222                 put_ldev(mdev);
1223         }
1224
1225         c = (struct bm_xfer_ctx) {
1226                 .bm_bits = drbd_bm_bits(mdev),
1227                 .bm_words = drbd_bm_words(mdev),
1228         };
1229
1230         do {
1231                 err = send_bitmap_rle_or_plain(mdev, &c);
1232         } while (err > 0);
1233
1234         return err == 0;
1235 }
1236
1237 int drbd_send_bitmap(struct drbd_conf *mdev)
1238 {
1239         int err;
1240
1241         if (drbd_get_data_sock(mdev->tconn))
1242                 return -1;
1243         err = !_drbd_send_bitmap(mdev);
1244         drbd_put_data_sock(mdev->tconn);
1245         return err;
1246 }
1247 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1248 {
1249         struct p_barrier_ack p;
1250
1251         p.barrier  = barrier_nr;
1252         p.set_size = cpu_to_be32(set_size);
1253
1254         if (mdev->state.conn >= C_CONNECTED)
1255                 drbd_send_cmd(mdev, &mdev->tconn->meta, P_BARRIER_ACK, &p.head, sizeof(p));
1256 }
1257
1258 /**
1259  * _drbd_send_ack() - Sends an ack packet
1260  * @mdev:       DRBD device.
1261  * @cmd:        Packet command code.
1262  * @sector:     sector, needs to be in big endian byte order
1263  * @blksize:    size in byte, needs to be in big endian byte order
1264  * @block_id:   Id, big endian byte order
1265  */
1266 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1267                           u64 sector, u32 blksize, u64 block_id)
1268 {
1269         struct p_block_ack p;
1270
1271         p.sector   = sector;
1272         p.block_id = block_id;
1273         p.blksize  = blksize;
1274         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1275
1276         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1277                 return -EIO;
1278         return drbd_send_cmd(mdev, &mdev->tconn->meta, cmd, &p.head, sizeof(p));
1279 }
1280
1281 /* dp->sector and dp->block_id already/still in network byte order,
1282  * data_size is payload size according to dp->head,
1283  * and may need to be corrected for digest size. */
1284 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1285                       struct p_data *dp, int data_size)
1286 {
1287         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1288                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1289         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1290                        dp->block_id);
1291 }
1292
1293 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1294                       struct p_block_req *rp)
1295 {
1296         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1297 }
1298
1299 /**
1300  * drbd_send_ack() - Sends an ack packet
1301  * @mdev:       DRBD device
1302  * @cmd:        packet command code
1303  * @peer_req:   peer request
1304  */
1305 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1306                   struct drbd_peer_request *peer_req)
1307 {
1308         return _drbd_send_ack(mdev, cmd,
1309                               cpu_to_be64(peer_req->i.sector),
1310                               cpu_to_be32(peer_req->i.size),
1311                               peer_req->block_id);
1312 }
1313
1314 /* This function misuses the block_id field to signal if the blocks
1315  * are in sync or not. */
1316 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1317                      sector_t sector, int blksize, u64 block_id)
1318 {
1319         return _drbd_send_ack(mdev, cmd,
1320                               cpu_to_be64(sector),
1321                               cpu_to_be32(blksize),
1322                               cpu_to_be64(block_id));
1323 }
1324
1325 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1326                        sector_t sector, int size, u64 block_id)
1327 {
1328         struct p_block_req p;
1329
1330         p.sector   = cpu_to_be64(sector);
1331         p.block_id = block_id;
1332         p.blksize  = cpu_to_be32(size);
1333
1334         return drbd_send_cmd(mdev, &mdev->tconn->data, cmd, &p.head, sizeof(p));
1335 }
1336
1337 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1338                             void *digest, int digest_size, enum drbd_packet cmd)
1339 {
1340         int err;
1341         struct p_block_req p;
1342
1343         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1344         p.sector   = cpu_to_be64(sector);
1345         p.block_id = ID_SYNCER /* unused */;
1346         p.blksize  = cpu_to_be32(size);
1347
1348         mutex_lock(&mdev->tconn->data.mutex);
1349         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0);
1350         if (!err)
1351                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0);
1352         mutex_unlock(&mdev->tconn->data.mutex);
1353         return err;
1354 }
1355
1356 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1357 {
1358         struct p_block_req p;
1359
1360         p.sector   = cpu_to_be64(sector);
1361         p.block_id = ID_SYNCER /* unused */;
1362         p.blksize  = cpu_to_be32(size);
1363
1364         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OV_REQUEST, &p.head, sizeof(p));
1365 }
1366
1367 /* called on sndtimeo
1368  * returns false if we should retry,
1369  * true if we think connection is dead
1370  */
1371 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1372 {
1373         int drop_it;
1374         /* long elapsed = (long)(jiffies - mdev->last_received); */
1375
1376         drop_it =   tconn->meta.socket == sock
1377                 || !tconn->asender.task
1378                 || get_t_state(&tconn->asender) != RUNNING
1379                 || tconn->cstate < C_WF_REPORT_PARAMS;
1380
1381         if (drop_it)
1382                 return true;
1383
1384         drop_it = !--tconn->ko_count;
1385         if (!drop_it) {
1386                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1387                          current->comm, current->pid, tconn->ko_count);
1388                 request_ping(tconn);
1389         }
1390
1391         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1392 }
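
/* Put differently (summary only): a send timeout on the meta socket, a
 * missing or stopped asender, or a connection state below C_WF_REPORT_PARAMS
 * drops the connection right away; otherwise ko_count is decremented, a
 * ping is requested, and we only give up once the counter reaches zero.
 */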
1393
1394 static void drbd_update_congested(struct drbd_tconn *tconn)
1395 {
1396         struct sock *sk = tconn->data.socket->sk;
1397         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1398                 set_bit(NET_CONGESTED, &tconn->flags);
1399 }
1400
1401 /* The idea of sendpage seems to be to put some kind of reference
1402  * to the page into the skb, and to hand it over to the NIC. In
1403  * this process get_page() gets called.
1404  *
1405  * As soon as the page was really sent over the network put_page()
1406  * gets called by some part of the network layer. [ NIC driver? ]
1407  *
1408  * [ get_page() / put_page() increment/decrement the count. If count
1409  *   reaches 0 the page will be freed. ]
1410  *
1411  * This works nicely with pages from FSs.
1412  * But this means that in protocol A we might signal IO completion too early!
1413  *
1414  * In order not to corrupt data during a resync we must make sure
1415  * that we do not reuse our own buffer pages (EEs) too early, therefore
1416  * we have the net_ee list.
1417  *
1418  * XFS seems to have problems, still, it submits pages with page_count == 0!
1419  * As a workaround, we disable sendpage on pages
1420  * with page_count == 0 or PageSlab.
1421  */
1422 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1423                               int offset, size_t size, unsigned msg_flags)
1424 {
1425         struct socket *socket;
1426         void *addr;
1427         int err;
1428
1429         socket = mdev->tconn->data.socket;
1430         addr = kmap(page) + offset;
1431         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1432         kunmap(page);
1433         if (!err)
1434                 mdev->send_cnt += size >> 9;
1435         return err;
1436 }
1437
1438 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1439                     int offset, size_t size, unsigned msg_flags)
1440 {
1441         struct socket *socket = mdev->tconn->data.socket;
1442         mm_segment_t oldfs = get_fs();
1443         int len = size;
1444         int err = -EIO;
1445
1446         /* e.g. XFS meta- & log-data is in slab pages, which have a
1447          * page_count of 0 and/or have PageSlab() set.
1448          * we cannot use send_page for those, as that does get_page();
1449          * put_page(); and would cause either a VM_BUG directly, or
1450          * __page_cache_release a page that would actually still be referenced
1451          * by someone, leading to some obscure delayed Oops somewhere else. */
1452         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1453                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1454
1455         msg_flags |= MSG_NOSIGNAL;
1456         drbd_update_congested(mdev->tconn);
1457         set_fs(KERNEL_DS);
1458         do {
1459                 int sent;
1460
1461                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1462                 if (sent <= 0) {
1463                         if (sent == -EAGAIN) {
1464                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1465                                         break;
1466                                 continue;
1467                         }
1468                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1469                              __func__, (int)size, len, sent);
1470                         if (sent < 0)
1471                                 err = sent;
1472                         break;
1473                 }
1474                 len    -= sent;
1475                 offset += sent;
1476         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1477         set_fs(oldfs);
1478         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1479
1480         if (len == 0) {
1481                 err = 0;
1482                 mdev->send_cnt += size >> 9;
1483         }
1484         return err;
1485 }
1486
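/* Send the payload of a bio segment by segment, copying each page into the
 * socket buffer; all but the last segment are flagged MSG_MORE. */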
1487 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1488 {
1489         struct bio_vec *bvec;
1490         int i;
1491         /* hint all but last page with MSG_MORE */
1492         __bio_for_each_segment(bvec, bio, i, 0) {
1493                 int err;
1494
1495                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1496                                          bvec->bv_offset, bvec->bv_len,
1497                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1498                 if (err)
1499                         return err;
1500         }
1501         return 0;
1502 }
1503
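/* Zero-copy counterpart of _drbd_send_bio(): each segment is sent through
 * _drbd_send_page(), so sendpage() is used where it is safe to do so. */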
1504 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1505 {
1506         struct bio_vec *bvec;
1507         int i;
1508         /* hint all but last page with MSG_MORE */
1509         __bio_for_each_segment(bvec, bio, i, 0) {
1510                 int err;
1511
1512                 err = _drbd_send_page(mdev, bvec->bv_page,
1513                                       bvec->bv_offset, bvec->bv_len,
1514                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1515                 if (err)
1516                         return err;
1517         }
1518         return 0;
1519 }
1520
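/* Send the page chain of a peer request (EE) zero-copy; used by
 * drbd_send_block() to ship the payload of P_DATA_REPLY and P_RS_DATA_REPLY. */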
1521 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1522                             struct drbd_peer_request *peer_req)
1523 {
1524         struct page *page = peer_req->pages;
1525         unsigned len = peer_req->i.size;
1526         int err;
1527
1528         /* hint all but last page with MSG_MORE */
1529         page_chain_for_each(page) {
1530                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1531
1532                 err = _drbd_send_page(mdev, page, 0, l,
1533                                       page_chain_next(page) ? MSG_MORE : 0);
1534                 if (err)
1535                         return err;
1536                 len -= l;
1537         }
1538         return 0;
1539 }
1540
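/* Translate the request flags of a bio into the DP_* flags used on the wire.
 * Peers with an agreed protocol version older than 95 only understand
 * DP_RW_SYNC. */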
1541 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1542 {
1543         if (mdev->tconn->agreed_pro_version >= 95)
1544                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1545                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1546                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1547                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1548         else
1549                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1550 }
1551
1552 /* Used to send write requests
1553  * R_PRIMARY -> Peer    (P_DATA)
1554  */
1555 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1556 {
1557         int err;
1558         struct p_data p;
1559         unsigned int dp_flags = 0;
1560         void *dgb;
1561         int dgs;
1562
1563         err = drbd_get_data_sock(mdev->tconn);
1564         if (err)
1565                 return err;
1566
1567         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1568                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1569
1570         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1571         p.sector   = cpu_to_be64(req->i.sector);
1572         p.block_id = (unsigned long)req;
1573         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1574
1575         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1576
1577         if (mdev->state.conn >= C_SYNC_SOURCE &&
1578             mdev->state.conn <= C_PAUSED_SYNC_T)
1579                 dp_flags |= DP_MAY_SET_IN_SYNC;
1580
1581         p.dp_flags = cpu_to_be32(dp_flags);
1582         set_bit(UNPLUG_REMOTE, &mdev->flags);
1583         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1584                             sizeof(p), dgs ? MSG_MORE : 0);
1585         if (!err && dgs) {
1586                 dgb = mdev->tconn->int_dig_out;
1587                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1588                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1589         }
1590         if (!err) {
1591                 /* For protocol A, we have to memcpy the payload into
1592                  * socket buffers, as we may complete right away
1593                  * as soon as we handed it over to tcp, at which point the data
1594                  * pages may become invalid.
1595                  *
1596                  * With data integrity enabled, we copy it as well, so we can be
1597                  * sure that even if the bio pages may still be modified, it
1598                  * won't change the data on the wire, thus if the digest checks
1599                  * out ok after sending on this side, but does not fit on the
1600                  * receiving side, we have certainly detected corruption elsewhere.
1601                  */
1602                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1603                         err = _drbd_send_bio(mdev, req->master_bio);
1604                 else
1605                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1606
1607                 /* double check digest, sometimes buffers have been modified in flight. */
1608                 if (dgs > 0 && dgs <= 64) {
1609                         /* 64 byte, 512 bit, is the largest digest size
1610                          * currently supported in kernel crypto. */
1611                         unsigned char digest[64];
1612                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1613                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1614                                 dev_warn(DEV,
1615                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1616                                         (unsigned long long)req->i.sector, req->i.size);
1617                         }
1618                 } /* else if (dgs > 64) {
1619                      ... Be noisy about digest too large ...
1620                 } */
1621         }
1622
1623         drbd_put_data_sock(mdev->tconn);
1624
1625         return err;
1626 }
1627
1628 /* answer packet, used to send data back for read requests:
1629  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1630  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1631  */
1632 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1633                     struct drbd_peer_request *peer_req)
1634 {
1635         int err;
1636         struct p_data p;
1637         void *dgb;
1638         int dgs;
1639
1640         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1641                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1642
1643         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1644                                            sizeof(struct p_header80) +
1645                                            dgs + peer_req->i.size);
1646         p.sector   = cpu_to_be64(peer_req->i.sector);
1647         p.block_id = peer_req->block_id;
1648         p.seq_num = 0;  /* unused */
1649
1650         /* Only called by our kernel thread.
1651          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1652          * in response to an admin command or module unload.
1653          */
1654         err = drbd_get_data_sock(mdev->tconn);
1655         if (err)
1656                 return err;
1657         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1658                             sizeof(p), dgs ? MSG_MORE : 0);
1659         if (!err && dgs) {
1660                 dgb = mdev->tconn->int_dig_out;
1661                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1662                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb,
1663                                     dgs, 0);
1664         }
1665         if (!err)
1666                 err = _drbd_send_zc_ee(mdev, peer_req);
1667         drbd_put_data_sock(mdev->tconn);
1668
1669         return err;
1670 }
1671
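/* Send a P_OUT_OF_SYNC packet describing the sector and size of @req. */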
1672 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1673 {
1674         struct p_block_desc p;
1675
1676         p.sector  = cpu_to_be64(req->i.sector);
1677         p.blksize = cpu_to_be32(req->i.size);
1678
1679         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OUT_OF_SYNC, &p.head, sizeof(p));
1680 }
1681
1682 /*
1683   drbd_send distinguishes two cases:
1684
1685   Packets sent via the data socket "sock"
1686   and packets sent via the meta data socket "msock"
1687
1688                     sock                      msock
1689   -----------------+-------------------------+------------------------------
1690   timeout           conf.timeout / 2          conf.timeout / 2
1691   timeout action    send a ping via msock     Abort communication
1692                                               and close all sockets
1693 */
1694
1695 /*
1696  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1697  */
1698 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1699               void *buf, size_t size, unsigned msg_flags)
1700 {
1701         struct kvec iov;
1702         struct msghdr msg;
1703         int rv, sent = 0;
1704
1705         if (!sock)
1706                 return -EBADR;
1707
1708         /* THINK  if (signal_pending) return ... ? */
1709
1710         iov.iov_base = buf;
1711         iov.iov_len  = size;
1712
1713         msg.msg_name       = NULL;
1714         msg.msg_namelen    = 0;
1715         msg.msg_control    = NULL;
1716         msg.msg_controllen = 0;
1717         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1718
1719         if (sock == tconn->data.socket) {
1720                 tconn->ko_count = tconn->net_conf->ko_count;
1721                 drbd_update_congested(tconn);
1722         }
1723         do {
1724                 /* STRANGE
1725                  * tcp_sendmsg does _not_ use its size parameter at all ?
1726                  *
1727                  * -EAGAIN on timeout, -EINTR on signal.
1728                  */
1729 /* THINK
1730  * do we need to block DRBD_SIG if sock == &meta.socket ??
1731  * otherwise wake_asender() might interrupt some send_*Ack !
1732  */
1733                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1734                 if (rv == -EAGAIN) {
1735                         if (we_should_drop_the_connection(tconn, sock))
1736                                 break;
1737                         else
1738                                 continue;
1739                 }
1740                 if (rv == -EINTR) {
1741                         flush_signals(current);
1742                         rv = 0;
1743                 }
1744                 if (rv < 0)
1745                         break;
1746                 sent += rv;
1747                 iov.iov_base += rv;
1748                 iov.iov_len  -= rv;
1749         } while (sent < size);
1750
1751         if (sock == tconn->data.socket)
1752                 clear_bit(NET_CONGESTED, &tconn->flags);
1753
1754         if (rv <= 0) {
1755                 if (rv != -EAGAIN) {
1756                         conn_err(tconn, "%s_sendmsg returned %d\n",
1757                                  sock == tconn->meta.socket ? "msock" : "sock",
1758                                  rv);
1759                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1760                 } else
1761                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1762         }
1763
1764         return sent;
1765 }
1766
1767 /**
1768  * drbd_send_all  -  Send an entire buffer
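 * @tconn:      DRBD connection.
 * @sock:       socket to send on (data or meta socket).
 * @buffer:     data to send.
 * @size:       number of bytes to send.
 * @msg_flags:  additional MSG_* flags passed on to sendmsg().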
1769  *
1770  * Returns 0 upon success and a negative error value otherwise.
1771  */
1772 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1773                   size_t size, unsigned msg_flags)
1774 {
1775         int err;
1776
1777         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1778         if (err < 0)
1779                 return err;
1780         if (err != size)
1781                 return -EIO;
1782         return 0;
1783 }
1784
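/* Block device open: opening for write is only allowed in the Primary role;
 * read-only opens in the Secondary role are rejected unless allow_oos is set. */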
1785 static int drbd_open(struct block_device *bdev, fmode_t mode)
1786 {
1787         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1788         unsigned long flags;
1789         int rv = 0;
1790
1791         mutex_lock(&drbd_main_mutex);
1792         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1793         /* to have a stable mdev->state.role
1794          * and no race with updating open_cnt */
1795
1796         if (mdev->state.role != R_PRIMARY) {
1797                 if (mode & FMODE_WRITE)
1798                         rv = -EROFS;
1799                 else if (!allow_oos)
1800                         rv = -EMEDIUMTYPE;
1801         }
1802
1803         if (!rv)
1804                 mdev->open_cnt++;
1805         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1806         mutex_unlock(&drbd_main_mutex);
1807
1808         return rv;
1809 }
1810
1811 static int drbd_release(struct gendisk *gd, fmode_t mode)
1812 {
1813         struct drbd_conf *mdev = gd->private_data;
1814         mutex_lock(&drbd_main_mutex);
1815         mdev->open_cnt--;
1816         mutex_unlock(&drbd_main_mutex);
1817         return 0;
1818 }
1819
1820 static void drbd_set_defaults(struct drbd_conf *mdev)
1821 {
1822         /* Beware! The actual layout differs
1823          * between big endian and little endian */
1824         mdev->state = (union drbd_state) {
1825                 { .role = R_SECONDARY,
1826                   .peer = R_UNKNOWN,
1827                   .conn = C_STANDALONE,
1828                   .disk = D_DISKLESS,
1829                   .pdsk = D_UNKNOWN,
1830                   .susp = 0,
1831                   .susp_nod = 0,
1832                   .susp_fen = 0
1833                 } };
1834 }
1835
1836 void drbd_init_set_defaults(struct drbd_conf *mdev)
1837 {
1838         /* the memset(,0,) did most of this.
1839          * note: only assignments, no allocation in here */
1840
1841         drbd_set_defaults(mdev);
1842
1843         atomic_set(&mdev->ap_bio_cnt, 0);
1844         atomic_set(&mdev->ap_pending_cnt, 0);
1845         atomic_set(&mdev->rs_pending_cnt, 0);
1846         atomic_set(&mdev->unacked_cnt, 0);
1847         atomic_set(&mdev->local_cnt, 0);
1848         atomic_set(&mdev->pp_in_use_by_net, 0);
1849         atomic_set(&mdev->rs_sect_in, 0);
1850         atomic_set(&mdev->rs_sect_ev, 0);
1851         atomic_set(&mdev->ap_in_flight, 0);
1852
1853         mutex_init(&mdev->md_io_mutex);
1854         mutex_init(&mdev->own_state_mutex);
1855         mdev->state_mutex = &mdev->own_state_mutex;
1856
1857         spin_lock_init(&mdev->al_lock);
1858         spin_lock_init(&mdev->peer_seq_lock);
1859         spin_lock_init(&mdev->epoch_lock);
1860
1861         INIT_LIST_HEAD(&mdev->active_ee);
1862         INIT_LIST_HEAD(&mdev->sync_ee);
1863         INIT_LIST_HEAD(&mdev->done_ee);
1864         INIT_LIST_HEAD(&mdev->read_ee);
1865         INIT_LIST_HEAD(&mdev->net_ee);
1866         INIT_LIST_HEAD(&mdev->resync_reads);
1867         INIT_LIST_HEAD(&mdev->resync_work.list);
1868         INIT_LIST_HEAD(&mdev->unplug_work.list);
1869         INIT_LIST_HEAD(&mdev->go_diskless.list);
1870         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1871         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1872         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1873
1874         mdev->resync_work.cb  = w_resync_timer;
1875         mdev->unplug_work.cb  = w_send_write_hint;
1876         mdev->go_diskless.cb  = w_go_diskless;
1877         mdev->md_sync_work.cb = w_md_sync;
1878         mdev->bm_io_work.w.cb = w_bitmap_io;
1879         mdev->start_resync_work.cb = w_start_resync;
1880
1881         mdev->resync_work.mdev  = mdev;
1882         mdev->unplug_work.mdev  = mdev;
1883         mdev->go_diskless.mdev  = mdev;
1884         mdev->md_sync_work.mdev = mdev;
1885         mdev->bm_io_work.w.mdev = mdev;
1886         mdev->start_resync_work.mdev = mdev;
1887
1888         init_timer(&mdev->resync_timer);
1889         init_timer(&mdev->md_sync_timer);
1890         init_timer(&mdev->start_resync_timer);
1891         init_timer(&mdev->request_timer);
1892         mdev->resync_timer.function = resync_timer_fn;
1893         mdev->resync_timer.data = (unsigned long) mdev;
1894         mdev->md_sync_timer.function = md_sync_timer_fn;
1895         mdev->md_sync_timer.data = (unsigned long) mdev;
1896         mdev->start_resync_timer.function = start_resync_timer_fn;
1897         mdev->start_resync_timer.data = (unsigned long) mdev;
1898         mdev->request_timer.function = request_timer_fn;
1899         mdev->request_timer.data = (unsigned long) mdev;
1900
1901         init_waitqueue_head(&mdev->misc_wait);
1902         init_waitqueue_head(&mdev->state_wait);
1903         init_waitqueue_head(&mdev->ee_wait);
1904         init_waitqueue_head(&mdev->al_wait);
1905         init_waitqueue_head(&mdev->seq_wait);
1906
1907         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1908         mdev->write_ordering = WO_bdev_flush;
1909         mdev->resync_wenr = LC_FREE;
1910         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1911         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1912 }
1913
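/* Reset the per-device counters and state back to their defaults.  Only
 * called while no other thread can access the device (see the asserts). */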
1914 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1915 {
1916         int i;
1917         if (mdev->tconn->receiver.t_state != NONE)
1918                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1919                                 mdev->tconn->receiver.t_state);
1920
1921         /* no need to lock it, I'm the only thread alive */
1922         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1923                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1924         mdev->al_writ_cnt  =
1925         mdev->bm_writ_cnt  =
1926         mdev->read_cnt     =
1927         mdev->recv_cnt     =
1928         mdev->send_cnt     =
1929         mdev->writ_cnt     =
1930         mdev->p_size       =
1931         mdev->rs_start     =
1932         mdev->rs_total     =
1933         mdev->rs_failed    = 0;
1934         mdev->rs_last_events = 0;
1935         mdev->rs_last_sect_ev = 0;
1936         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1937                 mdev->rs_mark_left[i] = 0;
1938                 mdev->rs_mark_time[i] = 0;
1939         }
1940         D_ASSERT(mdev->tconn->net_conf == NULL);
1941
1942         drbd_set_my_capacity(mdev, 0);
1943         if (mdev->bitmap) {
1944                 /* maybe never allocated. */
1945                 drbd_bm_resize(mdev, 0, 1);
1946                 drbd_bm_cleanup(mdev);
1947         }
1948
1949         drbd_free_resources(mdev);
1950         clear_bit(AL_SUSPENDED, &mdev->flags);
1951
1952         /*
1953          * currently we call drbd_init_ee() only on module load, so
1954          * we may call drbd_release_ee() only on module unload!
1955          */
1956         D_ASSERT(list_empty(&mdev->active_ee));
1957         D_ASSERT(list_empty(&mdev->sync_ee));
1958         D_ASSERT(list_empty(&mdev->done_ee));
1959         D_ASSERT(list_empty(&mdev->read_ee));
1960         D_ASSERT(list_empty(&mdev->net_ee));
1961         D_ASSERT(list_empty(&mdev->resync_reads));
1962         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1963         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1964         D_ASSERT(list_empty(&mdev->resync_work.list));
1965         D_ASSERT(list_empty(&mdev->unplug_work.list));
1966         D_ASSERT(list_empty(&mdev->go_diskless.list));
1967
1968         drbd_set_defaults(mdev);
1969 }
1970
1971
1972 static void drbd_destroy_mempools(void)
1973 {
1974         struct page *page;
1975
1976         while (drbd_pp_pool) {
1977                 page = drbd_pp_pool;
1978                 drbd_pp_pool = (struct page *)page_private(page);
1979                 __free_page(page);
1980                 drbd_pp_vacant--;
1981         }
1982
1983         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1984
1985         if (drbd_md_io_bio_set)
1986                 bioset_free(drbd_md_io_bio_set);
1987         if (drbd_md_io_page_pool)
1988                 mempool_destroy(drbd_md_io_page_pool);
1989         if (drbd_ee_mempool)
1990                 mempool_destroy(drbd_ee_mempool);
1991         if (drbd_request_mempool)
1992                 mempool_destroy(drbd_request_mempool);
1993         if (drbd_ee_cache)
1994                 kmem_cache_destroy(drbd_ee_cache);
1995         if (drbd_request_cache)
1996                 kmem_cache_destroy(drbd_request_cache);
1997         if (drbd_bm_ext_cache)
1998                 kmem_cache_destroy(drbd_bm_ext_cache);
1999         if (drbd_al_ext_cache)
2000                 kmem_cache_destroy(drbd_al_ext_cache);
2001
2002         drbd_md_io_bio_set   = NULL;
2003         drbd_md_io_page_pool = NULL;
2004         drbd_ee_mempool      = NULL;
2005         drbd_request_mempool = NULL;
2006         drbd_ee_cache        = NULL;
2007         drbd_request_cache   = NULL;
2008         drbd_bm_ext_cache    = NULL;
2009         drbd_al_ext_cache    = NULL;
2010
2011         return;
2012 }
2013
2014 static int drbd_create_mempools(void)
2015 {
2016         struct page *page;
2017         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2018         int i;
2019
2020         /* prepare our caches and mempools */
2021         drbd_request_mempool = NULL;
2022         drbd_ee_cache        = NULL;
2023         drbd_request_cache   = NULL;
2024         drbd_bm_ext_cache    = NULL;
2025         drbd_al_ext_cache    = NULL;
2026         drbd_pp_pool         = NULL;
2027         drbd_md_io_page_pool = NULL;
2028         drbd_md_io_bio_set   = NULL;
2029
2030         /* caches */
2031         drbd_request_cache = kmem_cache_create(
2032                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2033         if (drbd_request_cache == NULL)
2034                 goto Enomem;
2035
2036         drbd_ee_cache = kmem_cache_create(
2037                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2038         if (drbd_ee_cache == NULL)
2039                 goto Enomem;
2040
2041         drbd_bm_ext_cache = kmem_cache_create(
2042                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2043         if (drbd_bm_ext_cache == NULL)
2044                 goto Enomem;
2045
2046         drbd_al_ext_cache = kmem_cache_create(
2047                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2048         if (drbd_al_ext_cache == NULL)
2049                 goto Enomem;
2050
2051         /* mempools */
2052         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2053         if (drbd_md_io_bio_set == NULL)
2054                 goto Enomem;
2055
2056         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2057         if (drbd_md_io_page_pool == NULL)
2058                 goto Enomem;
2059
2060         drbd_request_mempool = mempool_create(number,
2061                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2062         if (drbd_request_mempool == NULL)
2063                 goto Enomem;
2064
2065         drbd_ee_mempool = mempool_create(number,
2066                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2067         if (drbd_ee_mempool == NULL)
2068                 goto Enomem;
2069
2070         /* drbd's page pool */
2071         spin_lock_init(&drbd_pp_lock);
2072
2073         for (i = 0; i < number; i++) {
2074                 page = alloc_page(GFP_HIGHUSER);
2075                 if (!page)
2076                         goto Enomem;
2077                 set_page_private(page, (unsigned long)drbd_pp_pool);
2078                 drbd_pp_pool = page;
2079         }
2080         drbd_pp_vacant = number;
2081
2082         return 0;
2083
2084 Enomem:
2085         drbd_destroy_mempools(); /* in case we allocated some */
2086         return -ENOMEM;
2087 }
2088
2089 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2090         void *unused)
2091 {
2092         /* just so we have it.  you never know what interesting things we
2093          * might want to do here some day...
2094          */
2095
2096         return NOTIFY_DONE;
2097 }
2098
2099 static struct notifier_block drbd_notifier = {
2100         .notifier_call = drbd_notify_sys,
2101 };
2102
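/* Release any peer requests (EEs) still sitting on the per-device lists;
 * by this point all of them should already be empty. */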
2103 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2104 {
2105         int rr;
2106
2107         rr = drbd_release_ee(mdev, &mdev->active_ee);
2108         if (rr)
2109                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2110
2111         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2112         if (rr)
2113                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2114
2115         rr = drbd_release_ee(mdev, &mdev->read_ee);
2116         if (rr)
2117                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2118
2119         rr = drbd_release_ee(mdev, &mdev->done_ee);
2120         if (rr)
2121                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2122
2123         rr = drbd_release_ee(mdev, &mdev->net_ee);
2124         if (rr)
2125                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2126 }
2127
2128 /* caution. no locking. */
2129 void drbd_delete_device(unsigned int minor)
2130 {
2131         struct drbd_conf *mdev = minor_to_mdev(minor);
2132
2133         if (!mdev)
2134                 return;
2135
2136         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2137         idr_remove(&minors, minor);
2138         synchronize_rcu();
2139
2140         /* paranoia asserts */
2141         D_ASSERT(mdev->open_cnt == 0);
2142         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2143         /* end paranoia asserts */
2144
2145         del_gendisk(mdev->vdisk);
2146
2147         /* cleanup stuff that may have been allocated during
2148          * device (re-)configuration or state changes */
2149
2150         if (mdev->this_bdev)
2151                 bdput(mdev->this_bdev);
2152
2153         drbd_free_resources(mdev);
2154
2155         drbd_release_ee_lists(mdev);
2156
2157         lc_destroy(mdev->act_log);
2158         lc_destroy(mdev->resync);
2159
2160         kfree(mdev->p_uuid);
2161         /* mdev->p_uuid = NULL; */
2162
2163         /* cleanup the rest that has been
2164          * allocated from drbd_new_device
2165          * and actually free the mdev itself */
2166         drbd_free_mdev(mdev);
2167 }
2168
2169 static void drbd_cleanup(void)
2170 {
2171         unsigned int i;
2172         struct drbd_conf *mdev;
2173
2174         unregister_reboot_notifier(&drbd_notifier);
2175
2176         /* first remove proc,
2177          * drbdsetup uses its presence to detect
2178          * whether DRBD is loaded.
2179          * If we were to get stuck in proc removal,
2180          * but have netlink already deregistered,
2181          * some drbdsetup commands may wait forever
2182          * for an answer.
2183          */
2184         if (drbd_proc)
2185                 remove_proc_entry("drbd", NULL);
2186
2187         drbd_genl_unregister();
2188
2189         idr_for_each_entry(&minors, mdev, i)
2190                 drbd_delete_device(i);
2191         drbd_destroy_mempools();
2192         unregister_blkdev(DRBD_MAJOR, "drbd");
2193
2194         idr_destroy(&minors);
2195
2196         printk(KERN_INFO "drbd: module cleanup done.\n");
2197 }
2198
2199 /**
2200  * drbd_congested() - Callback for pdflush
2201  * @congested_data:     User data
2202  * @bdi_bits:           Bits pdflush is currently interested in
2203  *
2204  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2205  */
2206 static int drbd_congested(void *congested_data, int bdi_bits)
2207 {
2208         struct drbd_conf *mdev = congested_data;
2209         struct request_queue *q;
2210         char reason = '-';
2211         int r = 0;
2212
2213         if (!may_inc_ap_bio(mdev)) {
2214                 /* DRBD has frozen IO */
2215                 r = bdi_bits;
2216                 reason = 'd';
2217                 goto out;
2218         }
2219
2220         if (get_ldev(mdev)) {
2221                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2222                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2223                 put_ldev(mdev);
2224                 if (r)
2225                         reason = 'b';
2226         }
2227
2228         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2229                 r |= (1 << BDI_async_congested);
2230                 reason = reason == 'b' ? 'a' : 'n';
2231         }
2232
2233 out:
2234         mdev->congestion_reason = reason;
2235         return r;
2236 }
2237
2238 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2239 {
2240         sema_init(&wq->s, 0);
2241         spin_lock_init(&wq->q_lock);
2242         INIT_LIST_HEAD(&wq->q);
2243 }
2244
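/* Look up a connection by its resource name under drbd_cfg_mutex;
 * returns NULL if no connection with that name exists. */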
2245 struct drbd_tconn *conn_by_name(const char *name)
2246 {
2247         struct drbd_tconn *tconn;
2248
2249         if (!name || !name[0])
2250                 return NULL;
2251
2252         mutex_lock(&drbd_cfg_mutex);
2253         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2254                 if (!strcmp(tconn->name, name))
2255                         goto found;
2256         }
2257         tconn = NULL;
2258 found:
2259         mutex_unlock(&drbd_cfg_mutex);
2260         return tconn;
2261 }
2262
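/* Each drbd_socket gets one page for its receive buffer and one for its
 * send buffer; drbd_free_socket() releases them again. */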
2263 static int drbd_alloc_socket(struct drbd_socket *socket)
2264 {
2265         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2266         if (!socket->rbuf)
2267                 return -ENOMEM;
2268         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2269         if (!socket->sbuf)
2270                 return -ENOMEM;
2271         return 0;
2272 }
2273
2274 static void drbd_free_socket(struct drbd_socket *socket)
2275 {
2276         free_page((unsigned long) socket->sbuf);
2277         free_page((unsigned long) socket->rbuf);
2278 }
2279
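/* Allocate and initialize a new connection object (state, work queues,
 * threads) and link it into the global list of connections. */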
2280 struct drbd_tconn *drbd_new_tconn(const char *name)
2281 {
2282         struct drbd_tconn *tconn;
2283
2284         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2285         if (!tconn)
2286                 return NULL;
2287
2288         tconn->name = kstrdup(name, GFP_KERNEL);
2289         if (!tconn->name)
2290                 goto fail;
2291
2292         if (drbd_alloc_socket(&tconn->data))
2293                 goto fail;
2294         if (drbd_alloc_socket(&tconn->meta))
2295                 goto fail;
2296
2297         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2298                 goto fail;
2299
2300         if (!tl_init(tconn))
2301                 goto fail;
2302
2303         tconn->cstate = C_STANDALONE;
2304         mutex_init(&tconn->cstate_mutex);
2305         spin_lock_init(&tconn->req_lock);
2306         atomic_set(&tconn->net_cnt, 0);
2307         init_waitqueue_head(&tconn->net_cnt_wait);
2308         init_waitqueue_head(&tconn->ping_wait);
2309         idr_init(&tconn->volumes);
2310
2311         drbd_init_workqueue(&tconn->data.work);
2312         mutex_init(&tconn->data.mutex);
2313
2314         drbd_init_workqueue(&tconn->meta.work);
2315         mutex_init(&tconn->meta.mutex);
2316
2317         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2318         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2319         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2320
2321         tconn->res_opts = (struct res_opts) {
2322                 {}, 0, /* cpu_mask */
2323                 DRBD_ON_NO_DATA_DEF, /* on_no_data */
2324         };
2325
2326         mutex_lock(&drbd_cfg_mutex);
2327         list_add_tail(&tconn->all_tconn, &drbd_tconns);
2328         mutex_unlock(&drbd_cfg_mutex);
2329
2330         return tconn;
2331
2332 fail:
2333         tl_cleanup(tconn);
2334         free_cpumask_var(tconn->cpu_mask);
2335         drbd_free_socket(&tconn->meta);
2336         drbd_free_socket(&tconn->data);
2337         kfree(tconn->name);
2338         kfree(tconn);
2339
2340         return NULL;
2341 }
2342
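/* Unlink the connection from the global list and free its sockets, name,
 * integrity digest buffers and the connection object itself. */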
2343 void drbd_free_tconn(struct drbd_tconn *tconn)
2344 {
2345         list_del(&tconn->all_tconn);
2346         idr_destroy(&tconn->volumes);
2347
2348         free_cpumask_var(tconn->cpu_mask);
2349         drbd_free_socket(&tconn->meta);
2350         drbd_free_socket(&tconn->data);
2351         kfree(tconn->name);
2352         kfree(tconn->int_dig_out);
2353         kfree(tconn->int_dig_in);
2354         kfree(tconn->int_dig_vv);
2355         kfree(tconn);
2356 }
2357
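/* Create a new minor (volume) on the given connection: allocate the
 * drbd_conf, request queue and gendisk, register it in the minors and
 * volumes idr trees, and let it inherit the current connection state. */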
2358 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2359 {
2360         struct drbd_conf *mdev;
2361         struct gendisk *disk;
2362         struct request_queue *q;
2363         int vnr_got = vnr;
2364         int minor_got = minor;
2365         enum drbd_ret_code err = ERR_NOMEM;
2366
2367         mdev = minor_to_mdev(minor);
2368         if (mdev)
2369                 return ERR_MINOR_EXISTS;
2370
2371         /* GFP_KERNEL, we are outside of all write-out paths */
2372         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2373         if (!mdev)
2374                 return ERR_NOMEM;
2375
2376         mdev->tconn = tconn;
2377         mdev->minor = minor;
2378         mdev->vnr = vnr;
2379
2380         drbd_init_set_defaults(mdev);
2381
2382         q = blk_alloc_queue(GFP_KERNEL);
2383         if (!q)
2384                 goto out_no_q;
2385         mdev->rq_queue = q;
2386         q->queuedata   = mdev;
2387
2388         disk = alloc_disk(1);
2389         if (!disk)
2390                 goto out_no_disk;
2391         mdev->vdisk = disk;
2392
2393         set_disk_ro(disk, true);
2394
2395         disk->queue = q;
2396         disk->major = DRBD_MAJOR;
2397         disk->first_minor = minor;
2398         disk->fops = &drbd_ops;
2399         sprintf(disk->disk_name, "drbd%d", minor);
2400         disk->private_data = mdev;
2401
2402         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2403         /* we have no partitions. we contain only ourselves. */
2404         mdev->this_bdev->bd_contains = mdev->this_bdev;
2405
2406         q->backing_dev_info.congested_fn = drbd_congested;
2407         q->backing_dev_info.congested_data = mdev;
2408
2409         blk_queue_make_request(q, drbd_make_request);
2410         /* Setting max_hw_sectors to the odd value of 8 KiB here triggers
2411            a max_bio_size message upon first attach or connect. */
2412         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2413         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2414         blk_queue_merge_bvec(q, drbd_merge_bvec);
2415         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2416
2417         mdev->md_io_page = alloc_page(GFP_KERNEL);
2418         if (!mdev->md_io_page)
2419                 goto out_no_io_page;
2420
2421         if (drbd_bm_init(mdev))
2422                 goto out_no_bitmap;
2423         mdev->read_requests = RB_ROOT;
2424         mdev->write_requests = RB_ROOT;
2425
2426         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2427         if (!mdev->current_epoch)
2428                 goto out_no_epoch;
2429
2430         INIT_LIST_HEAD(&mdev->current_epoch->list);
2431         mdev->epochs = 1;
2432
2433         if (!idr_pre_get(&minors, GFP_KERNEL))
2434                 goto out_no_minor_idr;
2435         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2436                 goto out_no_minor_idr;
2437         if (minor_got != minor) {
2438                 err = ERR_MINOR_EXISTS;
2439                 drbd_msg_put_info("requested minor exists already");
2440                 goto out_idr_remove_minor;
2441         }
2442
2443         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2444                 goto out_idr_remove_minor;
2445         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2446                 goto out_idr_remove_minor;
2447         if (vnr_got != vnr) {
2448                 err = ERR_INVALID_REQUEST;
2449                 drbd_msg_put_info("requested volume exists already");
2450                 goto out_idr_remove_vol;
2451         }
2452         add_disk(disk);
2453
2454         /* inherit the connection state */
2455         mdev->state.conn = tconn->cstate;
2456         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2457                 drbd_connected(vnr, mdev, tconn);
2458
2459         return NO_ERROR;
2460
2461 out_idr_remove_vol:
2462         idr_remove(&tconn->volumes, vnr_got);
2463 out_idr_remove_minor:
2464         idr_remove(&minors, minor_got);
2465         synchronize_rcu();
2466 out_no_minor_idr:
2467         kfree(mdev->current_epoch);
2468 out_no_epoch:
2469         drbd_bm_cleanup(mdev);
2470 out_no_bitmap:
2471         __free_page(mdev->md_io_page);
2472 out_no_io_page:
2473         put_disk(disk);
2474 out_no_disk:
2475         blk_cleanup_queue(q);
2476 out_no_q:
2477         kfree(mdev);
2478         return err;
2479 }
2480
2481 /* counterpart of drbd_new_device.
2482  * last part of drbd_delete_device. */
2483 void drbd_free_mdev(struct drbd_conf *mdev)
2484 {
2485         kfree(mdev->current_epoch);
2486         if (mdev->bitmap) /* should no longer be there. */
2487                 drbd_bm_cleanup(mdev);
2488         __free_page(mdev->md_io_page);
2489         put_disk(mdev->vdisk);
2490         blk_cleanup_queue(mdev->rq_queue);
2491         kfree(mdev);
2492 }
2493
2494
2495 int __init drbd_init(void)
2496 {
2497         int err;
2498
2499         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2500         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2501
2502         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2503                 printk(KERN_ERR
2504                        "drbd: invalid minor_count (%d)\n", minor_count);
2505 #ifdef MODULE
2506                 return -EINVAL;
2507 #else
2508                 minor_count = 8;
2509 #endif
2510         }
2511
2512         err = register_blkdev(DRBD_MAJOR, "drbd");
2513         if (err) {
2514                 printk(KERN_ERR
2515                        "drbd: unable to register block device major %d\n",
2516                        DRBD_MAJOR);
2517                 return err;
2518         }
2519
2520         err = drbd_genl_register();
2521         if (err) {
2522                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2523                 goto fail;
2524         }
2525
2526
2527         register_reboot_notifier(&drbd_notifier);
2528
2529         /*
2530          * allocate all necessary structs
2531          */
2532         err = -ENOMEM;
2533
2534         init_waitqueue_head(&drbd_pp_wait);
2535
2536         drbd_proc = NULL; /* play safe for drbd_cleanup */
2537         idr_init(&minors);
2538
2539         err = drbd_create_mempools();
2540         if (err)
2541                 goto fail;
2542
2543         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2544         if (!drbd_proc) {
2545                 printk(KERN_ERR "drbd: unable to register proc file\n");
2546                 goto fail;
2547         }
2548
2549         rwlock_init(&global_state_lock);
2550         INIT_LIST_HEAD(&drbd_tconns);
2551
2552         printk(KERN_INFO "drbd: initialized. "
2553                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2554                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2555         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2556         printk(KERN_INFO "drbd: registered as block device major %d\n",
2557                 DRBD_MAJOR);
2558
2559         return 0; /* Success! */
2560
2561 fail:
2562         drbd_cleanup();
2563         if (err == -ENOMEM)
2564                 /* currently always the case */
2565                 printk(KERN_ERR "drbd: ran out of memory\n");
2566         else
2567                 printk(KERN_ERR "drbd: initialization failure\n");
2568         return err;
2569 }
2570
2571 void drbd_free_bc(struct drbd_backing_dev *ldev)
2572 {
2573         if (ldev == NULL)
2574                 return;
2575
2576         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2577         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2578
2579         kfree(ldev);
2580 }
2581
2582 void drbd_free_sock(struct drbd_tconn *tconn)
2583 {
2584         if (tconn->data.socket) {
2585                 mutex_lock(&tconn->data.mutex);
2586                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2587                 sock_release(tconn->data.socket);
2588                 tconn->data.socket = NULL;
2589                 mutex_unlock(&tconn->data.mutex);
2590         }
2591         if (tconn->meta.socket) {
2592                 mutex_lock(&tconn->meta.mutex);
2593                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2594                 sock_release(tconn->meta.socket);
2595                 tconn->meta.socket = NULL;
2596                 mutex_unlock(&tconn->meta.mutex);
2597         }
2598 }
2599
2600
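/* Release the crypto transforms and sockets of the connection and drop
 * our reference on the backing device, if any. */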
2601 void drbd_free_resources(struct drbd_conf *mdev)
2602 {
2603         crypto_free_hash(mdev->tconn->csums_tfm);
2604         mdev->tconn->csums_tfm = NULL;
2605         crypto_free_hash(mdev->tconn->verify_tfm);
2606         mdev->tconn->verify_tfm = NULL;
2607         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2608         mdev->tconn->cram_hmac_tfm = NULL;
2609         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2610         mdev->tconn->integrity_w_tfm = NULL;
2611         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2612         mdev->tconn->integrity_r_tfm = NULL;
2613
2614         drbd_free_sock(mdev->tconn);
2615
2616         __no_warn(local,
2617                   drbd_free_bc(mdev->ldev);
2618                   mdev->ldev = NULL;);
2619 }
2620
2621 /* meta data management */
2622
2623 struct meta_data_on_disk {
2624         u64 la_size;           /* last agreed size. */
2625         u64 uuid[UI_SIZE];   /* UUIDs. */
2626         u64 device_uuid;
2627         u64 reserved_u64_1;
2628         u32 flags;             /* MDF */
2629         u32 magic;
2630         u32 md_size_sect;
2631         u32 al_offset;         /* offset to this block */
2632         u32 al_nr_extents;     /* important for restoring the AL */
2633               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2634         u32 bm_offset;         /* offset to the bitmap, from here */
2635         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2636         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2637         u32 reserved_u32[3];
2638
2639 } __packed;
2640
2641 /**
2642  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2643  * @mdev:       DRBD device.
2644  */
2645 void drbd_md_sync(struct drbd_conf *mdev)
2646 {
2647         struct meta_data_on_disk *buffer;
2648         sector_t sector;
2649         int i;
2650
2651         del_timer(&mdev->md_sync_timer);
2652         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2653         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2654                 return;
2655
2656         /* We use D_FAILED here and not D_ATTACHING because we try to write
2657          * metadata even if we detach due to a disk failure! */
2658         if (!get_ldev_if_state(mdev, D_FAILED))
2659                 return;
2660
2661         mutex_lock(&mdev->md_io_mutex);
2662         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2663         memset(buffer, 0, 512);
2664
2665         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2666         for (i = UI_CURRENT; i < UI_SIZE; i++)
2667                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2668         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2669         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2670
2671         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2672         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2673         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2674         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2675         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2676
2677         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2678         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2679
2680         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2681         sector = mdev->ldev->md.md_offset;
2682
2683         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2684                 /* this was a try anyway ... */
2685                 dev_err(DEV, "meta data update failed!\n");
2686                 drbd_chk_io_error(mdev, 1, true);
2687         }
2688
2689         /* Update mdev->ldev->md.la_size_sect,
2690          * since we just wrote it to the meta data. */
2691         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2692
2693         mutex_unlock(&mdev->md_io_mutex);
2694         put_ldev(mdev);
2695 }
2696
2697 /**
2698  * drbd_md_read() - Reads in the meta data super block
2699  * @mdev:       DRBD device.
2700  * @bdev:       Device from which the meta data should be read in.
2701  *
2702  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2703  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2704  */
2705 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2706 {
2707         struct meta_data_on_disk *buffer;
2708         int i, rv = NO_ERROR;
2709
2710         if (!get_ldev_if_state(mdev, D_ATTACHING))
2711                 return ERR_IO_MD_DISK;
2712
2713         mutex_lock(&mdev->md_io_mutex);
2714         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2715
2716         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2717                 /* NOTE: can't do normal error processing here as this is
2718                    called BEFORE disk is attached */
2719                 dev_err(DEV, "Error while reading metadata.\n");
2720                 rv = ERR_IO_MD_DISK;
2721                 goto err;
2722         }
2723
2724         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2725                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2726                 rv = ERR_MD_INVALID;
2727                 goto err;
2728         }
2729         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2730                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2731                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2732                 rv = ERR_MD_INVALID;
2733                 goto err;
2734         }
2735         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2736                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2737                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2738                 rv = ERR_MD_INVALID;
2739                 goto err;
2740         }
2741         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2742                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2743                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2744                 rv = ERR_MD_INVALID;
2745                 goto err;
2746         }
2747
2748         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2749                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2750                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2751                 rv = ERR_MD_INVALID;
2752                 goto err;
2753         }
2754
2755         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2756         for (i = UI_CURRENT; i < UI_SIZE; i++)
2757                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2758         bdev->md.flags = be32_to_cpu(buffer->flags);
2759         bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
2760         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2761
2762         spin_lock_irq(&mdev->tconn->req_lock);
2763         if (mdev->state.conn < C_CONNECTED) {
2764                 int peer;
2765                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2766                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2767                 mdev->peer_max_bio_size = peer;
2768         }
2769         spin_unlock_irq(&mdev->tconn->req_lock);
2770
2771         if (bdev->dc.al_extents < 7)
2772                 bdev->dc.al_extents = 127;
2773
2774  err:
2775         mutex_unlock(&mdev->md_io_mutex);
2776         put_ldev(mdev);
2777
2778         return rv;
2779 }
2780
2781 /**
2782  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2783  * @mdev:       DRBD device.
2784  *
2785  * Call this function if you change anything that should be written to
2786  * the meta-data super block. This function sets MD_DIRTY, and starts a
2787  * timer that ensures drbd_md_sync() gets called within five seconds.
2788  */
2789 #ifdef DEBUG
2790 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2791 {
2792         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2793                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2794                 mdev->last_md_mark_dirty.line = line;
2795                 mdev->last_md_mark_dirty.func = func;
2796         }
2797 }
2798 #else
2799 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2800 {
2801         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2802                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2803 }
2804 #endif
2805
2806 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2807 {
2808         int i;
2809
2810         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2811                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2812 }
2813
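/* Set a UUID slot without rotating the history.  For UI_CURRENT the lowest
 * bit encodes the current role (1 = Primary) and the exposed-data UUID is
 * updated as well. */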
2814 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2815 {
2816         if (idx == UI_CURRENT) {
2817                 if (mdev->state.role == R_PRIMARY)
2818                         val |= 1;
2819                 else
2820                         val &= ~((u64)1);
2821
2822                 drbd_set_ed_uuid(mdev, val);
2823         }
2824
2825         mdev->ldev->md.uuid[idx] = val;
2826         drbd_md_mark_dirty(mdev);
2827 }
2828
2829
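/* Like _drbd_uuid_set(), but a previously set UUID in this slot is first
 * rotated into the history slots. */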
2830 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2831 {
2832         if (mdev->ldev->md.uuid[idx]) {
2833                 drbd_uuid_move_history(mdev);
2834                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2835         }
2836         _drbd_uuid_set(mdev, idx, val);
2837 }
2838
2839 /**
2840  * drbd_uuid_new_current() - Creates a new current UUID
2841  * @mdev:       DRBD device.
2842  *
2843  * Creates a new current UUID, and rotates the old current UUID into
2844  * the bitmap slot. Causes an incremental resync upon next connect.
2845  */
2846 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2847 {
2848         u64 val;
2849         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2850
2851         if (bm_uuid)
2852                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2853
2854         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2855
2856         get_random_bytes(&val, sizeof(u64));
2857         _drbd_uuid_set(mdev, UI_CURRENT, val);
2858         drbd_print_uuids(mdev, "new current UUID");
2859         /* get it to stable storage _now_ */
2860         drbd_md_sync(mdev);
2861 }
2862
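/* Set or clear the bitmap UUID.  Clearing (val == 0) rotates the old bitmap
 * UUID into the history; setting stores val with its lowest bit cleared. */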
2863 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2864 {
2865         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2866                 return;
2867
2868         if (val == 0) {
2869                 drbd_uuid_move_history(mdev);
2870                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2871                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2872         } else {
2873                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2874                 if (bm_uuid)
2875                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2876
2877                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2878         }
2879         drbd_md_mark_dirty(mdev);
2880 }
2881
2882 /**
2883  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2884  * @mdev:       DRBD device.
2885  *
2886  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2887  */
2888 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2889 {
2890         int rv = -EIO;
2891
2892         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2893                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2894                 drbd_md_sync(mdev);
2895                 drbd_bm_set_all(mdev);
2896
2897                 rv = drbd_bm_write(mdev);
2898
2899                 if (!rv) {
2900                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2901                         drbd_md_sync(mdev);
2902                 }
2903
2904                 put_ldev(mdev);
2905         }
2906
2907         return rv;
2908 }
2909
2910 /**
2911  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2912  * @mdev:       DRBD device.
2913  *
2914  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2915  */
2916 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2917 {
2918         int rv = -EIO;
2919
2920         drbd_resume_al(mdev);
2921         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2922                 drbd_bm_clear_all(mdev);
2923                 rv = drbd_bm_write(mdev);
2924                 put_ldev(mdev);
2925         }
2926
2927         return rv;
2928 }
2929
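/* Worker callback: run the queued bitmap IO function with the bitmap locked,
 * then clear BITMAP_IO, wake up waiters and invoke the done() callback. */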
2930 static int w_bitmap_io(struct drbd_work *w, int unused)
2931 {
2932         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2933         struct drbd_conf *mdev = w->mdev;
2934         int rv = -EIO;
2935
2936         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2937
2938         if (get_ldev(mdev)) {
2939                 drbd_bm_lock(mdev, work->why, work->flags);
2940                 rv = work->io_fn(mdev);
2941                 drbd_bm_unlock(mdev);
2942                 put_ldev(mdev);
2943         }
2944
2945         clear_bit_unlock(BITMAP_IO, &mdev->flags);
2946         wake_up(&mdev->misc_wait);
2947
2948         if (work->done)
2949                 work->done(mdev, rv);
2950
2951         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2952         work->why = NULL;
2953         work->flags = 0;
2954
2955         return 0;
2956 }
2957
2958 void drbd_ldev_destroy(struct drbd_conf *mdev)
2959 {
2960         lc_destroy(mdev->resync);
2961         mdev->resync = NULL;
2962         lc_destroy(mdev->act_log);
2963         mdev->act_log = NULL;
2964         __no_warn(local,
2965                 drbd_free_bc(mdev->ldev);
2966                 mdev->ldev = NULL;);
2967
2968         clear_bit(GO_DISKLESS, &mdev->flags);
2969 }
2970
2971 static int w_go_diskless(struct drbd_work *w, int unused)
2972 {
2973         struct drbd_conf *mdev = w->mdev;
2974
2975         D_ASSERT(mdev->state.disk == D_FAILED);
2976         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2977          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2978          * the protected members anymore, though, so once put_ldev reaches zero
2979          * again, it will be safe to free them. */
2980         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2981         return 0;
2982 }
2983
2984 void drbd_go_diskless(struct drbd_conf *mdev)
2985 {
2986         D_ASSERT(mdev->state.disk == D_FAILED);
2987         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
2988                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
2989 }
2990
2991 /**
2992  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
2993  * @mdev:       DRBD device.
2994  * @io_fn:      IO callback to be called when bitmap IO is possible
2995  * @done:       callback to be called after the bitmap IO was performed
2996  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      bitmap locking flags (enum bm_flag)
2997  *
2998  * While IO on the bitmap is in progress, application IO is frozen, ensuring
2999  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3000  * called from worker context. It MUST NOT be used while a previous such
3001  * work is still pending!
3002  */
3003 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3004                           int (*io_fn)(struct drbd_conf *),
3005                           void (*done)(struct drbd_conf *, int),
3006                           char *why, enum bm_flag flags)
3007 {
3008         D_ASSERT(current == mdev->tconn->worker.task);
3009
3010         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3011         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3012         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3013         if (mdev->bm_io_work.why)
3014                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3015                         why, mdev->bm_io_work.why);
3016
3017         mdev->bm_io_work.io_fn = io_fn;
3018         mdev->bm_io_work.done = done;
3019         mdev->bm_io_work.why = why;
3020         mdev->bm_io_work.flags = flags;
3021
3022         spin_lock_irq(&mdev->tconn->req_lock);
3023         set_bit(BITMAP_IO, &mdev->flags);
3024         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3025                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3026                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3027         }
3028         spin_unlock_irq(&mdev->tconn->req_lock);
3029 }
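
/*
 * Illustrative sketch, not part of the driver: the example_* names below are
 * made up.  It shows how drbd_queue_bitmap_io() is meant to be used from
 * worker context, with drbd_bmio_set_n_write() from above as the io_fn and
 * BM_LOCKED_SET_ALLOWED picked here only as an example flag.
 */
static void example_bm_write_done(struct drbd_conf *mdev, int rv)
{
        if (rv)
                dev_err(DEV, "example: bitmap write failed with %d\n", rv);
}

static void __maybe_unused example_queue_full_bitmap_write(struct drbd_conf *mdev)
{
        /* must run on mdev->tconn->worker.task, see the D_ASSERT() above */
        drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write,
                             example_bm_write_done,
                             "example: set all bits", BM_LOCKED_SET_ALLOWED);
}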
3030
3031 /**
3032  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3033  * @mdev:       DRBD device.
3034  * @io_fn:      IO callback to be called when bitmap IO is possible
3035  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      bitmap locking flags (enum bm_flag)
3036  *
3037  * Freezes application IO while the actual IO operation runs. This
3038  * function MUST NOT be called from worker context.
3039  */
3040 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3041                 char *why, enum bm_flag flags)
3042 {
3043         int rv;
3044
3045         D_ASSERT(current != mdev->tconn->worker.task);
3046
3047         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3048                 drbd_suspend_io(mdev);
3049
3050         drbd_bm_lock(mdev, why, flags);
3051         rv = io_fn(mdev);
3052         drbd_bm_unlock(mdev);
3053
3054         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3055                 drbd_resume_io(mdev);
3056
3057         return rv;
3058 }
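
/*
 * Illustrative sketch, not part of the driver (hypothetical helper name):
 * synchronous use of drbd_bitmap_io() from non-worker context, again with
 * drbd_bmio_set_n_write() as the io_fn.  With BM_LOCKED_SET_ALLOWED set,
 * drbd_bitmap_io() skips the drbd_suspend_io()/drbd_resume_io() pair above.
 */
static int __maybe_unused example_sync_full_bitmap_write(struct drbd_conf *mdev)
{
        /* must not run on mdev->tconn->worker.task, see the D_ASSERT() above */
        return drbd_bitmap_io(mdev, &drbd_bmio_set_n_write,
                              "example: set all bits", BM_LOCKED_SET_ALLOWED);
}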
3059
3060 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3061 {
3062         if ((mdev->ldev->md.flags & flag) != flag) {
3063                 drbd_md_mark_dirty(mdev);
3064                 mdev->ldev->md.flags |= flag;
3065         }
3066 }
3067
3068 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3069 {
3070         if ((mdev->ldev->md.flags & flag) != 0) {
3071                 drbd_md_mark_dirty(mdev);
3072                 mdev->ldev->md.flags &= ~flag;
3073         }
3074 }

3075 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3076 {
3077         return (bdev->md.flags & flag) != 0;
3078 }
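
/*
 * Illustrative sketch, not part of the driver (hypothetical helper name):
 * the usual pattern around these flag helpers, as in drbd_bmio_set_n_write()
 * above: take a local reference, flip the flag (which marks the meta-data
 * dirty), then drbd_md_sync() to get it to stable storage.
 */
static void __maybe_unused example_set_full_sync_flag(struct drbd_conf *mdev)
{
        if (get_ldev_if_state(mdev, D_ATTACHING)) {
                drbd_md_set_flag(mdev, MDF_FULL_SYNC);
                drbd_md_sync(mdev);
                put_ldev(mdev);
        }
}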
3079
3080 static void md_sync_timer_fn(unsigned long data)
3081 {
3082         struct drbd_conf *mdev = (struct drbd_conf *) data;
3083
3084         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3085 }
3086
3087 static int w_md_sync(struct drbd_work *w, int unused)
3088 {
3089         struct drbd_conf *mdev = w->mdev;
3090
3091         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3092 #ifdef DEBUG
3093         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3094                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3095 #endif
3096         drbd_md_sync(mdev);
3097         return 0;
3098 }
3099
3100 const char *cmdname(enum drbd_packet cmd)
3101 {
3102         /* THINK may need to become several global tables
3103          * when we want to support more than
3104          * one PRO_VERSION */
3105         static const char *cmdnames[] = {
3106                 [P_DATA]                = "Data",
3107                 [P_DATA_REPLY]          = "DataReply",
3108                 [P_RS_DATA_REPLY]       = "RSDataReply",
3109                 [P_BARRIER]             = "Barrier",
3110                 [P_BITMAP]              = "ReportBitMap",
3111                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3112                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3113                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3114                 [P_DATA_REQUEST]        = "DataRequest",
3115                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3116                 [P_SYNC_PARAM]          = "SyncParam",
3117                 [P_SYNC_PARAM89]        = "SyncParam89",
3118                 [P_PROTOCOL]            = "ReportProtocol",
3119                 [P_UUIDS]               = "ReportUUIDs",
3120                 [P_SIZES]               = "ReportSizes",
3121                 [P_STATE]               = "ReportState",
3122                 [P_SYNC_UUID]           = "ReportSyncUUID",
3123                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3124                 [P_AUTH_RESPONSE]       = "AuthResponse",
3125                 [P_PING]                = "Ping",
3126                 [P_PING_ACK]            = "PingAck",
3127                 [P_RECV_ACK]            = "RecvAck",
3128                 [P_WRITE_ACK]           = "WriteAck",
3129                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3130                 [P_DISCARD_WRITE]        = "DiscardWrite",
3131                 [P_NEG_ACK]             = "NegAck",
3132                 [P_NEG_DREPLY]          = "NegDReply",
3133                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3134                 [P_BARRIER_ACK]         = "BarrierAck",
3135                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3136                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3137                 [P_OV_REQUEST]          = "OVRequest",
3138                 [P_OV_REPLY]            = "OVReply",
3139                 [P_OV_RESULT]           = "OVResult",
3140                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3141                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3142                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3143                 [P_DELAY_PROBE]         = "DelayProbe",
3144                 [P_OUT_OF_SYNC]         = "OutOfSync",
3145                 [P_RETRY_WRITE]         = "RetryWrite",
3146         };
3147
3148         if (cmd == P_INITIAL_META)
3149                 return "InitialMeta";
3150         if (cmd == P_INITIAL_DATA)
3151                 return "InitialData";
3152         if (cmd == P_HAND_SHAKE)
3153                 return "HandShake";
3154         if (cmd >= ARRAY_SIZE(cmdnames))
3155                 return "Unknown";
3156         return cmdnames[cmd];
3157 }
3158
3159 /**
3160  * drbd_wait_misc  -  wait for a request to make progress
3161  * @mdev:       device associated with the request
3162  * @i:          the struct drbd_interval embedded in struct drbd_request or
3163  *              struct drbd_peer_request
3164  */
3165 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3166 {
3167         struct net_conf *net_conf = mdev->tconn->net_conf;
3168         DEFINE_WAIT(wait);
3169         long timeout;
3170
3171         if (!net_conf)
3172                 return -ETIMEDOUT;
3173         timeout = MAX_SCHEDULE_TIMEOUT;
3174         if (net_conf->ko_count)
3175                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3176
3177         /* Indicate to wake up mdev->misc_wait on progress.  */
3178         i->waiting = true;
3179         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3180         spin_unlock_irq(&mdev->tconn->req_lock);
3181         timeout = schedule_timeout(timeout);
3182         finish_wait(&mdev->misc_wait, &wait);
3183         spin_lock_irq(&mdev->tconn->req_lock);
3184         if (!timeout || mdev->state.conn < C_CONNECTED)
3185                 return -ETIMEDOUT;
3186         if (signal_pending(current))
3187                 return -ERESTARTSYS;
3188         return 0;
3189 }
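
/*
 * Illustrative sketch, not part of the driver (hypothetical caller):
 * drbd_wait_misc() must be entered with mdev->tconn->req_lock held; it drops
 * and re-takes that lock around schedule_timeout(), so the caller still owns
 * the lock when it returns.
 */
static int __maybe_unused example_wait_for_interval(struct drbd_conf *mdev,
                                                    struct drbd_interval *i)
{
        int err;

        spin_lock_irq(&mdev->tconn->req_lock);
        err = drbd_wait_misc(mdev, i);  /* 0, -ETIMEDOUT or -ERESTARTSYS */
        spin_unlock_irq(&mdev->tconn->req_lock);

        return err;
}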
3190
3191 #ifdef CONFIG_DRBD_FAULT_INJECTION
3192 /* Fault insertion support including random number generator shamelessly
3193  * stolen from kernel/rcutorture.c */
3194 struct fault_random_state {
3195         unsigned long state;
3196         unsigned long count;
3197 };
3198
3199 #define FAULT_RANDOM_MULT 39916801  /* prime */
3200 #define FAULT_RANDOM_ADD        479001701 /* prime */
3201 #define FAULT_RANDOM_REFRESH 10000
3202
3203 /*
3204  * Crude but fast random-number generator.  Uses a linear congruential
3205  * generator, with occasional help from get_random_bytes().
3206  */
3207 static unsigned long
3208 _drbd_fault_random(struct fault_random_state *rsp)
3209 {
3210         long refresh;
3211
3212         if (!rsp->count--) {
3213                 get_random_bytes(&refresh, sizeof(refresh));
3214                 rsp->state += refresh;
3215                 rsp->count = FAULT_RANDOM_REFRESH;
3216         }
3217         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3218         return swahw32(rsp->state);
3219 }
3220
3221 static char *
3222 _drbd_fault_str(unsigned int type)
{
3223         static char *_faults[] = {
3224                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3225                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3226                 [DRBD_FAULT_RS_WR] = "Resync write",
3227                 [DRBD_FAULT_RS_RD] = "Resync read",
3228                 [DRBD_FAULT_DT_WR] = "Data write",
3229                 [DRBD_FAULT_DT_RD] = "Data read",
3230                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3231                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3232                 [DRBD_FAULT_AL_EE] = "EE allocation",
3233                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3234         };
3235
3236         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3237 }
3238
3239 unsigned int
3240 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3241 {
3242         static struct fault_random_state rrs = {0, 0};
3243
3244         unsigned int ret = (
3245                 (fault_devs == 0 ||
3246                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3247                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3248
3249         if (ret) {
3250                 fault_count++;
3251
3252                 if (__ratelimit(&drbd_ratelimit_state))
3253                         dev_warn(DEV, "***Simulating %s failure\n",
3254                                 _drbd_fault_str(type));
3255         }
3256
3257         return ret;
3258 }
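
/*
 * Illustrative sketch, not part of the driver (hypothetical submit path):
 * how _drbd_insert_fault() is typically consulted right before real IO is
 * issued, failing the bio immediately instead of submitting it when a fault
 * is being simulated (fault_type would be e.g. DRBD_FAULT_DT_WR).
 */
static void __maybe_unused example_submit_or_fail(struct drbd_conf *mdev,
                                                  int fault_type, struct bio *bio)
{
        if (_drbd_insert_fault(mdev, fault_type))
                bio_endio(bio, -EIO);   /* pretend the IO failed right away */
        else
                generic_make_request(bio);
}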
3259 #endif
3260
3261 const char *drbd_buildtag(void)
3262 {
3263         /* When DRBD is built from external sources, this holds a reference
3264            to the git hash of the source code. */
3265
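        /* buildtag starts with a NUL byte: on the first call either the
         * module srcversion is formatted into it, or, when drbd is built
         * into the kernel and THIS_MODULE is NULL, the leading byte is set
         * to 'b', turning the string into "built-in". */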
3266         static char buildtag[38] = "\0uilt-in";
3267
3268         if (buildtag[0] == 0) {
3269 #ifdef CONFIG_MODULES
3270                 if (THIS_MODULE != NULL)
3271                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3272                 else
3273 #endif
3274                         buildtag[0] = 'b';
3275         }
3276
3277         return buildtag;
3278 }
3279
3280 module_init(drbd_init)
3281 module_exit(drbd_cleanup)
3282
3283 EXPORT_SYMBOL(drbd_conn_str);
3284 EXPORT_SYMBOL(drbd_role_str);
3285 EXPORT_SYMBOL(drbd_disk_str);
3286 EXPORT_SYMBOL(drbd_set_st_err_str);