drbd: Preallocate one page per drbd_socket as a send buffer
[firefly-linux-kernel-4.4.55.git] / drivers / block / drbd / drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
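/*
 * Usage sketch (editor's illustration, not part of the driver): when drbd
 * is loaded as a module, the parameters above are ordinary module options,
 * e.g. "modprobe drbd minor_count=8 disable_sendpage=1".  When drbd is
 * built into the kernel, the same knob becomes the boot parameter
 * "drbd.minor_count=8", as the comment above describes.
 */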
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123 DEFINE_MUTEX(drbd_cfg_mutex);
124
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache;       /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
133
134 /* I do not use a standard mempool, because:
135    1) I want to hand out the pre-allocated objects first.
136    2) I want to be able to interrupt sleeping allocation with a signal.
137    Note: This is a singly linked list; the next pointer is the private
138          member of struct page.
139  */
140 struct page *drbd_pp_pool;
141 spinlock_t   drbd_pp_lock;
142 int          drbd_pp_vacant;
143 wait_queue_head_t drbd_pp_wait;
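/*
 * Illustrative sketch (editor's addition, not compiled): how a pool like
 * drbd_pp_pool can be chained through the private member of struct page,
 * as the comment above describes.  The real alloc/free paths live
 * elsewhere in the driver; drbd_pp_lock is assumed to be held by the
 * caller.
 */
#if 0
static void example_pp_push(struct page *page)
{
	/* link the page in front of the current pool head */
	set_page_private(page, (unsigned long)drbd_pp_pool);
	drbd_pp_pool = page;
	drbd_pp_vacant++;
}

static struct page *example_pp_pop(void)
{
	struct page *page = drbd_pp_pool;

	if (page) {
		/* advance the head to the next chained page */
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	return page;
}
#endif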
144
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
146
147 static const struct block_device_operations drbd_ops = {
148         .owner =   THIS_MODULE,
149         .open =    drbd_open,
150         .release = drbd_release,
151 };
152
153 static void bio_destructor_drbd(struct bio *bio)
154 {
155         bio_free(bio, drbd_md_io_bio_set);
156 }
157
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
159 {
160         struct bio *bio;
161
162         if (!drbd_md_io_bio_set)
163                 return bio_alloc(gfp_mask, 1);
164
165         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
166         if (!bio)
167                 return NULL;
168         bio->bi_destructor = bio_destructor_drbd;
169         return bio;
170 }
171
172 #ifdef __CHECKER__
173 /* When checking with sparse, and this is an inline function, sparse will
174    give tons of false positives. When this is a real function, sparse works.
175  */
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
177 {
178         int io_allowed;
179
180         atomic_inc(&mdev->local_cnt);
181         io_allowed = (mdev->state.disk >= mins);
182         if (!io_allowed) {
183                 if (atomic_dec_and_test(&mdev->local_cnt))
184                         wake_up(&mdev->misc_wait);
185         }
186         return io_allowed;
187 }
188
189 #endif
190
191 /**
192  * DOC: The transfer log
193  *
194  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
195  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196  * of the list. There is always at least one &struct drbd_tl_epoch object.
197  *
198  * Each &struct drbd_tl_epoch has a circular double linked list of requests
199  * attached.
200  */
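/*
 * Illustrative sketch (editor's addition, not compiled): the shape of the
 * transfer log described above.  Epochs are walked oldest to newest via
 * ->next, and each epoch carries its write requests on the ->requests
 * list.  A real walk of this kind is done under tconn->req_lock, as in
 * _tl_restart() below.
 */
#if 0
static void example_walk_transfer_log(struct drbd_tconn *tconn)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;

	for (b = tconn->oldest_tle; b != NULL; b = b->next)
		list_for_each_entry(req, &b->requests, tl_requests)
			; /* inspect req */
}
#endif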
201 static int tl_init(struct drbd_tconn *tconn)
202 {
203         struct drbd_tl_epoch *b;
204
205         /* during device minor initialization, we may well use GFP_KERNEL */
206         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
207         if (!b)
208                 return 0;
209         INIT_LIST_HEAD(&b->requests);
210         INIT_LIST_HEAD(&b->w.list);
211         b->next = NULL;
212         b->br_number = 4711;
213         b->n_writes = 0;
214         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
215
216         tconn->oldest_tle = b;
217         tconn->newest_tle = b;
218         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruption of the
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violating write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_del_init(&b->requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL, b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log, and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
373                 }
374                 tmp = b->next;
375
376                 if (n_writes) {
377                         if (what == RESEND) {
378                                 b->n_writes = n_writes;
379                                 if (b->w.cb == NULL) {
380                                         b->w.cb = w_send_barrier;
381                                         inc_ap_pending(b->w.mdev);
382                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
383                                 }
384
385                                 drbd_queue_work(&tconn->data.work, &b->w);
386                         }
387                         pn = &b->next;
388                 } else {
389                         if (n_reads)
390                                 list_add(&carry_reads, &b->requests);
391                         /* there could still be requests on that ring list,
392                          * in case local io is still pending */
393                         list_del(&b->requests);
394
395                         /* dec_ap_pending corresponding to queue_barrier.
396                          * the newest barrier may not have been queued yet,
397                          * in which case w.cb is still NULL. */
398                         if (b->w.cb != NULL)
399                                 dec_ap_pending(b->w.mdev);
400
401                         if (b == tconn->newest_tle) {
402                                 /* recycle, but reinit! */
403                                 if (tmp != NULL)
404                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
405                                 INIT_LIST_HEAD(&b->requests);
406                                 list_splice(&carry_reads, &b->requests);
407                                 INIT_LIST_HEAD(&b->w.list);
408                                 b->w.cb = NULL;
409                                 b->br_number = net_random();
410                                 b->n_writes = 0;
411
412                                 *pn = b;
413                                 break;
414                         }
415                         *pn = tmp;
416                         kfree(b);
417                 }
418                 b = tmp;
419                 list_splice(&carry_reads, &b->requests);
420         }
421 }
422
423
424 /**
425  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426  * @tconn:      DRBD connection.
427  *
428  * This is called after the connection to the peer was lost. The storage covered
429  * by the requests on the transfer log gets marked as out of sync. Called from the
430  * receiver thread and the worker thread.
431  */
432 void tl_clear(struct drbd_tconn *tconn)
433 {
434         struct drbd_conf *mdev;
435         struct list_head *le, *tle;
436         struct drbd_request *r;
437         int vnr;
438
439         spin_lock_irq(&tconn->req_lock);
440
441         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
442
443         /* we expect this list to be empty. */
444         if (!list_empty(&tconn->out_of_sequence_requests))
445                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
446
447         /* but just in case, clean it up anyways! */
448         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449                 r = list_entry(le, struct drbd_request, tl_requests);
450                 /* It would be nice to complete outside of spinlock.
451                  * But this is easier for now. */
452                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
453         }
454
455         /* ensure bit indicating barrier is required is clear */
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458
459         spin_unlock_irq(&tconn->req_lock);
460 }
461
462 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
463 {
464         spin_lock_irq(&tconn->req_lock);
465         _tl_restart(tconn, what);
466         spin_unlock_irq(&tconn->req_lock);
467 }
468
469 static int drbd_thread_setup(void *arg)
470 {
471         struct drbd_thread *thi = (struct drbd_thread *) arg;
472         struct drbd_tconn *tconn = thi->tconn;
473         unsigned long flags;
474         int retval;
475
476         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
477                  thi->name[0], thi->tconn->name);
478
479 restart:
480         retval = thi->function(thi);
481
482         spin_lock_irqsave(&thi->t_lock, flags);
483
484         /* if the receiver has been "EXITING", the last thing it did
485          * was set the conn state to "StandAlone",
486          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
487          * and receiver thread will be "started".
488          * drbd_thread_start needs to set "RESTARTING" in that case.
489          * t_state check and assignment needs to be within the same spinlock,
490          * so either thread_start sees EXITING, and can remap to RESTARTING,
491  * or thread_start sees NONE, and can proceed as normal.
492          */
493
494         if (thi->t_state == RESTARTING) {
495                 conn_info(tconn, "Restarting %s thread\n", thi->name);
496                 thi->t_state = RUNNING;
497                 spin_unlock_irqrestore(&thi->t_lock, flags);
498                 goto restart;
499         }
500
501         thi->task = NULL;
502         thi->t_state = NONE;
503         smp_mb();
504         complete(&thi->stop);
505         spin_unlock_irqrestore(&thi->t_lock, flags);
506
507         conn_info(tconn, "Terminating %s\n", current->comm);
508
509         /* Release mod reference taken when thread was started */
510         module_put(THIS_MODULE);
511         return retval;
512 }
513
514 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
515                              int (*func) (struct drbd_thread *), char *name)
516 {
517         spin_lock_init(&thi->t_lock);
518         thi->task    = NULL;
519         thi->t_state = NONE;
520         thi->function = func;
521         thi->tconn = tconn;
522         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
523 }
524
525 int drbd_thread_start(struct drbd_thread *thi)
526 {
527         struct drbd_tconn *tconn = thi->tconn;
528         struct task_struct *nt;
529         unsigned long flags;
530
531         /* is used from state engine doing drbd_thread_stop_nowait,
532          * while holding the req lock irqsave */
533         spin_lock_irqsave(&thi->t_lock, flags);
534
535         switch (thi->t_state) {
536         case NONE:
537                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
538                          thi->name, current->comm, current->pid);
539
540                 /* Get ref on module for thread - this is released when thread exits */
541                 if (!try_module_get(THIS_MODULE)) {
542                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
543                         spin_unlock_irqrestore(&thi->t_lock, flags);
544                         return false;
545                 }
546
547                 init_completion(&thi->stop);
548                 thi->reset_cpu_mask = 1;
549                 thi->t_state = RUNNING;
550                 spin_unlock_irqrestore(&thi->t_lock, flags);
551                 flush_signals(current); /* otherwise we may get -ERESTARTNOINTR */
552
553                 nt = kthread_create(drbd_thread_setup, (void *) thi,
554                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
555
556                 if (IS_ERR(nt)) {
557                         conn_err(tconn, "Couldn't start thread\n");
558
559                         module_put(THIS_MODULE);
560                         return false;
561                 }
562                 spin_lock_irqsave(&thi->t_lock, flags);
563                 thi->task = nt;
564                 thi->t_state = RUNNING;
565                 spin_unlock_irqrestore(&thi->t_lock, flags);
566                 wake_up_process(nt);
567                 break;
568         case EXITING:
569                 thi->t_state = RESTARTING;
570                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
571                                 thi->name, current->comm, current->pid);
572                 /* fall through */
573         case RUNNING:
574         case RESTARTING:
575         default:
576                 spin_unlock_irqrestore(&thi->t_lock, flags);
577                 break;
578         }
579
580         return true;
581 }
582
583
584 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
585 {
586         unsigned long flags;
587
588         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
589
590         /* may be called from state engine, holding the req lock irqsave */
591         spin_lock_irqsave(&thi->t_lock, flags);
592
593         if (thi->t_state == NONE) {
594                 spin_unlock_irqrestore(&thi->t_lock, flags);
595                 if (restart)
596                         drbd_thread_start(thi);
597                 return;
598         }
599
600         if (thi->t_state != ns) {
601                 if (thi->task == NULL) {
602                         spin_unlock_irqrestore(&thi->t_lock, flags);
603                         return;
604                 }
605
606                 thi->t_state = ns;
607                 smp_mb();
608                 init_completion(&thi->stop);
609                 if (thi->task != current)
610                         force_sig(DRBD_SIGKILL, thi->task);
611         }
612
613         spin_unlock_irqrestore(&thi->t_lock, flags);
614
615         if (wait)
616                 wait_for_completion(&thi->stop);
617 }
618
619 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
620 {
621         struct drbd_thread *thi =
622                 task == tconn->receiver.task ? &tconn->receiver :
623                 task == tconn->asender.task  ? &tconn->asender :
624                 task == tconn->worker.task   ? &tconn->worker : NULL;
625
626         return thi;
627 }
628
629 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
630 {
631         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
632         return thi ? thi->name : task->comm;
633 }
634
635 int conn_lowest_minor(struct drbd_tconn *tconn)
636 {
637         int vnr = 0;
638         struct drbd_conf *mdev;
639
640         mdev = idr_get_next(&tconn->volumes, &vnr);
641         if (!mdev)
642                 return -1;
643         return mdev_to_minor(mdev);
644 }
645
646 #ifdef CONFIG_SMP
647 /**
648  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
649  * @tconn:      DRBD connection.
650  *
651  * Forces all threads of a connection onto the same CPU. This is beneficial for
652  * DRBD's performance. May be overridden by the user's configuration.
653  */
654 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
655 {
656         int ord, cpu;
657
658         /* user override. */
659         if (cpumask_weight(tconn->cpu_mask))
660                 return;
661
662         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
663         for_each_online_cpu(cpu) {
664                 if (ord-- == 0) {
665                         cpumask_set_cpu(cpu, tconn->cpu_mask);
666                         return;
667                 }
668         }
669         /* should not be reached */
670         cpumask_setall(tconn->cpu_mask);
671 }
672
673 /**
674  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
675  * @thi:        drbd_thread object
676  *
677  * Call this in the "main loop" of _all_ threads; no mutex is needed, current
678  * won't die prematurely.
679  *
680  */
681 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
682 {
683         struct task_struct *p = current;
684
685         if (!thi->reset_cpu_mask)
686                 return;
687         thi->reset_cpu_mask = 0;
688         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
689 }
690 #endif
691
692 static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
693 {
694         h->magic   = cpu_to_be32(DRBD_MAGIC);
695         h->command = cpu_to_be16(cmd);
696         h->length  = cpu_to_be16(size);
697 }
698
699 static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
700 {
701         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
702         h->command = cpu_to_be16(cmd);
703         h->length  = cpu_to_be32(size);
704 }
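/*
 * Editor's note: the difference between the two wire formats prepared
 * above is the field layout - p_header80 uses a 32-bit magic and a 16-bit
 * length, while p_header95 uses a 16-bit magic and widens the length to
 * 32 bits.  _prepare_header() below picks the format based on the agreed
 * protocol version.
 */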
705
706 static void _prepare_header(struct drbd_tconn *tconn, int vnr, struct p_header *h,
707                             enum drbd_packet cmd, int size)
708 {
709         if (tconn->agreed_pro_version >= 95)
710                 prepare_header95(&h->h95, cmd, size);
711         else
712                 prepare_header80(&h->h80, cmd, size);
713 }
714
715 static void prepare_header(struct drbd_conf *mdev, struct p_header *h,
716                            enum drbd_packet cmd, int size)
717 {
718         _prepare_header(mdev->tconn, mdev->vnr, h, cmd, size);
719 }
720
721 /* the appropriate socket mutex must be held already */
722 int _conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct socket *sock,
723                    enum drbd_packet cmd, struct p_header *h, size_t size,
724                    unsigned msg_flags)
725 {
726         int err;
727
728         _prepare_header(tconn, vnr, h, cmd, size - sizeof(struct p_header));
729         err = drbd_send_all(tconn, sock, h, size, msg_flags);
730         if (err && !signal_pending(current))
731                 conn_warn(tconn, "short send %s size=%d\n",
732                           cmdname(cmd), (int)size);
733         return err;
734 }
735
736 /* don't pass the socket. we may only look at it
737  * when we hold the appropriate socket mutex.
738  */
739 int conn_send_cmd(struct drbd_tconn *tconn, int vnr, struct drbd_socket *sock,
740                   enum drbd_packet cmd, struct p_header *h, size_t size)
741 {
742         int err = -EIO;
743
744         mutex_lock(&sock->mutex);
745         if (sock->socket)
746                 err = _conn_send_cmd(tconn, vnr, sock->socket, cmd, h, size, 0);
747         mutex_unlock(&sock->mutex);
748         return err;
749 }
750
751 int conn_send_cmd2(struct drbd_tconn *tconn, enum drbd_packet cmd, char *data,
752                    size_t size)
753 {
754         struct p_header80 h;
755         int err;
756
757         prepare_header80(&h, cmd, size);
758         err = drbd_get_data_sock(tconn);
759         if (!err) {
760                 err = drbd_send_all(tconn, tconn->data.socket, &h, sizeof(h), 0);
761                 if (!err)
762                         err = drbd_send_all(tconn, tconn->data.socket, data, size, 0);
763                 drbd_put_data_sock(tconn);
764         }
765         return err;
766 }
767
768 int drbd_send_ping(struct drbd_tconn *tconn)
769 {
770         struct p_header h;
771         return !conn_send_cmd(tconn, 0, &tconn->meta, P_PING, &h, sizeof(h));
772 }
773
774 int drbd_send_ping_ack(struct drbd_tconn *tconn)
775 {
776         struct p_header h;
777         return !conn_send_cmd(tconn, 0, &tconn->meta, P_PING_ACK, &h, sizeof(h));
778 }
779
780 int drbd_send_sync_param(struct drbd_conf *mdev)
781 {
782         struct p_rs_param_95 *p;
783         struct socket *sock;
784         int size, err;
785         const int apv = mdev->tconn->agreed_pro_version;
786
787         size = apv <= 87 ? sizeof(struct p_rs_param)
788                 : apv == 88 ? sizeof(struct p_rs_param)
789                         + strlen(mdev->tconn->net_conf->verify_alg) + 1
790                 : apv <= 94 ? sizeof(struct p_rs_param_89)
791                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
792
793         /* used from admin command context and receiver/worker context.
794          * to avoid a kmalloc, grab the socket right here,
795          * then use the pre-allocated send buffer (sbuf) there */
796         mutex_lock(&mdev->tconn->data.mutex);
797         sock = mdev->tconn->data.socket;
798
799         if (likely(sock != NULL)) {
800                 enum drbd_packet cmd =
801                         apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
802
803                 p = mdev->tconn->data.sbuf;
804
805                 /* initialize verify_alg and csums_alg */
806                 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
807
808                 if (get_ldev(mdev)) {
809                         p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
810                         p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
811                         p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
812                         p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
813                         p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
814                         put_ldev(mdev);
815                 } else {
816                         p->rate = cpu_to_be32(DRBD_RATE_DEF);
817                         p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
818                         p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
819                         p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
820                         p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
821                 }
822
823                 if (apv >= 88)
824                         strcpy(p->verify_alg, mdev->tconn->net_conf->verify_alg);
825                 if (apv >= 89)
826                         strcpy(p->csums_alg, mdev->tconn->net_conf->csums_alg);
827
828                 err = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
829         } else
830                 err = -EIO;
831
832         mutex_unlock(&mdev->tconn->data.mutex);
833
834         return err;
835 }
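/*
 * Illustrative sketch (editor's addition, not compiled): the send-buffer
 * pattern used in drbd_send_sync_param() above.  Each drbd_socket owns one
 * preallocated page (->sbuf) that may only be touched while ->mutex is
 * held, so no allocation is needed on the send path.  The packet type and
 * empty payload here are made up for the example.
 */
#if 0
static int example_send_via_sbuf(struct drbd_tconn *tconn)
{
	struct p_header *h;
	int err = -EIO;

	mutex_lock(&tconn->data.mutex);
	if (tconn->data.socket) {
		h = tconn->data.sbuf;	/* the preallocated send buffer */
		/* ... fill in a payload behind the header here ... */
		err = _conn_send_cmd(tconn, 0, tconn->data.socket,
				     P_PING, h, sizeof(*h), 0);
	}
	mutex_unlock(&tconn->data.mutex);
	return err;
}
#endif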
836
837 int drbd_send_protocol(struct drbd_tconn *tconn)
838 {
839         struct p_protocol *p;
840         int size, cf, err;
841
842         size = sizeof(struct p_protocol);
843
844         if (tconn->agreed_pro_version >= 87)
845                 size += strlen(tconn->net_conf->integrity_alg) + 1;
846
847         /* we must not recurse into our own queue,
848          * as that is blocked during handshake */
849         p = kmalloc(size, GFP_NOIO);
850         if (p == NULL)
851                 return -ENOMEM;
852
853         p->protocol      = cpu_to_be32(tconn->net_conf->wire_protocol);
854         p->after_sb_0p   = cpu_to_be32(tconn->net_conf->after_sb_0p);
855         p->after_sb_1p   = cpu_to_be32(tconn->net_conf->after_sb_1p);
856         p->after_sb_2p   = cpu_to_be32(tconn->net_conf->after_sb_2p);
857         p->two_primaries = cpu_to_be32(tconn->net_conf->two_primaries);
858
859         cf = 0;
860         if (tconn->net_conf->want_lose)
861                 cf |= CF_WANT_LOSE;
862         if (tconn->net_conf->dry_run) {
863                 if (tconn->agreed_pro_version >= 92)
864                         cf |= CF_DRY_RUN;
865                 else {
866                         conn_err(tconn, "--dry-run is not supported by peer");
867                         kfree(p);
868                         return -EOPNOTSUPP;
869                 }
870         }
871         p->conn_flags    = cpu_to_be32(cf);
872
873         if (tconn->agreed_pro_version >= 87)
874                 strcpy(p->integrity_alg, tconn->net_conf->integrity_alg);
875
876         err = conn_send_cmd2(tconn, P_PROTOCOL, p->head.payload, size - sizeof(struct p_header));
877         kfree(p);
878         return err;
879 }
880
881 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
882 {
883         struct p_uuids p;
884         int i;
885
886         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
887                 return 0;
888
889         for (i = UI_CURRENT; i < UI_SIZE; i++)
890                 p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
891
892         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
893         p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
894         uuid_flags |= mdev->tconn->net_conf->want_lose ? 1 : 0;
895         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
896         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
897         p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
898
899         put_ldev(mdev);
900
901         return drbd_send_cmd(mdev, &mdev->tconn->data, P_UUIDS, &p.head, sizeof(p));
902 }
903
904 int drbd_send_uuids(struct drbd_conf *mdev)
905 {
906         return _drbd_send_uuids(mdev, 0);
907 }
908
909 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
910 {
911         return _drbd_send_uuids(mdev, 8);
912 }
913
914 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
915 {
916         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
917                 u64 *uuid = mdev->ldev->md.uuid;
918                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
919                      text,
920                      (unsigned long long)uuid[UI_CURRENT],
921                      (unsigned long long)uuid[UI_BITMAP],
922                      (unsigned long long)uuid[UI_HISTORY_START],
923                      (unsigned long long)uuid[UI_HISTORY_END]);
924                 put_ldev(mdev);
925         } else {
926                 dev_info(DEV, "%s effective data uuid: %016llX\n",
927                                 text,
928                                 (unsigned long long)mdev->ed_uuid);
929         }
930 }
931
932 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
933 {
934         struct p_rs_uuid p;
935         u64 uuid;
936
937         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
938
939         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
940         drbd_uuid_set(mdev, UI_BITMAP, uuid);
941         drbd_print_uuids(mdev, "updated sync UUID");
942         drbd_md_sync(mdev);
943         p.uuid = cpu_to_be64(uuid);
944
945         drbd_send_cmd(mdev, &mdev->tconn->data, P_SYNC_UUID, &p.head, sizeof(p));
946 }
947
948 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
949 {
950         struct p_sizes p;
951         sector_t d_size, u_size;
952         int q_order_type, max_bio_size;
953
954         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
955                 D_ASSERT(mdev->ldev->backing_bdev);
956                 d_size = drbd_get_max_capacity(mdev->ldev);
957                 u_size = mdev->ldev->dc.disk_size;
958                 q_order_type = drbd_queue_order_type(mdev);
959                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
960                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
961                 put_ldev(mdev);
962         } else {
963                 d_size = 0;
964                 u_size = 0;
965                 q_order_type = QUEUE_ORDERED_NONE;
966                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
967         }
968
969         p.d_size = cpu_to_be64(d_size);
970         p.u_size = cpu_to_be64(u_size);
971         p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
972         p.max_bio_size = cpu_to_be32(max_bio_size);
973         p.queue_order_type = cpu_to_be16(q_order_type);
974         p.dds_flags = cpu_to_be16(flags);
975
976         return drbd_send_cmd(mdev, &mdev->tconn->data, P_SIZES, &p.head, sizeof(p));
977 }
978
979 /**
980  * drbd_send_state() - Sends the drbd state to the peer
981  * @mdev:       DRBD device.
982  */
983 int drbd_send_state(struct drbd_conf *mdev)
984 {
985         struct socket *sock;
986         struct p_state p;
987         int err = -EIO;
988
989         mutex_lock(&mdev->tconn->data.mutex);
990
991         p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
992         sock = mdev->tconn->data.socket;
993
994         if (likely(sock != NULL))
995                 err = _drbd_send_cmd(mdev, sock, P_STATE, &p.head, sizeof(p), 0);
996
997         mutex_unlock(&mdev->tconn->data.mutex);
998
999         return err;
1000 }
1001
1002 int _conn_send_state_req(struct drbd_tconn *tconn, int vnr, enum drbd_packet cmd,
1003                          union drbd_state mask, union drbd_state val)
1004 {
1005         struct p_req_state p;
1006
1007         p.mask    = cpu_to_be32(mask.i);
1008         p.val     = cpu_to_be32(val.i);
1009
1010         return conn_send_cmd(tconn, vnr, &tconn->data, cmd, &p.head, sizeof(p));
1011 }
1012
1013 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1014 {
1015         struct p_req_state_reply p;
1016
1017         p.retcode    = cpu_to_be32(retcode);
1018
1019         drbd_send_cmd(mdev, &mdev->tconn->meta, P_STATE_CHG_REPLY, &p.head, sizeof(p));
1020 }
1021
1022 int conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1023 {
1024         struct p_req_state_reply p;
1025         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1026
1027         p.retcode    = cpu_to_be32(retcode);
1028
1029         return !conn_send_cmd(tconn, 0, &tconn->meta, cmd, &p.head, sizeof(p));
1030 }
1031
1032 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1033 {
1034         BUG_ON(code & ~0xf);
1035         p->encoding = (p->encoding & ~0xf) | code;
1036 }
1037
1038 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1039 {
1040         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1041 }
1042
1043 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1044 {
1045         BUG_ON(n & ~0x7);
1046         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1047 }
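/*
 * Editor's note: taken together, the three helpers above pack the
 * p_compressed_bm->encoding byte as follows - bit 7 holds the value of
 * the first run (dcbp_set_start), bits 6..4 hold the number of pad bits
 * in the last byte (dcbp_set_pad_bits), and bits 3..0 hold the
 * drbd_bitmap_code (dcbp_set_code).
 */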
1048
1049 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1050         struct p_compressed_bm *p,
1051         struct bm_xfer_ctx *c)
1052 {
1053         struct bitstream bs;
1054         unsigned long plain_bits;
1055         unsigned long tmp;
1056         unsigned long rl;
1057         unsigned len;
1058         unsigned toggle;
1059         int bits;
1060
1061         /* may we use this feature? */
1062         if ((mdev->tconn->net_conf->use_rle == 0) ||
1063                 (mdev->tconn->agreed_pro_version < 90))
1064                         return 0;
1065
1066         if (c->bit_offset >= c->bm_bits)
1067                 return 0; /* nothing to do. */
1068
1069         /* use at most thus many bytes */
1070         bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
1071         memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
1072         /* plain bits covered in this code string */
1073         plain_bits = 0;
1074
1075         /* p->encoding & 0x80 stores whether the first run length is set.
1076          * bit offset is implicit.
1077          * start with toggle == 2 to be able to tell the first iteration */
1078         toggle = 2;
1079
1080         /* see how much plain bits we can stuff into one packet
1081          * using RLE and VLI. */
1082         do {
1083                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1084                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1085                 if (tmp == -1UL)
1086                         tmp = c->bm_bits;
1087                 rl = tmp - c->bit_offset;
1088
1089                 if (toggle == 2) { /* first iteration */
1090                         if (rl == 0) {
1091                                 /* the first checked bit was set,
1092                                  * store start value, */
1093                                 dcbp_set_start(p, 1);
1094                                 /* but skip encoding of zero run length */
1095                                 toggle = !toggle;
1096                                 continue;
1097                         }
1098                         dcbp_set_start(p, 0);
1099                 }
1100
1101                 /* paranoia: catch zero runlength.
1102                  * can only happen if bitmap is modified while we scan it. */
1103                 if (rl == 0) {
1104                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1105                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1106                         return -1;
1107                 }
1108
1109                 bits = vli_encode_bits(&bs, rl);
1110                 if (bits == -ENOBUFS) /* buffer full */
1111                         break;
1112                 if (bits <= 0) {
1113                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1114                         return 0;
1115                 }
1116
1117                 toggle = !toggle;
1118                 plain_bits += rl;
1119                 c->bit_offset = tmp;
1120         } while (c->bit_offset < c->bm_bits);
1121
1122         len = bs.cur.b - p->code + !!bs.cur.bit;
1123
1124         if (plain_bits < (len << 3)) {
1125                 /* incompressible with this method.
1126                  * we need to rewind both word and bit position. */
1127                 c->bit_offset -= plain_bits;
1128                 bm_xfer_ctx_bit_to_word_offset(c);
1129                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1130                 return 0;
1131         }
1132
1133         /* RLE + VLI was able to compress it just fine.
1134          * update c->word_offset. */
1135         bm_xfer_ctx_bit_to_word_offset(c);
1136
1137         /* store pad_bits */
1138         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1139
1140         return len;
1141 }
1142
1143 /**
1144  * send_bitmap_rle_or_plain
1145  *
1146  * Return 0 when done, 1 when another iteration is needed, and a negative error
1147  * code upon failure.
1148  */
1149 static int
1150 send_bitmap_rle_or_plain(struct drbd_conf *mdev,
1151                          struct p_header *h, struct bm_xfer_ctx *c)
1152 {
1153         struct p_compressed_bm *p = (void*)h;
1154         unsigned long num_words;
1155         int len, err;
1156
1157         len = fill_bitmap_rle_bits(mdev, p, c);
1158
1159         if (len < 0)
1160                 return -EIO;
1161
1162         if (len) {
1163                 dcbp_set_code(p, RLE_VLI_Bits);
1164                 err = _drbd_send_cmd(mdev, mdev->tconn->data.socket,
1165                                      P_COMPRESSED_BITMAP, h,
1166                                      sizeof(*p) + len, 0);
1167
1168                 c->packets[0]++;
1169                 c->bytes[0] += sizeof(*p) + len;
1170
1171                 if (c->bit_offset >= c->bm_bits)
1172                         len = 0; /* DONE */
1173         } else {
1174                 /* was not compressible.
1175                  * send a buffer full of plain text bits instead. */
1176                 num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
1177                 len = num_words * sizeof(long);
1178                 if (len)
1179                         drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
1180                 err = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BITMAP,
1181                                      h, sizeof(struct p_header80) + len, 0);
1182                 c->word_offset += num_words;
1183                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1184
1185                 c->packets[1]++;
1186                 c->bytes[1] += sizeof(struct p_header80) + len;
1187
1188                 if (c->bit_offset > c->bm_bits)
1189                         c->bit_offset = c->bm_bits;
1190         }
1191         if (!err) {
1192                 if (len == 0) {
1193                         INFO_bm_xfer_stats(mdev, "send", c);
1194                         return 0;
1195                 } else
1196                         return 1;
1197         }
1198         return -EIO;
1199 }
1200
1201 /* See the comment at receive_bitmap() */
1202 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1203 {
1204         struct bm_xfer_ctx c;
1205         struct p_header *p;
1206         int err;
1207
1208         if (!expect(mdev->bitmap))
1209                 return false;
1210
1211         /* maybe we should use some per thread scratch page,
1212          * and allocate that during initial device creation? */
1213         p = (struct p_header *) __get_free_page(GFP_NOIO);
1214         if (!p) {
1215                 dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
1216                 return false;
1217         }
1218
1219         if (get_ldev(mdev)) {
1220                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1221                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1222                         drbd_bm_set_all(mdev);
1223                         if (drbd_bm_write(mdev)) {
1224                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1225                                  * but otherwise process as per normal - need to tell other
1226                                  * side that a full resync is required! */
1227                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1228                         } else {
1229                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1230                                 drbd_md_sync(mdev);
1231                         }
1232                 }
1233                 put_ldev(mdev);
1234         }
1235
1236         c = (struct bm_xfer_ctx) {
1237                 .bm_bits = drbd_bm_bits(mdev),
1238                 .bm_words = drbd_bm_words(mdev),
1239         };
1240
1241         do {
1242                 err = send_bitmap_rle_or_plain(mdev, p, &c);
1243         } while (err > 0);
1244
1245         free_page((unsigned long) p);
1246         return err == 0;
1247 }
1248
1249 int drbd_send_bitmap(struct drbd_conf *mdev)
1250 {
1251         int err;
1252
1253         if (drbd_get_data_sock(mdev->tconn))
1254                 return -1;
1255         err = !_drbd_send_bitmap(mdev);
1256         drbd_put_data_sock(mdev->tconn);
1257         return err;
1258 }
1259 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1260 {
1261         struct p_barrier_ack p;
1262
1263         p.barrier  = barrier_nr;
1264         p.set_size = cpu_to_be32(set_size);
1265
1266         if (mdev->state.conn >= C_CONNECTED)
1267                 drbd_send_cmd(mdev, &mdev->tconn->meta, P_BARRIER_ACK, &p.head, sizeof(p));
1268 }
1269
1270 /**
1271  * _drbd_send_ack() - Sends an ack packet
1272  * @mdev:       DRBD device.
1273  * @cmd:        Packet command code.
1274  * @sector:     sector, needs to be in big endian byte order
1275  * @blksize:    size in byte, needs to be in big endian byte order
1276  * @block_id:   Id, big endian byte order
1277  */
1278 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1279                           u64 sector, u32 blksize, u64 block_id)
1280 {
1281         struct p_block_ack p;
1282
1283         p.sector   = sector;
1284         p.block_id = block_id;
1285         p.blksize  = blksize;
1286         p.seq_num  = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1287
1288         if (!mdev->tconn->meta.socket || mdev->state.conn < C_CONNECTED)
1289                 return -EIO;
1290         return drbd_send_cmd(mdev, &mdev->tconn->meta, cmd, &p.head, sizeof(p));
1291 }
1292
1293 /* dp->sector and dp->block_id already/still in network byte order,
1294  * data_size is payload size according to dp->head,
1295  * and may need to be corrected for digest size. */
1296 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1297                       struct p_data *dp, int data_size)
1298 {
1299         data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1300                 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1301         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1302                        dp->block_id);
1303 }
1304
1305 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1306                       struct p_block_req *rp)
1307 {
1308         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1309 }
1310
1311 /**
1312  * drbd_send_ack() - Sends an ack packet
1313  * @mdev:       DRBD device
1314  * @cmd:        packet command code
1315  * @peer_req:   peer request
1316  */
1317 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1318                   struct drbd_peer_request *peer_req)
1319 {
1320         return _drbd_send_ack(mdev, cmd,
1321                               cpu_to_be64(peer_req->i.sector),
1322                               cpu_to_be32(peer_req->i.size),
1323                               peer_req->block_id);
1324 }
1325
1326 /* This function misuses the block_id field to signal if the blocks
1327  * are in sync or not. */
1328 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1329                      sector_t sector, int blksize, u64 block_id)
1330 {
1331         return _drbd_send_ack(mdev, cmd,
1332                               cpu_to_be64(sector),
1333                               cpu_to_be32(blksize),
1334                               cpu_to_be64(block_id));
1335 }
1336
1337 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1338                        sector_t sector, int size, u64 block_id)
1339 {
1340         struct p_block_req p;
1341
1342         p.sector   = cpu_to_be64(sector);
1343         p.block_id = block_id;
1344         p.blksize  = cpu_to_be32(size);
1345
1346         return drbd_send_cmd(mdev, &mdev->tconn->data, cmd, &p.head, sizeof(p));
1347 }
1348
1349 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1350                             void *digest, int digest_size, enum drbd_packet cmd)
1351 {
1352         int err;
1353         struct p_block_req p;
1354
1355         prepare_header(mdev, &p.head, cmd, sizeof(p) - sizeof(struct p_header) + digest_size);
1356         p.sector   = cpu_to_be64(sector);
1357         p.block_id = ID_SYNCER /* unused */;
1358         p.blksize  = cpu_to_be32(size);
1359
1360         mutex_lock(&mdev->tconn->data.mutex);
1361         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p, sizeof(p), 0);
1362         if (!err)
1363                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, digest, digest_size, 0);
1364         mutex_unlock(&mdev->tconn->data.mutex);
1365         return err;
1366 }
1367
1368 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1369 {
1370         struct p_block_req p;
1371
1372         p.sector   = cpu_to_be64(sector);
1373         p.block_id = ID_SYNCER /* unused */;
1374         p.blksize  = cpu_to_be32(size);
1375
1376         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OV_REQUEST, &p.head, sizeof(p));
1377 }
1378
1379 /* called on sndtimeo
1380  * returns false if we should retry,
1381  * true if we think connection is dead
1382  */
1383 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1384 {
1385         int drop_it;
1386         /* long elapsed = (long)(jiffies - mdev->last_received); */
1387
1388         drop_it =   tconn->meta.socket == sock
1389                 || !tconn->asender.task
1390                 || get_t_state(&tconn->asender) != RUNNING
1391                 || tconn->cstate < C_WF_REPORT_PARAMS;
1392
1393         if (drop_it)
1394                 return true;
1395
1396         drop_it = !--tconn->ko_count;
1397         if (!drop_it) {
1398                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1399                          current->comm, current->pid, tconn->ko_count);
1400                 request_ping(tconn);
1401         }
1402
1403         return drop_it; /* && (mdev->state == R_PRIMARY) */;
1404 }
1405
1406 static void drbd_update_congested(struct drbd_tconn *tconn)
1407 {
1408         struct sock *sk = tconn->data.socket->sk;
1409         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1410                 set_bit(NET_CONGESTED, &tconn->flags);
1411 }
1412
1413 /* The idea of sendpage seems to be to put some kind of reference
1414  * to the page into the skb, and to hand it over to the NIC. In
1415  * this process get_page() gets called.
1416  *
1417  * As soon as the page was really sent over the network put_page()
1418  * gets called by some part of the network layer. [ NIC driver? ]
1419  *
1420  * [ get_page() / put_page() increment/decrement the count. If count
1421  *   reaches 0 the page will be freed. ]
1422  *
1423  * This works nicely with pages from FSs.
1424  * But this means that in protocol A we might signal IO completion too early!
1425  *
1426  * In order not to corrupt data during a resync we must make sure
1427  * that we do not reuse our own buffer pages (EEs) too early, therefore
1428  * we have the net_ee list.
1429  *
1430  * XFS seems to have problems, still, it submits pages with page_count == 0!
1431  * As a workaround, we disable sendpage on pages
1432  * with page_count == 0 or PageSlab.
1433  */
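/*
 * Editor's note: the two helpers below implement this.  _drbd_send_page()
 * takes the zero-copy ->sendpage path and falls back to
 * _drbd_no_send_page() (kmap + ordinary send) when sendpage is disabled
 * or the page is a slab page / has a zero page_count, as described above.
 */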
1434 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1435                               int offset, size_t size, unsigned msg_flags)
1436 {
1437         struct socket *socket;
1438         void *addr;
1439         int err;
1440
1441         socket = mdev->tconn->data.socket;
1442         addr = kmap(page) + offset;
1443         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1444         kunmap(page);
1445         if (!err)
1446                 mdev->send_cnt += size >> 9;
1447         return err;
1448 }
1449
1450 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1451                     int offset, size_t size, unsigned msg_flags)
1452 {
1453         struct socket *socket = mdev->tconn->data.socket;
1454         mm_segment_t oldfs = get_fs();
1455         int len = size;
1456         int err = -EIO;
1457
1458         /* e.g. XFS meta- & log-data is in slab pages, which have a
1459          * page_count of 0 and/or have PageSlab() set.
1460          * we cannot use send_page for those, as that does get_page();
1461          * put_page(); and would cause either a VM_BUG directly, or
1462          * __page_cache_release a page that would actually still be referenced
1463          * by someone, leading to some obscure delayed Oops somewhere else. */
1464         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1465                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1466
1467         msg_flags |= MSG_NOSIGNAL;
1468         drbd_update_congested(mdev->tconn);
1469         set_fs(KERNEL_DS);
1470         do {
1471                 int sent;
1472
1473                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1474                 if (sent <= 0) {
1475                         if (sent == -EAGAIN) {
1476                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1477                                         break;
1478                                 continue;
1479                         }
1480                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1481                              __func__, (int)size, len, sent);
1482                         if (sent < 0)
1483                                 err = sent;
1484                         break;
1485                 }
1486                 len    -= sent;
1487                 offset += sent;
1488         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1489         set_fs(oldfs);
1490         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1491
1492         if (len == 0) {
1493                 err = 0;
1494                 mdev->send_cnt += size >> 9;
1495         }
1496         return err;
1497 }
1498
1499 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1500 {
1501         struct bio_vec *bvec;
1502         int i;
1503         /* hint all but last page with MSG_MORE */
1504         __bio_for_each_segment(bvec, bio, i, 0) {
1505                 int err;
1506
1507                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1508                                          bvec->bv_offset, bvec->bv_len,
1509                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1510                 if (err)
1511                         return err;
1512         }
1513         return 0;
1514 }
1515
1516 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1517 {
1518         struct bio_vec *bvec;
1519         int i;
1520         /* hint all but last page with MSG_MORE */
1521         __bio_for_each_segment(bvec, bio, i, 0) {
1522                 int err;
1523
1524                 err = _drbd_send_page(mdev, bvec->bv_page,
1525                                       bvec->bv_offset, bvec->bv_len,
1526                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1527                 if (err)
1528                         return err;
1529         }
1530         return 0;
1531 }
1532
1533 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1534                             struct drbd_peer_request *peer_req)
1535 {
1536         struct page *page = peer_req->pages;
1537         unsigned len = peer_req->i.size;
1538         int err;
1539
1540         /* hint all but last page with MSG_MORE */
1541         page_chain_for_each(page) {
1542                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1543
1544                 err = _drbd_send_page(mdev, page, 0, l,
1545                                       page_chain_next(page) ? MSG_MORE : 0);
1546                 if (err)
1547                         return err;
1548                 len -= l;
1549         }
1550         return 0;
1551 }
1552
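/* Map bio rw flags (REQ_*) to DRBD's on-the-wire dp_flags (DP_*).
 * For peers with an agreed protocol version below 95 only DP_RW_SYNC
 * can be expressed on the wire. */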
1553 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1554 {
1555         if (mdev->tconn->agreed_pro_version >= 95)
1556                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1557                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1558                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1559                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1560         else
1561                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1562 }
1563
1564 /* Used to send write requests
1565  * R_PRIMARY -> Peer    (P_DATA)
1566  */
1567 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1568 {
1569         int err;
1570         struct p_data p;
1571         unsigned int dp_flags = 0;
1572         void *dgb;
1573         int dgs;
1574
1575         err = drbd_get_data_sock(mdev->tconn);
1576         if (err)
1577                 return err;
1578
1579         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1580                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1581
1582         prepare_header(mdev, &p.head, P_DATA, sizeof(p) - sizeof(struct p_header) + dgs + req->i.size);
1583         p.sector   = cpu_to_be64(req->i.sector);
1584         p.block_id = (unsigned long)req;
1585         p.seq_num  = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1586
1587         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1588
1589         if (mdev->state.conn >= C_SYNC_SOURCE &&
1590             mdev->state.conn <= C_PAUSED_SYNC_T)
1591                 dp_flags |= DP_MAY_SET_IN_SYNC;
1592
1593         p.dp_flags = cpu_to_be32(dp_flags);
1594         set_bit(UNPLUG_REMOTE, &mdev->flags);
1595         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1596                             sizeof(p), dgs ? MSG_MORE : 0);
1597         if (!err && dgs) {
1598                 dgb = mdev->tconn->int_dig_out;
1599                 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, dgb);
1600                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb, dgs, 0);
1601         }
1602         if (!err) {
1603                 /* For protocol A, we have to memcpy the payload into
1604                  * socket buffers, as we may complete the request as soon
1605                  * as we have handed it over to tcp, at which point the data
1606                  * pages may become invalid.
1607                  *
1608                  * With data integrity enabled, we copy it as well, so we can be
1609                  * sure that even if the bio pages are still being modified, the
1610                  * data on the wire will not change; thus if the digest checks
1611                  * out ok after sending on this side but does not match on the
1612                  * receiving side, we know the corruption happened elsewhere.
1613                  */
1614                 if (mdev->tconn->net_conf->wire_protocol == DRBD_PROT_A || dgs)
1615                         err = _drbd_send_bio(mdev, req->master_bio);
1616                 else
1617                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1618
1619                 /* double check digest, sometimes buffers have been modified in flight. */
1620                 if (dgs > 0 && dgs <= 64) {
1621                         /* 64 bytes (512 bits) is the largest digest size
1622                          * currently supported by the kernel crypto API. */
1623                         unsigned char digest[64];
1624                         drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1625                         if (memcmp(mdev->tconn->int_dig_out, digest, dgs)) {
1626                                 dev_warn(DEV,
1627                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1628                                         (unsigned long long)req->i.sector, req->i.size);
1629                         }
1630                 } /* else if (dgs > 64) {
1631                      ... Be noisy about digest too large ...
1632                 } */
1633         }
1634
1635         drbd_put_data_sock(mdev->tconn);
1636
1637         return err;
1638 }
1639
1640 /* answer packet, used to send data back for read requests:
1641  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1642  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1643  */
1644 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1645                     struct drbd_peer_request *peer_req)
1646 {
1647         int err;
1648         struct p_data p;
1649         void *dgb;
1650         int dgs;
1651
1652         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1653                 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1654
1655         prepare_header(mdev, &p.head, cmd, sizeof(p) -
1656                                            sizeof(struct p_header80) +
1657                                            dgs + peer_req->i.size);
1658         p.sector   = cpu_to_be64(peer_req->i.sector);
1659         p.block_id = peer_req->block_id;
1660         p.seq_num = 0;  /* unused */
1661
1662         /* Only called by our kernel thread.
1663          * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
1664          * in response to an admin command or module unload.
1665          */
1666         err = drbd_get_data_sock(mdev->tconn);
1667         if (err)
1668                 return err;
1669         err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, &p,
1670                             sizeof(p), dgs ? MSG_MORE : 0);
1671         if (!err && dgs) {
1672                 dgb = mdev->tconn->int_dig_out;
1673                 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, dgb);
1674                 err = drbd_send_all(mdev->tconn, mdev->tconn->data.socket, dgb,
1675                                     dgs, 0);
1676         }
1677         if (!err)
1678                 err = _drbd_send_zc_ee(mdev, peer_req);
1679         drbd_put_data_sock(mdev->tconn);
1680
1681         return err;
1682 }
1683
1684 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1685 {
1686         struct p_block_desc p;
1687
1688         p.sector  = cpu_to_be64(req->i.sector);
1689         p.blksize = cpu_to_be32(req->i.size);
1690
1691         return drbd_send_cmd(mdev, &mdev->tconn->data, P_OUT_OF_SYNC, &p.head, sizeof(p));
1692 }
1693
1694 /*
1695   drbd_send distinguishes two cases:
1696
1697   Packets sent via the data socket "sock"
1698   and packets sent via the meta data socket "msock"
1699
1700                     sock                      msock
1701   -----------------+-------------------------+------------------------------
1702   timeout           conf.timeout / 2          conf.timeout / 2
1703   timeout action    send a ping via msock     Abort communication
1704                                               and close all sockets
1705 */
1706
1707 /*
1708  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1709  */
1710 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1711               void *buf, size_t size, unsigned msg_flags)
1712 {
1713         struct kvec iov;
1714         struct msghdr msg;
1715         int rv, sent = 0;
1716
1717         if (!sock)
1718                 return -EBADR;
1719
1720         /* THINK  if (signal_pending) return ... ? */
1721
1722         iov.iov_base = buf;
1723         iov.iov_len  = size;
1724
1725         msg.msg_name       = NULL;
1726         msg.msg_namelen    = 0;
1727         msg.msg_control    = NULL;
1728         msg.msg_controllen = 0;
1729         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1730
1731         if (sock == tconn->data.socket) {
1732                 tconn->ko_count = tconn->net_conf->ko_count;
1733                 drbd_update_congested(tconn);
1734         }
1735         do {
1736                 /* STRANGE
1737                  * tcp_sendmsg does _not_ use its size parameter at all ?
1738                  *
1739                  * -EAGAIN on timeout, -EINTR on signal.
1740                  */
1741 /* THINK
1742  * do we need to block DRBD_SIG if sock == &meta.socket ??
1743  * otherwise wake_asender() might interrupt some send_*Ack !
1744  */
1745                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1746                 if (rv == -EAGAIN) {
1747                         if (we_should_drop_the_connection(tconn, sock))
1748                                 break;
1749                         else
1750                                 continue;
1751                 }
1752                 if (rv == -EINTR) {
1753                         flush_signals(current);
1754                         rv = 0;
1755                 }
1756                 if (rv < 0)
1757                         break;
1758                 sent += rv;
1759                 iov.iov_base += rv;
1760                 iov.iov_len  -= rv;
1761         } while (sent < size);
1762
1763         if (sock == tconn->data.socket)
1764                 clear_bit(NET_CONGESTED, &tconn->flags);
1765
1766         if (rv <= 0) {
1767                 if (rv != -EAGAIN) {
1768                         conn_err(tconn, "%s_sendmsg returned %d\n",
1769                                  sock == tconn->meta.socket ? "msock" : "sock",
1770                                  rv);
1771                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1772                 } else
1773                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1774         }
1775
1776         return sent;
1777 }
1778
1779 /**
1780  * drbd_send_all  -  Send an entire buffer
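 * @tconn:      DRBD connection.
 * @sock:       socket to send on (data or meta socket of that connection).
 * @buffer:     buffer to be sent in full.
 * @size:       number of bytes to send.
 * @msg_flags:  MSG_* flags passed through to drbd_send().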
1781  *
1782  * Returns 0 upon success and a negative error value otherwise.
1783  */
1784 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1785                   size_t size, unsigned msg_flags)
1786 {
1787         int err;
1788
1789         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1790         if (err < 0)
1791                 return err;
1792         if (err != size)
1793                 return -EIO;
1794         return 0;
1795 }
1796
1797 static int drbd_open(struct block_device *bdev, fmode_t mode)
1798 {
1799         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1800         unsigned long flags;
1801         int rv = 0;
1802
1803         mutex_lock(&drbd_main_mutex);
1804         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1805         /* to have a stable mdev->state.role
1806          * and no race with updating open_cnt */
1807
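        /* A node that is not Primary never accepts writers; read-only
         * opens on it are only allowed when allow_oos is set. */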
1808         if (mdev->state.role != R_PRIMARY) {
1809                 if (mode & FMODE_WRITE)
1810                         rv = -EROFS;
1811                 else if (!allow_oos)
1812                         rv = -EMEDIUMTYPE;
1813         }
1814
1815         if (!rv)
1816                 mdev->open_cnt++;
1817         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1818         mutex_unlock(&drbd_main_mutex);
1819
1820         return rv;
1821 }
1822
1823 static int drbd_release(struct gendisk *gd, fmode_t mode)
1824 {
1825         struct drbd_conf *mdev = gd->private_data;
1826         mutex_lock(&drbd_main_mutex);
1827         mdev->open_cnt--;
1828         mutex_unlock(&drbd_main_mutex);
1829         return 0;
1830 }
1831
1832 static void drbd_set_defaults(struct drbd_conf *mdev)
1833 {
1834         /* Beware! The actual layout differs
1835          * between big endian and little endian */
1836         mdev->state = (union drbd_state) {
1837                 { .role = R_SECONDARY,
1838                   .peer = R_UNKNOWN,
1839                   .conn = C_STANDALONE,
1840                   .disk = D_DISKLESS,
1841                   .pdsk = D_UNKNOWN,
1842                   .susp = 0,
1843                   .susp_nod = 0,
1844                   .susp_fen = 0
1845                 } };
1846 }
1847
1848 void drbd_init_set_defaults(struct drbd_conf *mdev)
1849 {
1850         /* the memset(,0,) did most of this.
1851          * note: only assignments, no allocation in here */
1852
1853         drbd_set_defaults(mdev);
1854
1855         atomic_set(&mdev->ap_bio_cnt, 0);
1856         atomic_set(&mdev->ap_pending_cnt, 0);
1857         atomic_set(&mdev->rs_pending_cnt, 0);
1858         atomic_set(&mdev->unacked_cnt, 0);
1859         atomic_set(&mdev->local_cnt, 0);
1860         atomic_set(&mdev->pp_in_use_by_net, 0);
1861         atomic_set(&mdev->rs_sect_in, 0);
1862         atomic_set(&mdev->rs_sect_ev, 0);
1863         atomic_set(&mdev->ap_in_flight, 0);
1864
1865         mutex_init(&mdev->md_io_mutex);
1866         mutex_init(&mdev->own_state_mutex);
1867         mdev->state_mutex = &mdev->own_state_mutex;
1868
1869         spin_lock_init(&mdev->al_lock);
1870         spin_lock_init(&mdev->peer_seq_lock);
1871         spin_lock_init(&mdev->epoch_lock);
1872
1873         INIT_LIST_HEAD(&mdev->active_ee);
1874         INIT_LIST_HEAD(&mdev->sync_ee);
1875         INIT_LIST_HEAD(&mdev->done_ee);
1876         INIT_LIST_HEAD(&mdev->read_ee);
1877         INIT_LIST_HEAD(&mdev->net_ee);
1878         INIT_LIST_HEAD(&mdev->resync_reads);
1879         INIT_LIST_HEAD(&mdev->resync_work.list);
1880         INIT_LIST_HEAD(&mdev->unplug_work.list);
1881         INIT_LIST_HEAD(&mdev->go_diskless.list);
1882         INIT_LIST_HEAD(&mdev->md_sync_work.list);
1883         INIT_LIST_HEAD(&mdev->start_resync_work.list);
1884         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1885
1886         mdev->resync_work.cb  = w_resync_timer;
1887         mdev->unplug_work.cb  = w_send_write_hint;
1888         mdev->go_diskless.cb  = w_go_diskless;
1889         mdev->md_sync_work.cb = w_md_sync;
1890         mdev->bm_io_work.w.cb = w_bitmap_io;
1891         mdev->start_resync_work.cb = w_start_resync;
1892
1893         mdev->resync_work.mdev  = mdev;
1894         mdev->unplug_work.mdev  = mdev;
1895         mdev->go_diskless.mdev  = mdev;
1896         mdev->md_sync_work.mdev = mdev;
1897         mdev->bm_io_work.w.mdev = mdev;
1898         mdev->start_resync_work.mdev = mdev;
1899
1900         init_timer(&mdev->resync_timer);
1901         init_timer(&mdev->md_sync_timer);
1902         init_timer(&mdev->start_resync_timer);
1903         init_timer(&mdev->request_timer);
1904         mdev->resync_timer.function = resync_timer_fn;
1905         mdev->resync_timer.data = (unsigned long) mdev;
1906         mdev->md_sync_timer.function = md_sync_timer_fn;
1907         mdev->md_sync_timer.data = (unsigned long) mdev;
1908         mdev->start_resync_timer.function = start_resync_timer_fn;
1909         mdev->start_resync_timer.data = (unsigned long) mdev;
1910         mdev->request_timer.function = request_timer_fn;
1911         mdev->request_timer.data = (unsigned long) mdev;
1912
1913         init_waitqueue_head(&mdev->misc_wait);
1914         init_waitqueue_head(&mdev->state_wait);
1915         init_waitqueue_head(&mdev->ee_wait);
1916         init_waitqueue_head(&mdev->al_wait);
1917         init_waitqueue_head(&mdev->seq_wait);
1918
1919         /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
1920         mdev->write_ordering = WO_bdev_flush;
1921         mdev->resync_wenr = LC_FREE;
1922         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1923         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
1924 }
1925
1926 void drbd_mdev_cleanup(struct drbd_conf *mdev)
1927 {
1928         int i;
1929         if (mdev->tconn->receiver.t_state != NONE)
1930                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
1931                                 mdev->tconn->receiver.t_state);
1932
1933         /* no need to lock it, I'm the only thread alive */
1934         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
1935                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
1936         mdev->al_writ_cnt  =
1937         mdev->bm_writ_cnt  =
1938         mdev->read_cnt     =
1939         mdev->recv_cnt     =
1940         mdev->send_cnt     =
1941         mdev->writ_cnt     =
1942         mdev->p_size       =
1943         mdev->rs_start     =
1944         mdev->rs_total     =
1945         mdev->rs_failed    = 0;
1946         mdev->rs_last_events = 0;
1947         mdev->rs_last_sect_ev = 0;
1948         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1949                 mdev->rs_mark_left[i] = 0;
1950                 mdev->rs_mark_time[i] = 0;
1951         }
1952         D_ASSERT(mdev->tconn->net_conf == NULL);
1953
1954         drbd_set_my_capacity(mdev, 0);
1955         if (mdev->bitmap) {
1956                 /* maybe never allocated. */
1957                 drbd_bm_resize(mdev, 0, 1);
1958                 drbd_bm_cleanup(mdev);
1959         }
1960
1961         drbd_free_resources(mdev);
1962         clear_bit(AL_SUSPENDED, &mdev->flags);
1963
1964         /*
1965          * currently we call drbd_init_ee only on module load, so
1966          * we may call drbd_release_ee only on module unload!
1967          */
1968         D_ASSERT(list_empty(&mdev->active_ee));
1969         D_ASSERT(list_empty(&mdev->sync_ee));
1970         D_ASSERT(list_empty(&mdev->done_ee));
1971         D_ASSERT(list_empty(&mdev->read_ee));
1972         D_ASSERT(list_empty(&mdev->net_ee));
1973         D_ASSERT(list_empty(&mdev->resync_reads));
1974         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
1975         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
1976         D_ASSERT(list_empty(&mdev->resync_work.list));
1977         D_ASSERT(list_empty(&mdev->unplug_work.list));
1978         D_ASSERT(list_empty(&mdev->go_diskless.list));
1979
1980         drbd_set_defaults(mdev);
1981 }
1982
1983
1984 static void drbd_destroy_mempools(void)
1985 {
1986         struct page *page;
1987
1988         while (drbd_pp_pool) {
1989                 page = drbd_pp_pool;
1990                 drbd_pp_pool = (struct page *)page_private(page);
1991                 __free_page(page);
1992                 drbd_pp_vacant--;
1993         }
1994
1995         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
1996
1997         if (drbd_md_io_bio_set)
1998                 bioset_free(drbd_md_io_bio_set);
1999         if (drbd_md_io_page_pool)
2000                 mempool_destroy(drbd_md_io_page_pool);
2001         if (drbd_ee_mempool)
2002                 mempool_destroy(drbd_ee_mempool);
2003         if (drbd_request_mempool)
2004                 mempool_destroy(drbd_request_mempool);
2005         if (drbd_ee_cache)
2006                 kmem_cache_destroy(drbd_ee_cache);
2007         if (drbd_request_cache)
2008                 kmem_cache_destroy(drbd_request_cache);
2009         if (drbd_bm_ext_cache)
2010                 kmem_cache_destroy(drbd_bm_ext_cache);
2011         if (drbd_al_ext_cache)
2012                 kmem_cache_destroy(drbd_al_ext_cache);
2013
2014         drbd_md_io_bio_set   = NULL;
2015         drbd_md_io_page_pool = NULL;
2016         drbd_ee_mempool      = NULL;
2017         drbd_request_mempool = NULL;
2018         drbd_ee_cache        = NULL;
2019         drbd_request_cache   = NULL;
2020         drbd_bm_ext_cache    = NULL;
2021         drbd_al_ext_cache    = NULL;
2022
2023         return;
2024 }
2025
2026 static int drbd_create_mempools(void)
2027 {
2028         struct page *page;
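        /* Pool sizes below are chosen so that every configured minor can
         * have one maximally sized BIO (DRBD_MAX_BIO_SIZE) in flight;
         * the same count is used for the request and EE mempools. */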
2029         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2030         int i;
2031
2032         /* prepare our caches and mempools */
2033         drbd_request_mempool = NULL;
2034         drbd_ee_cache        = NULL;
2035         drbd_request_cache   = NULL;
2036         drbd_bm_ext_cache    = NULL;
2037         drbd_al_ext_cache    = NULL;
2038         drbd_pp_pool         = NULL;
2039         drbd_md_io_page_pool = NULL;
2040         drbd_md_io_bio_set   = NULL;
2041
2042         /* caches */
2043         drbd_request_cache = kmem_cache_create(
2044                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2045         if (drbd_request_cache == NULL)
2046                 goto Enomem;
2047
2048         drbd_ee_cache = kmem_cache_create(
2049                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2050         if (drbd_ee_cache == NULL)
2051                 goto Enomem;
2052
2053         drbd_bm_ext_cache = kmem_cache_create(
2054                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2055         if (drbd_bm_ext_cache == NULL)
2056                 goto Enomem;
2057
2058         drbd_al_ext_cache = kmem_cache_create(
2059                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2060         if (drbd_al_ext_cache == NULL)
2061                 goto Enomem;
2062
2063         /* mempools */
2064         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2065         if (drbd_md_io_bio_set == NULL)
2066                 goto Enomem;
2067
2068         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2069         if (drbd_md_io_page_pool == NULL)
2070                 goto Enomem;
2071
2072         drbd_request_mempool = mempool_create(number,
2073                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2074         if (drbd_request_mempool == NULL)
2075                 goto Enomem;
2076
2077         drbd_ee_mempool = mempool_create(number,
2078                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2079         if (drbd_ee_mempool == NULL)
2080                 goto Enomem;
2081
2082         /* drbd's page pool */
2083         spin_lock_init(&drbd_pp_lock);
2084
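        /* Build a singly linked list of preallocated pages, chained
         * through page_private(); drbd_pp_vacant counts the pages that
         * are currently sitting in the pool. */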
2085         for (i = 0; i < number; i++) {
2086                 page = alloc_page(GFP_HIGHUSER);
2087                 if (!page)
2088                         goto Enomem;
2089                 set_page_private(page, (unsigned long)drbd_pp_pool);
2090                 drbd_pp_pool = page;
2091         }
2092         drbd_pp_vacant = number;
2093
2094         return 0;
2095
2096 Enomem:
2097         drbd_destroy_mempools(); /* in case we allocated some */
2098         return -ENOMEM;
2099 }
2100
2101 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2102         void *unused)
2103 {
2104         /* just so we have it.  you never know what interesting things we
2105          * might want to do here some day...
2106          */
2107
2108         return NOTIFY_DONE;
2109 }
2110
2111 static struct notifier_block drbd_notifier = {
2112         .notifier_call = drbd_notify_sys,
2113 };
2114
2115 static void drbd_release_ee_lists(struct drbd_conf *mdev)
2116 {
2117         int rr;
2118
2119         rr = drbd_release_ee(mdev, &mdev->active_ee);
2120         if (rr)
2121                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2122
2123         rr = drbd_release_ee(mdev, &mdev->sync_ee);
2124         if (rr)
2125                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2126
2127         rr = drbd_release_ee(mdev, &mdev->read_ee);
2128         if (rr)
2129                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2130
2131         rr = drbd_release_ee(mdev, &mdev->done_ee);
2132         if (rr)
2133                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2134
2135         rr = drbd_release_ee(mdev, &mdev->net_ee);
2136         if (rr)
2137                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2138 }
2139
2140 /* caution. no locking. */
2141 void drbd_delete_device(unsigned int minor)
2142 {
2143         struct drbd_conf *mdev = minor_to_mdev(minor);
2144
2145         if (!mdev)
2146                 return;
2147
2148         idr_remove(&mdev->tconn->volumes, mdev->vnr);
2149         idr_remove(&minors, minor);
2150         synchronize_rcu();
2151
2152         /* paranoia asserts */
2153         D_ASSERT(mdev->open_cnt == 0);
2154         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2155         /* end paranoia asserts */
2156
2157         del_gendisk(mdev->vdisk);
2158
2159         /* cleanup stuff that may have been allocated during
2160          * device (re-)configuration or state changes */
2161
2162         if (mdev->this_bdev)
2163                 bdput(mdev->this_bdev);
2164
2165         drbd_free_resources(mdev);
2166
2167         drbd_release_ee_lists(mdev);
2168
2169         lc_destroy(mdev->act_log);
2170         lc_destroy(mdev->resync);
2171
2172         kfree(mdev->p_uuid);
2173         /* mdev->p_uuid = NULL; */
2174
2175         /* cleanup the rest that has been
2176          * allocated from drbd_new_device
2177          * and actually free the mdev itself */
2178         drbd_free_mdev(mdev);
2179 }
2180
2181 static void drbd_cleanup(void)
2182 {
2183         unsigned int i;
2184         struct drbd_conf *mdev;
2185
2186         unregister_reboot_notifier(&drbd_notifier);
2187
2188         /* first remove proc,
2189          * drbdsetup uses its presence to detect
2190          * whether DRBD is loaded.
2191          * If we got stuck in proc removal
2192          * while netlink was already deregistered,
2193          * some drbdsetup commands may wait forever
2194          * for an answer.
2195          */
2196         if (drbd_proc)
2197                 remove_proc_entry("drbd", NULL);
2198
2199         drbd_genl_unregister();
2200
2201         idr_for_each_entry(&minors, mdev, i)
2202                 drbd_delete_device(i);
2203         drbd_destroy_mempools();
2204         unregister_blkdev(DRBD_MAJOR, "drbd");
2205
2206         idr_destroy(&minors);
2207
2208         printk(KERN_INFO "drbd: module cleanup done.\n");
2209 }
2210
2211 /**
2212  * drbd_congested() - Callback for pdflush
2213  * @congested_data:     User data
2214  * @bdi_bits:           Bits pdflush is currently interested in
2215  *
2216  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2217  */
2218 static int drbd_congested(void *congested_data, int bdi_bits)
2219 {
2220         struct drbd_conf *mdev = congested_data;
2221         struct request_queue *q;
2222         char reason = '-';
2223         int r = 0;
2224
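        /* congestion_reason legend: 'd' = DRBD froze IO, 'b' = backing
         * device congested, 'n' = network congested, 'a' = both backing
         * device and network congested, '-' = not congested. */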
2225         if (!may_inc_ap_bio(mdev)) {
2226                 /* DRBD has frozen IO */
2227                 r = bdi_bits;
2228                 reason = 'd';
2229                 goto out;
2230         }
2231
2232         if (get_ldev(mdev)) {
2233                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2234                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2235                 put_ldev(mdev);
2236                 if (r)
2237                         reason = 'b';
2238         }
2239
2240         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2241                 r |= (1 << BDI_async_congested);
2242                 reason = reason == 'b' ? 'a' : 'n';
2243         }
2244
2245 out:
2246         mdev->congestion_reason = reason;
2247         return r;
2248 }
2249
2250 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2251 {
2252         sema_init(&wq->s, 0);
2253         spin_lock_init(&wq->q_lock);
2254         INIT_LIST_HEAD(&wq->q);
2255 }
2256
2257 struct drbd_tconn *conn_by_name(const char *name)
2258 {
2259         struct drbd_tconn *tconn;
2260
2261         if (!name || !name[0])
2262                 return NULL;
2263
2264         mutex_lock(&drbd_cfg_mutex);
2265         list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
2266                 if (!strcmp(tconn->name, name))
2267                         goto found;
2268         }
2269         tconn = NULL;
2270 found:
2271         mutex_unlock(&drbd_cfg_mutex);
2272         return tconn;
2273 }
2274
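/* Preallocate one page each as receive buffer (rbuf) and send buffer (sbuf)
 * for this socket; both are freed again in drbd_free_socket(). */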
2275 static int drbd_alloc_socket(struct drbd_socket *socket)
2276 {
2277         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2278         if (!socket->rbuf)
2279                 return -ENOMEM;
2280         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2281         if (!socket->sbuf)
2282                 return -ENOMEM;
2283         return 0;
2284 }
2285
2286 static void drbd_free_socket(struct drbd_socket *socket)
2287 {
2288         free_page((unsigned long) socket->sbuf);
2289         free_page((unsigned long) socket->rbuf);
2290 }
2291
2292 struct drbd_tconn *drbd_new_tconn(const char *name)
2293 {
2294         struct drbd_tconn *tconn;
2295
2296         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2297         if (!tconn)
2298                 return NULL;
2299
2300         tconn->name = kstrdup(name, GFP_KERNEL);
2301         if (!tconn->name)
2302                 goto fail;
2303
2304         if (drbd_alloc_socket(&tconn->data))
2305                 goto fail;
2306         if (drbd_alloc_socket(&tconn->meta))
2307                 goto fail;
2308
2309         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2310                 goto fail;
2311
2312         if (!tl_init(tconn))
2313                 goto fail;
2314
2315         tconn->cstate = C_STANDALONE;
2316         mutex_init(&tconn->cstate_mutex);
2317         spin_lock_init(&tconn->req_lock);
2318         atomic_set(&tconn->net_cnt, 0);
2319         init_waitqueue_head(&tconn->net_cnt_wait);
2320         init_waitqueue_head(&tconn->ping_wait);
2321         idr_init(&tconn->volumes);
2322
2323         drbd_init_workqueue(&tconn->data.work);
2324         mutex_init(&tconn->data.mutex);
2325
2326         drbd_init_workqueue(&tconn->meta.work);
2327         mutex_init(&tconn->meta.mutex);
2328
2329         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2330         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2331         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2332
2333         tconn->res_opts = (struct res_opts) {
2334                 {}, 0, /* cpu_mask */
2335                 DRBD_ON_NO_DATA_DEF, /* on_no_data */
2336         };
2337
2338         mutex_lock(&drbd_cfg_mutex);
2339         list_add_tail(&tconn->all_tconn, &drbd_tconns);
2340         mutex_unlock(&drbd_cfg_mutex);
2341
2342         return tconn;
2343
2344 fail:
2345         tl_cleanup(tconn);
2346         free_cpumask_var(tconn->cpu_mask);
2347         drbd_free_socket(&tconn->meta);
2348         drbd_free_socket(&tconn->data);
2349         kfree(tconn->name);
2350         kfree(tconn);
2351
2352         return NULL;
2353 }
2354
2355 void drbd_free_tconn(struct drbd_tconn *tconn)
2356 {
2357         list_del(&tconn->all_tconn);
2358         idr_destroy(&tconn->volumes);
2359
2360         free_cpumask_var(tconn->cpu_mask);
2361         drbd_free_socket(&tconn->meta);
2362         drbd_free_socket(&tconn->data);
2363         kfree(tconn->name);
2364         kfree(tconn->int_dig_out);
2365         kfree(tconn->int_dig_in);
2366         kfree(tconn->int_dig_vv);
2367         kfree(tconn);
2368 }
2369
2370 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2371 {
2372         struct drbd_conf *mdev;
2373         struct gendisk *disk;
2374         struct request_queue *q;
2375         int vnr_got = vnr;
2376         int minor_got = minor;
2377         enum drbd_ret_code err = ERR_NOMEM;
2378
2379         mdev = minor_to_mdev(minor);
2380         if (mdev)
2381                 return ERR_MINOR_EXISTS;
2382
2383         /* GFP_KERNEL, we are outside of all write-out paths */
2384         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2385         if (!mdev)
2386                 return ERR_NOMEM;
2387
2388         mdev->tconn = tconn;
2389         mdev->minor = minor;
2390         mdev->vnr = vnr;
2391
2392         drbd_init_set_defaults(mdev);
2393
2394         q = blk_alloc_queue(GFP_KERNEL);
2395         if (!q)
2396                 goto out_no_q;
2397         mdev->rq_queue = q;
2398         q->queuedata   = mdev;
2399
2400         disk = alloc_disk(1);
2401         if (!disk)
2402                 goto out_no_disk;
2403         mdev->vdisk = disk;
2404
2405         set_disk_ro(disk, true);
2406
2407         disk->queue = q;
2408         disk->major = DRBD_MAJOR;
2409         disk->first_minor = minor;
2410         disk->fops = &drbd_ops;
2411         sprintf(disk->disk_name, "drbd%d", minor);
2412         disk->private_data = mdev;
2413
2414         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2415         /* we have no partitions. we contain only ourselves. */
2416         mdev->this_bdev->bd_contains = mdev->this_bdev;
2417
2418         q->backing_dev_info.congested_fn = drbd_congested;
2419         q->backing_dev_info.congested_data = mdev;
2420
2421         blk_queue_make_request(q, drbd_make_request);
2422         /* Setting the max_hw_sectors to an odd value of 8 KiB here
2423            triggers a max_bio_size message upon first attach or connect. */
2424         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2425         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2426         blk_queue_merge_bvec(q, drbd_merge_bvec);
2427         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2428
2429         mdev->md_io_page = alloc_page(GFP_KERNEL);
2430         if (!mdev->md_io_page)
2431                 goto out_no_io_page;
2432
2433         if (drbd_bm_init(mdev))
2434                 goto out_no_bitmap;
2435         mdev->read_requests = RB_ROOT;
2436         mdev->write_requests = RB_ROOT;
2437
2438         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2439         if (!mdev->current_epoch)
2440                 goto out_no_epoch;
2441
2442         INIT_LIST_HEAD(&mdev->current_epoch->list);
2443         mdev->epochs = 1;
2444
2445         if (!idr_pre_get(&minors, GFP_KERNEL))
2446                 goto out_no_minor_idr;
2447         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2448                 goto out_no_minor_idr;
2449         if (minor_got != minor) {
2450                 err = ERR_MINOR_EXISTS;
2451                 drbd_msg_put_info("requested minor exists already");
2452                 goto out_idr_remove_minor;
2453         }
2454
2455         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2456                 goto out_idr_remove_minor;
2457         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2458                 goto out_idr_remove_minor;
2459         if (vnr_got != vnr) {
2460                 err = ERR_INVALID_REQUEST;
2461                 drbd_msg_put_info("requested volume exists already");
2462                 goto out_idr_remove_vol;
2463         }
2464         add_disk(disk);
2465
2466         /* inherit the connection state */
2467         mdev->state.conn = tconn->cstate;
2468         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2469                 drbd_connected(vnr, mdev, tconn);
2470
2471         return NO_ERROR;
2472
2473 out_idr_remove_vol:
2474         idr_remove(&tconn->volumes, vnr_got);
2475 out_idr_remove_minor:
2476         idr_remove(&minors, minor_got);
2477         synchronize_rcu();
2478 out_no_minor_idr:
2479         kfree(mdev->current_epoch);
2480 out_no_epoch:
2481         drbd_bm_cleanup(mdev);
2482 out_no_bitmap:
2483         __free_page(mdev->md_io_page);
2484 out_no_io_page:
2485         put_disk(disk);
2486 out_no_disk:
2487         blk_cleanup_queue(q);
2488 out_no_q:
2489         kfree(mdev);
2490         return err;
2491 }
2492
2493 /* counterpart of drbd_new_device.
2494  * last part of drbd_delete_device. */
2495 void drbd_free_mdev(struct drbd_conf *mdev)
2496 {
2497         kfree(mdev->current_epoch);
2498         if (mdev->bitmap) /* should no longer be there. */
2499                 drbd_bm_cleanup(mdev);
2500         __free_page(mdev->md_io_page);
2501         put_disk(mdev->vdisk);
2502         blk_cleanup_queue(mdev->rq_queue);
2503         kfree(mdev);
2504 }
2505
2506
2507 int __init drbd_init(void)
2508 {
2509         int err;
2510
2511         BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
2512         BUILD_BUG_ON(sizeof(struct p_handshake) != 80);
2513
2514         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2515                 printk(KERN_ERR
2516                        "drbd: invalid minor_count (%d)\n", minor_count);
2517 #ifdef MODULE
2518                 return -EINVAL;
2519 #else
2520                 minor_count = 8;
2521 #endif
2522         }
2523
2524         err = register_blkdev(DRBD_MAJOR, "drbd");
2525         if (err) {
2526                 printk(KERN_ERR
2527                        "drbd: unable to register block device major %d\n",
2528                        DRBD_MAJOR);
2529                 return err;
2530         }
2531
2532         err = drbd_genl_register();
2533         if (err) {
2534                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2535                 goto fail;
2536         }
2537
2538
2539         register_reboot_notifier(&drbd_notifier);
2540
2541         /*
2542          * allocate all necessary structs
2543          */
2544         err = -ENOMEM;
2545
2546         init_waitqueue_head(&drbd_pp_wait);
2547
2548         drbd_proc = NULL; /* play safe for drbd_cleanup */
2549         idr_init(&minors);
2550
2551         err = drbd_create_mempools();
2552         if (err)
2553                 goto fail;
2554
2555         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2556         if (!drbd_proc) {
2557                 printk(KERN_ERR "drbd: unable to register proc file\n");
2558                 goto fail;
2559         }
2560
2561         rwlock_init(&global_state_lock);
2562         INIT_LIST_HEAD(&drbd_tconns);
2563
2564         printk(KERN_INFO "drbd: initialized. "
2565                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2566                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2567         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2568         printk(KERN_INFO "drbd: registered as block device major %d\n",
2569                 DRBD_MAJOR);
2570
2571         return 0; /* Success! */
2572
2573 fail:
2574         drbd_cleanup();
2575         if (err == -ENOMEM)
2576                 /* currently always the case */
2577                 printk(KERN_ERR "drbd: ran out of memory\n");
2578         else
2579                 printk(KERN_ERR "drbd: initialization failure\n");
2580         return err;
2581 }
2582
2583 void drbd_free_bc(struct drbd_backing_dev *ldev)
2584 {
2585         if (ldev == NULL)
2586                 return;
2587
2588         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2589         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2590
2591         kfree(ldev);
2592 }
2593
2594 void drbd_free_sock(struct drbd_tconn *tconn)
2595 {
2596         if (tconn->data.socket) {
2597                 mutex_lock(&tconn->data.mutex);
2598                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2599                 sock_release(tconn->data.socket);
2600                 tconn->data.socket = NULL;
2601                 mutex_unlock(&tconn->data.mutex);
2602         }
2603         if (tconn->meta.socket) {
2604                 mutex_lock(&tconn->meta.mutex);
2605                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2606                 sock_release(tconn->meta.socket);
2607                 tconn->meta.socket = NULL;
2608                 mutex_unlock(&tconn->meta.mutex);
2609         }
2610 }
2611
2612
2613 void drbd_free_resources(struct drbd_conf *mdev)
2614 {
2615         crypto_free_hash(mdev->tconn->csums_tfm);
2616         mdev->tconn->csums_tfm = NULL;
2617         crypto_free_hash(mdev->tconn->verify_tfm);
2618         mdev->tconn->verify_tfm = NULL;
2619         crypto_free_hash(mdev->tconn->cram_hmac_tfm);
2620         mdev->tconn->cram_hmac_tfm = NULL;
2621         crypto_free_hash(mdev->tconn->integrity_w_tfm);
2622         mdev->tconn->integrity_w_tfm = NULL;
2623         crypto_free_hash(mdev->tconn->integrity_r_tfm);
2624         mdev->tconn->integrity_r_tfm = NULL;
2625
2626         drbd_free_sock(mdev->tconn);
2627
2628         __no_warn(local,
2629                   drbd_free_bc(mdev->ldev);
2630                   mdev->ldev = NULL;);
2631 }
2632
2633 /* meta data management */
2634
2635 struct meta_data_on_disk {
2636         u64 la_size;           /* last agreed size. */
2637         u64 uuid[UI_SIZE];   /* UUIDs. */
2638         u64 device_uuid;
2639         u64 reserved_u64_1;
2640         u32 flags;             /* MDF */
2641         u32 magic;
2642         u32 md_size_sect;
2643         u32 al_offset;         /* offset to this block */
2644         u32 al_nr_extents;     /* important for restoring the AL */
2645               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2646         u32 bm_offset;         /* offset to the bitmap, from here */
2647         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2648         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2649         u32 reserved_u32[3];
2650
2651 } __packed;
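/* All multi-byte fields of struct meta_data_on_disk are stored big-endian
 * on disk: drbd_md_sync() converts with cpu_to_be*() on write and
 * drbd_md_read() converts back with be*_to_cpu() on read. */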
2652
2653 /**
2654  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2655  * @mdev:       DRBD device.
2656  */
2657 void drbd_md_sync(struct drbd_conf *mdev)
2658 {
2659         struct meta_data_on_disk *buffer;
2660         sector_t sector;
2661         int i;
2662
2663         del_timer(&mdev->md_sync_timer);
2664         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2665         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2666                 return;
2667
2668         /* We use D_FAILED here, and not D_ATTACHING, because we try to write
2669          * metadata even if we detach due to a disk failure! */
2670         if (!get_ldev_if_state(mdev, D_FAILED))
2671                 return;
2672
2673         mutex_lock(&mdev->md_io_mutex);
2674         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2675         memset(buffer, 0, 512);
2676
2677         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2678         for (i = UI_CURRENT; i < UI_SIZE; i++)
2679                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2680         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2681         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2682
2683         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2684         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2685         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2686         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2687         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2688
2689         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2690         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2691
2692         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2693         sector = mdev->ldev->md.md_offset;
2694
2695         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2696                 /* this was a try anyway ... */
2697                 dev_err(DEV, "meta data update failed!\n");
2698                 drbd_chk_io_error(mdev, 1, true);
2699         }
2700
2701         /* Update mdev->ldev->md.la_size_sect,
2702          * since we just wrote the new value to the on-disk meta data. */
2703         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2704
2705         mutex_unlock(&mdev->md_io_mutex);
2706         put_ldev(mdev);
2707 }
2708
2709 /**
2710  * drbd_md_read() - Reads in the meta data super block
2711  * @mdev:       DRBD device.
2712  * @bdev:       Device from which the meta data should be read in.
2713  *
2714  * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
2715  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2716  */
2717 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2718 {
2719         struct meta_data_on_disk *buffer;
2720         int i, rv = NO_ERROR;
2721
2722         if (!get_ldev_if_state(mdev, D_ATTACHING))
2723                 return ERR_IO_MD_DISK;
2724
2725         mutex_lock(&mdev->md_io_mutex);
2726         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2727
2728         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2729                 /* NOTE: can't do normal error processing here as this is
2730                    called BEFORE disk is attached */
2731                 dev_err(DEV, "Error while reading metadata.\n");
2732                 rv = ERR_IO_MD_DISK;
2733                 goto err;
2734         }
2735
2736         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2737                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2738                 rv = ERR_MD_INVALID;
2739                 goto err;
2740         }
2741         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2742                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2743                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2744                 rv = ERR_MD_INVALID;
2745                 goto err;
2746         }
2747         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2748                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2749                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2750                 rv = ERR_MD_INVALID;
2751                 goto err;
2752         }
2753         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2754                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2755                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2756                 rv = ERR_MD_INVALID;
2757                 goto err;
2758         }
2759
2760         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2761                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2762                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2763                 rv = ERR_MD_INVALID;
2764                 goto err;
2765         }
2766
2767         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2768         for (i = UI_CURRENT; i < UI_SIZE; i++)
2769                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2770         bdev->md.flags = be32_to_cpu(buffer->flags);
2771         bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
2772         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2773
2774         spin_lock_irq(&mdev->tconn->req_lock);
2775         if (mdev->state.conn < C_CONNECTED) {
2776                 int peer;
2777                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2778                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2779                 mdev->peer_max_bio_size = peer;
2780         }
2781         spin_unlock_irq(&mdev->tconn->req_lock);
2782
2783         if (bdev->dc.al_extents < 7)
2784                 bdev->dc.al_extents = 127;
2785
2786  err:
2787         mutex_unlock(&mdev->md_io_mutex);
2788         put_ldev(mdev);
2789
2790         return rv;
2791 }
2792
2793 /**
2794  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2795  * @mdev:       DRBD device.
2796  *
2797  * Call this function if you change anything that should be written to
2798  * the meta-data super block. This function sets MD_DIRTY, and starts a
2799  * timer that ensures that drbd_md_sync() gets called within five seconds.
2800  */
2801 #ifdef DEBUG
2802 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2803 {
2804         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2805                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2806                 mdev->last_md_mark_dirty.line = line;
2807                 mdev->last_md_mark_dirty.func = func;
2808         }
2809 }
2810 #else
2811 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2812 {
2813         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2814                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2815 }
2816 #endif
2817
2818 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2819 {
2820         int i;
2821
2822         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2823                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2824 }
2825
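/* Note: bit 0 of the current UUID encodes our role; it is set while we
 * are R_PRIMARY and cleared otherwise. */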
2826 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2827 {
2828         if (idx == UI_CURRENT) {
2829                 if (mdev->state.role == R_PRIMARY)
2830                         val |= 1;
2831                 else
2832                         val &= ~((u64)1);
2833
2834                 drbd_set_ed_uuid(mdev, val);
2835         }
2836
2837         mdev->ldev->md.uuid[idx] = val;
2838         drbd_md_mark_dirty(mdev);
2839 }
2840
2841
2842 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2843 {
2844         if (mdev->ldev->md.uuid[idx]) {
2845                 drbd_uuid_move_history(mdev);
2846                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2847         }
2848         _drbd_uuid_set(mdev, idx, val);
2849 }
2850
2851 /**
2852  * drbd_uuid_new_current() - Creates a new current UUID
2853  * @mdev:       DRBD device.
2854  *
2855  * Creates a new current UUID, and rotates the old current UUID into
2856  * the bitmap slot. Causes an incremental resync upon next connect.
2857  */
2858 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2859 {
2860         u64 val;
2861         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2862
2863         if (bm_uuid)
2864                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2865
2866         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2867
2868         get_random_bytes(&val, sizeof(u64));
2869         _drbd_uuid_set(mdev, UI_CURRENT, val);
2870         drbd_print_uuids(mdev, "new current UUID");
2871         /* get it to stable storage _now_ */
2872         drbd_md_sync(mdev);
2873 }
2874
2875 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
2876 {
2877         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
2878                 return;
2879
2880         if (val == 0) {
2881                 drbd_uuid_move_history(mdev);
2882                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2883                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
2884         } else {
2885                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2886                 if (bm_uuid)
2887                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2888
2889                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
2890         }
2891         drbd_md_mark_dirty(mdev);
2892 }
2893
2894 /**
2895  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2896  * @mdev:       DRBD device.
2897  *
2898  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
2899  */
2900 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
2901 {
2902         int rv = -EIO;
2903
2904         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2905                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
2906                 drbd_md_sync(mdev);
2907                 drbd_bm_set_all(mdev);
2908
2909                 rv = drbd_bm_write(mdev);
2910
2911                 if (!rv) {
2912                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2913                         drbd_md_sync(mdev);
2914                 }
2915
2916                 put_ldev(mdev);
2917         }
2918
2919         return rv;
2920 }
2921
2922 /**
2923  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
2924  * @mdev:       DRBD device.
2925  *
2926  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
2927  */
2928 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
2929 {
2930         int rv = -EIO;
2931
2932         drbd_resume_al(mdev);
2933         if (get_ldev_if_state(mdev, D_ATTACHING)) {
2934                 drbd_bm_clear_all(mdev);
2935                 rv = drbd_bm_write(mdev);
2936                 put_ldev(mdev);
2937         }
2938
2939         return rv;
2940 }
2941
2942 static int w_bitmap_io(struct drbd_work *w, int unused)
2943 {
2944         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
2945         struct drbd_conf *mdev = w->mdev;
2946         int rv = -EIO;
2947
2948         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
2949
2950         if (get_ldev(mdev)) {
2951                 drbd_bm_lock(mdev, work->why, work->flags);
2952                 rv = work->io_fn(mdev);
2953                 drbd_bm_unlock(mdev);
2954                 put_ldev(mdev);
2955         }
2956
2957         clear_bit_unlock(BITMAP_IO, &mdev->flags);
2958         wake_up(&mdev->misc_wait);
2959
2960         if (work->done)
2961                 work->done(mdev, rv);
2962
2963         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
2964         work->why = NULL;
2965         work->flags = 0;
2966
2967         return 0;
2968 }
2969
2970 void drbd_ldev_destroy(struct drbd_conf *mdev)
2971 {
2972         lc_destroy(mdev->resync);
2973         mdev->resync = NULL;
2974         lc_destroy(mdev->act_log);
2975         mdev->act_log = NULL;
2976         __no_warn(local,
2977                 drbd_free_bc(mdev->ldev);
2978                 mdev->ldev = NULL;);
2979
2980         clear_bit(GO_DISKLESS, &mdev->flags);
2981 }
2982
2983 static int w_go_diskless(struct drbd_work *w, int unused)
2984 {
2985         struct drbd_conf *mdev = w->mdev;
2986
2987         D_ASSERT(mdev->state.disk == D_FAILED);
2988         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
2989          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
2990          * the protected members anymore, though, so once put_ldev reaches zero
2991          * again, it will be safe to free them. */
2992         drbd_force_state(mdev, NS(disk, D_DISKLESS));
2993         return 0;
2994 }
2995
2996 void drbd_go_diskless(struct drbd_conf *mdev)
2997 {
2998         D_ASSERT(mdev->state.disk == D_FAILED);
2999         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3000                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3001 }
3002
3003 /**
3004  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3005  * @mdev:       DRBD device.
3006  * @io_fn:      IO callback to be called when bitmap IO is possible
3007  * @done:       callback to be called after the bitmap IO was performed
3008  * @why:        Descriptive text of the reason for doing the IO
3009  *
3010  * While IO on the bitmap happens we freeze application IO, thus ensuring
3011  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3012  * called from worker context. It MUST NOT be used while a previous such
3013  * work is still pending!
3014  */
3015 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3016                           int (*io_fn)(struct drbd_conf *),
3017                           void (*done)(struct drbd_conf *, int),
3018                           char *why, enum bm_flag flags)
3019 {
3020         D_ASSERT(current == mdev->tconn->worker.task);
3021
3022         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3023         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3024         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3025         if (mdev->bm_io_work.why)
3026                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3027                         why, mdev->bm_io_work.why);
3028
3029         mdev->bm_io_work.io_fn = io_fn;
3030         mdev->bm_io_work.done = done;
3031         mdev->bm_io_work.why = why;
3032         mdev->bm_io_work.flags = flags;
3033
3034         spin_lock_irq(&mdev->tconn->req_lock);
3035         set_bit(BITMAP_IO, &mdev->flags);
3036         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3037                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3038                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3039         }
3040         spin_unlock_irq(&mdev->tconn->req_lock);
3041 }
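
/*
 * Hedged caller sketch (not an actual call site in this file): from worker
 * context, a full-sync bitmap write with completion notification could be
 * queued roughly like this; sketch_bm_written() is a hypothetical callback.
 *
 *	static void sketch_bm_written(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap write failed: %d\n", rv);
 *	}
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, sketch_bm_written,
 *			     "queue_bitmap_io sketch", BM_LOCKED_SET_ALLOWED);
 *
 * The work is only handed to the worker once ap_bio_cnt reaches zero, so
 * application IO is quiesced by the time w_bitmap_io() runs.
 */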
3042
3043 /**
3044  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3045  * @mdev:       DRBD device.
3046  * @io_fn:      IO callback to be called when bitmap IO is possible
3047  * @why:        Descriptive text of the reason for doing the IO
3048  *
3049  * Freezes application IO while the actual IO operation runs. This
3050  * function MAY NOT be called from worker context.
3051  */
3052 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3053                 char *why, enum bm_flag flags)
3054 {
3055         int rv;
3056
3057         D_ASSERT(current != mdev->tconn->worker.task);
3058
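        /* Per the drbd_queue_bitmap_io() comment above, application IO is
         * frozen so that drbd_set_out_of_sync() cannot run concurrently.
         * If the caller explicitly allows bits to be set while the bitmap
         * is locked (BM_LOCKED_SET_ALLOWED), that concurrency is harmless
         * and the suspend/resume pair is skipped. */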
3059         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3060                 drbd_suspend_io(mdev);
3061
3062         drbd_bm_lock(mdev, why, flags);
3063         rv = io_fn(mdev);
3064         drbd_bm_unlock(mdev);
3065
3066         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3067                 drbd_resume_io(mdev);
3068
3069         return rv;
3070 }
3071
3072 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3073 {
3074         if ((mdev->ldev->md.flags & flag) != flag) {
3075                 drbd_md_mark_dirty(mdev);
3076                 mdev->ldev->md.flags |= flag;
3077         }
3078 }
3079
3080 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3081 {
3082         if ((mdev->ldev->md.flags & flag) != 0) {
3083                 drbd_md_mark_dirty(mdev);
3084                 mdev->ldev->md.flags &= ~flag;
3085         }
3086 }
3087 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3088 {
3089         return (bdev->md.flags & flag) != 0;
3090 }
3091
3092 static void md_sync_timer_fn(unsigned long data)
3093 {
3094         struct drbd_conf *mdev = (struct drbd_conf *) data;
3095
3096         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3097 }
3098
3099 static int w_md_sync(struct drbd_work *w, int unused)
3100 {
3101         struct drbd_conf *mdev = w->mdev;
3102
3103         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3104 #ifdef DEBUG
3105         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3106                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3107 #endif
3108         drbd_md_sync(mdev);
3109         return 0;
3110 }
3111
3112 const char *cmdname(enum drbd_packet cmd)
3113 {
3114         /* THINK may need to become several global tables
3115          * when we want to support more than
3116          * one PRO_VERSION */
3117         static const char *cmdnames[] = {
3118                 [P_DATA]                = "Data",
3119                 [P_DATA_REPLY]          = "DataReply",
3120                 [P_RS_DATA_REPLY]       = "RSDataReply",
3121                 [P_BARRIER]             = "Barrier",
3122                 [P_BITMAP]              = "ReportBitMap",
3123                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3124                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3125                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3126                 [P_DATA_REQUEST]        = "DataRequest",
3127                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3128                 [P_SYNC_PARAM]          = "SyncParam",
3129                 [P_SYNC_PARAM89]        = "SyncParam89",
3130                 [P_PROTOCOL]            = "ReportProtocol",
3131                 [P_UUIDS]               = "ReportUUIDs",
3132                 [P_SIZES]               = "ReportSizes",
3133                 [P_STATE]               = "ReportState",
3134                 [P_SYNC_UUID]           = "ReportSyncUUID",
3135                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3136                 [P_AUTH_RESPONSE]       = "AuthResponse",
3137                 [P_PING]                = "Ping",
3138                 [P_PING_ACK]            = "PingAck",
3139                 [P_RECV_ACK]            = "RecvAck",
3140                 [P_WRITE_ACK]           = "WriteAck",
3141                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3142                 [P_DISCARD_WRITE]       = "DiscardWrite",
3143                 [P_NEG_ACK]             = "NegAck",
3144                 [P_NEG_DREPLY]          = "NegDReply",
3145                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3146                 [P_BARRIER_ACK]         = "BarrierAck",
3147                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3148                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3149                 [P_OV_REQUEST]          = "OVRequest",
3150                 [P_OV_REPLY]            = "OVReply",
3151                 [P_OV_RESULT]           = "OVResult",
3152                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3153                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3154                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3155                 [P_DELAY_PROBE]         = "DelayProbe",
3156                 [P_OUT_OF_SYNC]         = "OutOfSync",
3157                 [P_RETRY_WRITE]         = "RetryWrite",
3158         };
3159
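        /* The handshake packet types are not part of the dense command
         * number space covered by cmdnames[], so name them explicitly
         * before the array-bounds check below. */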
3160         if (cmd == P_HAND_SHAKE_M)
3161                 return "HandShakeM";
3162         if (cmd == P_HAND_SHAKE_S)
3163                 return "HandShakeS";
3164         if (cmd == P_HAND_SHAKE)
3165                 return "HandShake";
3166         if (cmd >= ARRAY_SIZE(cmdnames))
3167                 return "Unknown";
3168         return cmdnames[cmd];
3169 }
3170
3171 /**
3172  * drbd_wait_misc  -  wait for a request to make progress
3173  * @mdev:       device associated with the request
3174  * @i:          the struct drbd_interval embedded in struct drbd_request or
3175  *              struct drbd_peer_request
3176  */
3177 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3178 {
3179         struct net_conf *net_conf = mdev->tconn->net_conf;
3180         DEFINE_WAIT(wait);
3181         long timeout;
3182
3183         if (!net_conf)
3184                 return -ETIMEDOUT;
3185         timeout = MAX_SCHEDULE_TIMEOUT;
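        /* net_conf->timeout is configured in units of 0.1 seconds, so the
         * line below converts it to jiffies and scales it by ko_count;
         * e.g. timeout=60 (6 s) with ko_count=7 waits up to 42 seconds. */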
3186         if (net_conf->ko_count)
3187                 timeout = net_conf->timeout * HZ / 10 * net_conf->ko_count;
3188
3189         /* Indicate that mdev->misc_wait should be woken up on progress. */
3190         i->waiting = true;
3191         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3192         spin_unlock_irq(&mdev->tconn->req_lock);
3193         timeout = schedule_timeout(timeout);
3194         finish_wait(&mdev->misc_wait, &wait);
3195         spin_lock_irq(&mdev->tconn->req_lock);
3196         if (!timeout || mdev->state.conn < C_CONNECTED)
3197                 return -ETIMEDOUT;
3198         if (signal_pending(current))
3199                 return -ERESTARTSYS;
3200         return 0;
3201 }
3202
3203 #ifdef CONFIG_DRBD_FAULT_INJECTION
3204 /* Fault insertion support including random number generator shamelessly
3205  * stolen from kernel/rcutorture.c */
3206 struct fault_random_state {
3207         unsigned long state;
3208         unsigned long count;
3209 };
3210
3211 #define FAULT_RANDOM_MULT 39916801  /* prime */
3212 #define FAULT_RANDOM_ADD        479001701 /* prime */
3213 #define FAULT_RANDOM_REFRESH 10000
3214
3215 /*
3216  * Crude but fast random-number generator.  Uses a linear congruential
3217  * generator, with occasional help from get_random_bytes().
3218  */
3219 static unsigned long
3220 _drbd_fault_random(struct fault_random_state *rsp)
3221 {
3222         long refresh;
3223
3224         if (!rsp->count--) {
3225                 get_random_bytes(&refresh, sizeof(refresh));
3226                 rsp->state += refresh;
3227                 rsp->count = FAULT_RANDOM_REFRESH;
3228         }
3229         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3230         return swahw32(rsp->state);
3231 }
3232
3233 static char *
3234 _drbd_fault_str(unsigned int type) {
3235         static char *_faults[] = {
3236                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3237                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3238                 [DRBD_FAULT_RS_WR] = "Resync write",
3239                 [DRBD_FAULT_RS_RD] = "Resync read",
3240                 [DRBD_FAULT_DT_WR] = "Data write",
3241                 [DRBD_FAULT_DT_RD] = "Data read",
3242                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3243                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3244                 [DRBD_FAULT_AL_EE] = "EE allocation",
3245                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3246         };
3247
3248         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3249 }
3250
3251 unsigned int
3252 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3253 {
3254         static struct fault_random_state rrs = {0, 0};
3255
3256         unsigned int ret = (
3257                 (fault_devs == 0 ||
3258                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3259                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3260
3261         if (ret) {
3262                 fault_count++;
3263
3264                 if (__ratelimit(&drbd_ratelimit_state))
3265                         dev_warn(DEV, "***Simulating %s failure\n",
3266                                 _drbd_fault_str(type));
3267         }
3268
3269         return ret;
3270 }
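
/*
 * Reading the condition above: fault_devs is a bitmask of minor numbers
 * (0 means "all devices") and fault_rate is a percentage.  For example,
 * fault_rate=10 with fault_devs=2 simulates a failure in roughly 10% of
 * eligible operations, but only for the device with minor number 1
 * (bit 1 set in the mask).
 */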
3271 #endif
3272
3273 const char *drbd_buildtag(void)
3274 {
3275         /* DRBD built from external sources carries a reference to the
3276            git hash of its source code here. */
3277
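        /* The string literal below starts with a NUL byte, so buildtag[0]
         * is 0 until the first call.  For a built-in driver the branch
         * below stores 'b', turning the buffer into "built-in"; a modular
         * build formats the module's srcversion into it instead. */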
3278         static char buildtag[38] = "\0uilt-in";
3279
3280         if (buildtag[0] == 0) {
3281 #ifdef CONFIG_MODULES
3282                 if (THIS_MODULE != NULL)
3283                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3284                 else
3285 #endif
3286                         buildtag[0] = 'b';
3287         }
3288
3289         return buildtag;
3290 }
3291
3292 module_init(drbd_init)
3293 module_exit(drbd_cleanup)
3294
3295 EXPORT_SYMBOL(drbd_conn_str);
3296 EXPORT_SYMBOL(drbd_role_str);
3297 EXPORT_SYMBOL(drbd_disk_str);
3298 EXPORT_SYMBOL(drbd_set_st_err_str);