drbd: Send PROTOCOL_UPDATE packets when appropriate
drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameters, defined here */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is the private
137          member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
143
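/*
 * Editor's sketch (illustration only, not part of the original file): how the
 * hand-rolled pool above can be popped, with the next pointer kept in
 * page->private as the note says.  The real allocation and free paths live
 * elsewhere in the driver (drbd_receiver.c); the helper name here is
 * hypothetical.
 */
static __maybe_unused struct page *example_pp_pop(void)
{
	struct page *page;

	spin_lock(&drbd_pp_lock);
	page = drbd_pp_pool;
	if (page) {
		/* unlink the head; its private field chains to the next page */
		drbd_pp_pool = (struct page *)page_private(page);
		set_page_private(page, 0);
		drbd_pp_vacant--;
	}
	spin_unlock(&drbd_pp_lock);
	return page;
}
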
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
198  * attached.
199  */
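
/*
 * Editor's sketch (illustration only, not part of the original file):
 * walking the transfer log as described above -- epochs are chained via
 * ->next, the requests of each epoch hang off its embedded list_head.
 * _tl_restart() below is the real traversal (done under tconn->req_lock);
 * the function name here is hypothetical.
 */
static void __maybe_unused example_tl_walk(struct drbd_tconn *tconn)
{
	struct drbd_tl_epoch *b;
	struct drbd_request *req;

	for (b = tconn->oldest_tle; b; b = b->next)
		list_for_each_entry(req, &b->requests, tl_requests)
			; /* inspect req here */
}
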
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218
219         return 1;
220 }
221
222 static void tl_cleanup(struct drbd_tconn *tconn)
223 {
224         if (tconn->oldest_tle != tconn->newest_tle)
225                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
226         if (!list_empty(&tconn->out_of_sequence_requests))
227                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
228         kfree(tconn->oldest_tle);
229         tconn->oldest_tle = NULL;
230         kfree(tconn->unused_spare_tle);
231         tconn->unused_spare_tle = NULL;
232 }
233
234 /**
235  * _tl_add_barrier() - Adds a barrier to the transfer log
236  * @tconn:      DRBD connection.
237  * @new:        Barrier to be added before the current head of the TL.
238  *
239  * The caller must hold the req_lock.
240  */
241 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
242 {
243         struct drbd_tl_epoch *newest_before;
244
245         INIT_LIST_HEAD(&new->requests);
246         INIT_LIST_HEAD(&new->w.list);
247         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
248         new->next = NULL;
249         new->n_writes = 0;
250
251         newest_before = tconn->newest_tle;
252         /* never send a barrier number == 0, because that is special-cased
253          * when using TCQ for our write ordering code */
254         new->br_number = (newest_before->br_number+1) ?: 1;
255         if (tconn->newest_tle != new) {
256                 tconn->newest_tle->next = new;
257                 tconn->newest_tle = new;
258         }
259 }
260
261 /**
262  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
263  * @tconn:      DRBD connection.
264  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
265  * @set_size:   Expected number of requests before that barrier.
266  *
267  * In case the passed barrier_nr or set_size does not match the oldest
268  * &struct drbd_tl_epoch object, this function will cause a termination
269  * of the connection.
270  */
271 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
272                 unsigned int set_size)
273 {
274         struct drbd_conf *mdev;
275         struct drbd_tl_epoch *b, *nob; /* next old barrier */
276         struct list_head *le, *tle;
277         struct drbd_request *r;
278
279         spin_lock_irq(&tconn->req_lock);
280
281         b = tconn->oldest_tle;
282
283         /* first some paranoia code */
284         if (b == NULL) {
285                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
286                          barrier_nr);
287                 goto bail;
288         }
289         if (b->br_number != barrier_nr) {
290                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
291                          barrier_nr, b->br_number);
292                 goto bail;
293         }
294         if (b->n_writes != set_size) {
295                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
296                          barrier_nr, set_size, b->n_writes);
297                 goto bail;
298         }
299
300         /* Clean up list of requests processed during current epoch */
301         list_for_each_safe(le, tle, &b->requests) {
302                 r = list_entry(le, struct drbd_request, tl_requests);
303                 _req_mod(r, BARRIER_ACKED);
304         }
305         /* There could be requests on the list waiting for completion
306            of the write to the local disk. To avoid corruption of
307            slab's data structures we have to remove the list's head.
308
309            Also there could have been a barrier ack out of sequence, overtaking
310            the write acks - which would be a bug and violating write ordering.
311            To not deadlock in case we lose connection while such requests are
312            still pending, we need some way to find them for the
313            _req_mod(CONNECTION_LOST_WHILE_PENDING).
314
315            These have been list_move'd to the out_of_sequence_requests list in
316            _req_mod(, BARRIER_ACKED) above.
317            */
318         list_del_init(&b->requests);
319         mdev = b->w.mdev;
320
321         nob = b->next;
322         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
323                 _tl_add_barrier(tconn, b);
324                 if (nob)
325                         tconn->oldest_tle = nob;
326                 /* if nob == NULL, b was the only barrier and becomes the new
327                    barrier. Therefore tconn->oldest_tle already points to b */
328         } else {
329                 D_ASSERT(nob != NULL);
330                 tconn->oldest_tle = nob;
331                 kfree(b);
332         }
333
334         spin_unlock_irq(&tconn->req_lock);
335         dec_ap_pending(mdev);
336
337         return;
338
339 bail:
340         spin_unlock_irq(&tconn->req_lock);
341         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
342 }
343
344
345 /**
346  * _tl_restart() - Walks the transfer log, and applies an action to all requests
347  * @tconn:      DRBD connection.
348  * @what:       The action/event to perform with all request objects
349  *
350  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
351  * RESTART_FROZEN_DISK_IO.
352  */
353 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
354 {
355         struct drbd_tl_epoch *b, *tmp, **pn;
356         struct list_head *le, *tle, carry_reads;
357         struct drbd_request *req;
358         int rv, n_writes, n_reads;
359
360         b = tconn->oldest_tle;
361         pn = &tconn->oldest_tle;
362         while (b) {
363                 n_writes = 0;
364                 n_reads = 0;
365                 INIT_LIST_HEAD(&carry_reads);
366                 list_for_each_safe(le, tle, &b->requests) {
367                         req = list_entry(le, struct drbd_request, tl_requests);
368                         rv = _req_mod(req, what);
369
370                         n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
371                         n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
372                 }
373                 tmp = b->next;
374
375                 if (n_writes) {
376                         if (what == RESEND) {
377                                 b->n_writes = n_writes;
378                                 if (b->w.cb == NULL) {
379                                         b->w.cb = w_send_barrier;
380                                         inc_ap_pending(b->w.mdev);
381                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
382                                 }
383
384                                 drbd_queue_work(&tconn->data.work, &b->w);
385                         }
386                         pn = &b->next;
387                 } else {
388                         if (n_reads)
389                                 list_add(&carry_reads, &b->requests);
390                         /* there could still be requests on that ring list,
391                          * in case local io is still pending */
392                         list_del(&b->requests);
393
394                         /* dec_ap_pending corresponding to queue_barrier.
395                          * the newest barrier may not have been queued yet,
396                          * in which case w.cb is still NULL. */
397                         if (b->w.cb != NULL)
398                                 dec_ap_pending(b->w.mdev);
399
400                         if (b == tconn->newest_tle) {
401                                 /* recycle, but reinit! */
402                                 if (tmp != NULL)
403                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
404                                 INIT_LIST_HEAD(&b->requests);
405                                 list_splice(&carry_reads, &b->requests);
406                                 INIT_LIST_HEAD(&b->w.list);
407                                 b->w.cb = NULL;
408                                 b->br_number = net_random();
409                                 b->n_writes = 0;
410
411                                 *pn = b;
412                                 break;
413                         }
414                         *pn = tmp;
415                         kfree(b);
416                 }
417                 b = tmp;
418                 list_splice(&carry_reads, &b->requests);
419         }
420 }
421
422
423 /**
424  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
425  * @tconn:      DRBD connection.
426  *
427  * This is called after the connection to the peer was lost. The storage covered
428  * by the requests on the transfer log gets marked as out of sync. Called from the
429  * receiver thread and the worker thread.
430  */
431 void tl_clear(struct drbd_tconn *tconn)
432 {
433         struct drbd_conf *mdev;
434         struct list_head *le, *tle;
435         struct drbd_request *r;
436         int vnr;
437
438         spin_lock_irq(&tconn->req_lock);
439
440         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
441
442         /* we expect this list to be empty. */
443         if (!list_empty(&tconn->out_of_sequence_requests))
444                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
445
446         /* but just in case, clean it up anyway! */
447         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
448                 r = list_entry(le, struct drbd_request, tl_requests);
449                 /* It would be nice to complete outside of spinlock.
450                  * But this is easier for now. */
451                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
452         }
453
454         /* ensure bit indicating barrier is required is clear */
455         rcu_read_lock();
456         idr_for_each_entry(&tconn->volumes, mdev, vnr)
457                 clear_bit(CREATE_BARRIER, &mdev->flags);
458         rcu_read_unlock();
459
460         spin_unlock_irq(&tconn->req_lock);
461 }
462
463 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
464 {
465         spin_lock_irq(&tconn->req_lock);
466         _tl_restart(tconn, what);
467         spin_unlock_irq(&tconn->req_lock);
468 }
469
470 static int drbd_thread_setup(void *arg)
471 {
472         struct drbd_thread *thi = (struct drbd_thread *) arg;
473         struct drbd_tconn *tconn = thi->tconn;
474         unsigned long flags;
475         int retval;
476
477         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
478                  thi->name[0], thi->tconn->name);
479
480 restart:
481         retval = thi->function(thi);
482
483         spin_lock_irqsave(&thi->t_lock, flags);
484
485         /* if the receiver has been "EXITING", the last thing it did
486          * was set the conn state to "StandAlone",
487          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
488          * and receiver thread will be "started".
489          * drbd_thread_start needs to set "RESTARTING" in that case.
490          * t_state check and assignment needs to be within the same spinlock,
491          * so either thread_start sees EXITING, and can remap to RESTARTING,
492  * or thread_start sees NONE, and can proceed as normal.
493          */
494
495         if (thi->t_state == RESTARTING) {
496                 conn_info(tconn, "Restarting %s thread\n", thi->name);
497                 thi->t_state = RUNNING;
498                 spin_unlock_irqrestore(&thi->t_lock, flags);
499                 goto restart;
500         }
501
502         thi->task = NULL;
503         thi->t_state = NONE;
504         smp_mb();
505         complete_all(&thi->stop);
506         spin_unlock_irqrestore(&thi->t_lock, flags);
507
508         conn_info(tconn, "Terminating %s\n", current->comm);
509
510         /* Release mod reference taken when thread was started */
511
512         kref_put(&tconn->kref, &conn_destroy);
513         module_put(THIS_MODULE);
514         return retval;
515 }
516
517 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
518                              int (*func) (struct drbd_thread *), char *name)
519 {
520         spin_lock_init(&thi->t_lock);
521         thi->task    = NULL;
522         thi->t_state = NONE;
523         thi->function = func;
524         thi->tconn = tconn;
525         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
526 }
527
528 int drbd_thread_start(struct drbd_thread *thi)
529 {
530         struct drbd_tconn *tconn = thi->tconn;
531         struct task_struct *nt;
532         unsigned long flags;
533
534         /* is used from state engine doing drbd_thread_stop_nowait,
535          * while holding the req lock irqsave */
536         spin_lock_irqsave(&thi->t_lock, flags);
537
538         switch (thi->t_state) {
539         case NONE:
540                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
541                          thi->name, current->comm, current->pid);
542
543                 /* Get ref on module for thread - this is released when thread exits */
544                 if (!try_module_get(THIS_MODULE)) {
545                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
546                         spin_unlock_irqrestore(&thi->t_lock, flags);
547                         return false;
548                 }
549
550                 kref_get(&thi->tconn->kref);
551
552                 init_completion(&thi->stop);
553                 thi->reset_cpu_mask = 1;
554                 thi->t_state = RUNNING;
555                 spin_unlock_irqrestore(&thi->t_lock, flags);
556                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
557
558                 nt = kthread_create(drbd_thread_setup, (void *) thi,
559                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
560
561                 if (IS_ERR(nt)) {
562                         conn_err(tconn, "Couldn't start thread\n");
563
564                         kref_put(&tconn->kref, &conn_destroy);
565                         module_put(THIS_MODULE);
566                         return false;
567                 }
568                 spin_lock_irqsave(&thi->t_lock, flags);
569                 thi->task = nt;
570                 thi->t_state = RUNNING;
571                 spin_unlock_irqrestore(&thi->t_lock, flags);
572                 wake_up_process(nt);
573                 break;
574         case EXITING:
575                 thi->t_state = RESTARTING;
576                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
577                                 thi->name, current->comm, current->pid);
578                 /* fall through */
579         case RUNNING:
580         case RESTARTING:
581         default:
582                 spin_unlock_irqrestore(&thi->t_lock, flags);
583                 break;
584         }
585
586         return true;
587 }
588
589
590 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
591 {
592         unsigned long flags;
593
594         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
595
596         /* may be called from state engine, holding the req lock irqsave */
597         spin_lock_irqsave(&thi->t_lock, flags);
598
599         if (thi->t_state == NONE) {
600                 spin_unlock_irqrestore(&thi->t_lock, flags);
601                 if (restart)
602                         drbd_thread_start(thi);
603                 return;
604         }
605
606         if (thi->t_state != ns) {
607                 if (thi->task == NULL) {
608                         spin_unlock_irqrestore(&thi->t_lock, flags);
609                         return;
610                 }
611
612                 thi->t_state = ns;
613                 smp_mb();
614                 init_completion(&thi->stop);
615                 if (thi->task != current)
616                         force_sig(DRBD_SIGKILL, thi->task);
617         }
618
619         spin_unlock_irqrestore(&thi->t_lock, flags);
620
621         if (wait)
622                 wait_for_completion(&thi->stop);
623 }
624
625 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
626 {
627         struct drbd_thread *thi =
628                 task == tconn->receiver.task ? &tconn->receiver :
629                 task == tconn->asender.task  ? &tconn->asender :
630                 task == tconn->worker.task   ? &tconn->worker : NULL;
631
632         return thi;
633 }
634
635 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
636 {
637         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
638         return thi ? thi->name : task->comm;
639 }
640
641 int conn_lowest_minor(struct drbd_tconn *tconn)
642 {
643         struct drbd_conf *mdev;
644         int vnr = 0, m;
645
646         rcu_read_lock();
647         mdev = idr_get_next(&tconn->volumes, &vnr);
648         m = mdev ? mdev_to_minor(mdev) : -1;
649         rcu_read_unlock();
650
651         return m;
652 }
653
654 #ifdef CONFIG_SMP
655 /**
656  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
657  * @tconn:      DRBD connection.
658  *
659  * Forces all threads of a device onto the same CPU. This is beneficial for
660  * DRBD's performance. May be overwritten by user's configuration.
661  */
662 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
663 {
664         int ord, cpu;
665
666         /* user override. */
667         if (cpumask_weight(tconn->cpu_mask))
668                 return;
669
670         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
671         for_each_online_cpu(cpu) {
672                 if (ord-- == 0) {
673                         cpumask_set_cpu(cpu, tconn->cpu_mask);
674                         return;
675                 }
676         }
677         /* should not be reached */
678         cpumask_setall(tconn->cpu_mask);
679 }
680
681 /**
682  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
683  * @mdev:       DRBD device.
684  * @thi:        drbd_thread object
685  *
686  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
687  * prematurely.
688  */
689 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
690 {
691         struct task_struct *p = current;
692
693         if (!thi->reset_cpu_mask)
694                 return;
695         thi->reset_cpu_mask = 0;
696         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
697 }
698 #endif
699
700 /**
701  * drbd_header_size  -  size of a packet header
702  *
703  * The header size is a multiple of 8, so any payload following the header is
704  * word aligned on 64-bit architectures.  (The bitmap send and receive code
705  * relies on this.)
706  */
707 unsigned int drbd_header_size(struct drbd_tconn *tconn)
708 {
709         if (tconn->agreed_pro_version >= 100) {
710                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
711                 return sizeof(struct p_header100);
712         } else {
713                 BUILD_BUG_ON(sizeof(struct p_header80) !=
714                              sizeof(struct p_header95));
715                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
716                 return sizeof(struct p_header80);
717         }
718 }
719
720 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
721 {
722         h->magic   = cpu_to_be32(DRBD_MAGIC);
723         h->command = cpu_to_be16(cmd);
724         h->length  = cpu_to_be16(size);
725         return sizeof(struct p_header80);
726 }
727
728 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
729 {
730         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
731         h->command = cpu_to_be16(cmd);
732         h->length = cpu_to_be32(size);
733         return sizeof(struct p_header95);
734 }
735
736 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
737                                       int size, int vnr)
738 {
739         h->magic = cpu_to_be32(DRBD_MAGIC_100);
740         h->volume = cpu_to_be16(vnr);
741         h->command = cpu_to_be16(cmd);
742         h->length = cpu_to_be32(size);
743         h->pad = 0;
744         return sizeof(struct p_header100);
745 }
746
747 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
748                                    void *buffer, enum drbd_packet cmd, int size)
749 {
750         if (tconn->agreed_pro_version >= 100)
751                 return prepare_header100(buffer, cmd, size, vnr);
752         else if (tconn->agreed_pro_version >= 95 &&
753                  size > DRBD_MAX_SIZE_H80_PACKET)
754                 return prepare_header95(buffer, cmd, size);
755         else
756                 return prepare_header80(buffer, cmd, size);
757 }
758
759 static void *__conn_prepare_command(struct drbd_tconn *tconn,
760                                     struct drbd_socket *sock)
761 {
762         if (!sock->socket)
763                 return NULL;
764         return sock->sbuf + drbd_header_size(tconn);
765 }
766
767 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
768 {
769         void *p;
770
771         mutex_lock(&sock->mutex);
772         p = __conn_prepare_command(tconn, sock);
773         if (!p)
774                 mutex_unlock(&sock->mutex);
775
776         return p;
777 }
778
779 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
780 {
781         return conn_prepare_command(mdev->tconn, sock);
782 }
783
784 static int __send_command(struct drbd_tconn *tconn, int vnr,
785                           struct drbd_socket *sock, enum drbd_packet cmd,
786                           unsigned int header_size, void *data,
787                           unsigned int size)
788 {
789         int msg_flags;
790         int err;
791
792         /*
793          * Called with @data == NULL and the size of the data blocks in @size
794          * for commands that send data blocks.  For those commands, omit the
795          * MSG_MORE flag: this will increase the likelihood that data blocks
796          * which are page aligned on the sender will end up page aligned on the
797          * receiver.
798          */
799         msg_flags = data ? MSG_MORE : 0;
800
801         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
802                                       header_size + size);
803         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
804                             msg_flags);
805         if (data && !err)
806                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
807         return err;
808 }
809
810 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
811                                enum drbd_packet cmd, unsigned int header_size,
812                                void *data, unsigned int size)
813 {
814         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
815 }
816
817 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
818                       enum drbd_packet cmd, unsigned int header_size,
819                       void *data, unsigned int size)
820 {
821         int err;
822
823         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
824         mutex_unlock(&sock->mutex);
825         return err;
826 }
827
828 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
829                       enum drbd_packet cmd, unsigned int header_size,
830                       void *data, unsigned int size)
831 {
832         int err;
833
834         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
835                              data, size);
836         mutex_unlock(&sock->mutex);
837         return err;
838 }
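
/*
 * Editor's note (illustration, not part of the original file): the locking
 * contract of the helpers above.  *_prepare_command() returns a payload
 * pointer into sock->sbuf (just past the header) with sock->mutex held;
 * *_send_command() / drbd_send_command() transmit and drop the mutex again.
 * Callers therefore always pair them, roughly like this (P_SOME_CMD and
 * some_field are hypothetical):
 *
 *	p = drbd_prepare_command(mdev, sock);
 *	if (!p)
 *		return -EIO;
 *	p->some_field = cpu_to_be32(value);
 *	return drbd_send_command(mdev, sock, P_SOME_CMD, sizeof(*p), NULL, 0);
 *
 * drbd_send_ping() below is the smallest real instance of this pattern.
 */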
839
840 int drbd_send_ping(struct drbd_tconn *tconn)
841 {
842         struct drbd_socket *sock;
843
844         sock = &tconn->meta;
845         if (!conn_prepare_command(tconn, sock))
846                 return -EIO;
847         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
848 }
849
850 int drbd_send_ping_ack(struct drbd_tconn *tconn)
851 {
852         struct drbd_socket *sock;
853
854         sock = &tconn->meta;
855         if (!conn_prepare_command(tconn, sock))
856                 return -EIO;
857         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
858 }
859
860 int drbd_send_sync_param(struct drbd_conf *mdev)
861 {
862         struct drbd_socket *sock;
863         struct p_rs_param_95 *p;
864         int size;
865         const int apv = mdev->tconn->agreed_pro_version;
866         enum drbd_packet cmd;
867         struct net_conf *nc;
868         struct disk_conf *dc;
869
870         sock = &mdev->tconn->data;
871         p = drbd_prepare_command(mdev, sock);
872         if (!p)
873                 return -EIO;
874
875         rcu_read_lock();
876         nc = rcu_dereference(mdev->tconn->net_conf);
877
878         size = apv <= 87 ? sizeof(struct p_rs_param)
879                 : apv == 88 ? sizeof(struct p_rs_param)
880                         + strlen(nc->verify_alg) + 1
881                 : apv <= 94 ? sizeof(struct p_rs_param_89)
882                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
883
884         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
885
886         /* initialize verify_alg and csums_alg */
887         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
888
889         if (get_ldev(mdev)) {
890                 dc = rcu_dereference(mdev->ldev->disk_conf);
891                 p->resync_rate = cpu_to_be32(dc->resync_rate);
892                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
893                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
894                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
895                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
896                 put_ldev(mdev);
897         } else {
898                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
899                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
900                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
901                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
902                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
903         }
904
905         if (apv >= 88)
906                 strcpy(p->verify_alg, nc->verify_alg);
907         if (apv >= 89)
908                 strcpy(p->csums_alg, nc->csums_alg);
909         rcu_read_unlock();
910
911         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
912 }
913
914 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
915 {
916         struct drbd_socket *sock;
917         struct p_protocol *p;
918         struct net_conf *nc;
919         int size, cf;
920
921         sock = &tconn->data;
922         p = __conn_prepare_command(tconn, sock);
923         if (!p)
924                 return -EIO;
925
926         rcu_read_lock();
927         nc = rcu_dereference(tconn->net_conf);
928
929         if (nc->dry_run && tconn->agreed_pro_version < 92) {
930                 rcu_read_unlock();
931                 mutex_unlock(&sock->mutex);
932                 conn_err(tconn, "--dry-run is not supported by peer");
933                 return -EOPNOTSUPP;
934         }
935
936         size = sizeof(*p);
937         if (tconn->agreed_pro_version >= 87)
938                 size += strlen(nc->integrity_alg) + 1;
939
940         p->protocol      = cpu_to_be32(nc->wire_protocol);
941         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
942         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
943         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
944         p->two_primaries = cpu_to_be32(nc->two_primaries);
945         cf = 0;
946         if (nc->discard_my_data)
947                 cf |= CF_DISCARD_MY_DATA;
948         if (nc->dry_run)
949                 cf |= CF_DRY_RUN;
950         p->conn_flags    = cpu_to_be32(cf);
951
952         if (tconn->agreed_pro_version >= 87)
953                 strcpy(p->integrity_alg, nc->integrity_alg);
954         rcu_read_unlock();
955
956         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
957 }
958
959 int drbd_send_protocol(struct drbd_tconn *tconn)
960 {
961         int err;
962
963         mutex_lock(&tconn->data.mutex);
964         err = __drbd_send_protocol(tconn, P_PROTOCOL);
965         mutex_unlock(&tconn->data.mutex);
966
967         return err;
968 }
969
970 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
971 {
972         struct drbd_socket *sock;
973         struct p_uuids *p;
974         int i;
975
976         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
977                 return 0;
978
979         sock = &mdev->tconn->data;
980         p = drbd_prepare_command(mdev, sock);
981         if (!p) {
982                 put_ldev(mdev);
983                 return -EIO;
984         }
985         for (i = UI_CURRENT; i < UI_SIZE; i++)
986                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
987
988         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
989         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
990         rcu_read_lock();
991         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
992         rcu_read_unlock();
993         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
994         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
995         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
996
997         put_ldev(mdev);
998         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
999 }
1000
1001 int drbd_send_uuids(struct drbd_conf *mdev)
1002 {
1003         return _drbd_send_uuids(mdev, 0);
1004 }
1005
1006 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1007 {
1008         return _drbd_send_uuids(mdev, 8);
1009 }
1010
1011 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1012 {
1013         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1014                 u64 *uuid = mdev->ldev->md.uuid;
1015                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1016                      text,
1017                      (unsigned long long)uuid[UI_CURRENT],
1018                      (unsigned long long)uuid[UI_BITMAP],
1019                      (unsigned long long)uuid[UI_HISTORY_START],
1020                      (unsigned long long)uuid[UI_HISTORY_END]);
1021                 put_ldev(mdev);
1022         } else {
1023                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1024                                 text,
1025                                 (unsigned long long)mdev->ed_uuid);
1026         }
1027 }
1028
1029 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1030 {
1031         struct drbd_socket *sock;
1032         struct p_rs_uuid *p;
1033         u64 uuid;
1034
1035         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1036
1037         uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1038         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1039         drbd_print_uuids(mdev, "updated sync UUID");
1040         drbd_md_sync(mdev);
1041
1042         sock = &mdev->tconn->data;
1043         p = drbd_prepare_command(mdev, sock);
1044         if (p) {
1045                 p->uuid = cpu_to_be64(uuid);
1046                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1047         }
1048 }
1049
1050 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1051 {
1052         struct drbd_socket *sock;
1053         struct p_sizes *p;
1054         sector_t d_size, u_size;
1055         int q_order_type, max_bio_size;
1056
1057         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1058                 D_ASSERT(mdev->ldev->backing_bdev);
1059                 d_size = drbd_get_max_capacity(mdev->ldev);
1060                 rcu_read_lock();
1061                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1062                 rcu_read_unlock();
1063                 q_order_type = drbd_queue_order_type(mdev);
1064                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1065                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1066                 put_ldev(mdev);
1067         } else {
1068                 d_size = 0;
1069                 u_size = 0;
1070                 q_order_type = QUEUE_ORDERED_NONE;
1071                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1072         }
1073
1074         sock = &mdev->tconn->data;
1075         p = drbd_prepare_command(mdev, sock);
1076         if (!p)
1077                 return -EIO;
1078         p->d_size = cpu_to_be64(d_size);
1079         p->u_size = cpu_to_be64(u_size);
1080         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1081         p->max_bio_size = cpu_to_be32(max_bio_size);
1082         p->queue_order_type = cpu_to_be16(q_order_type);
1083         p->dds_flags = cpu_to_be16(flags);
1084         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1085 }
1086
1087 /**
1088  * drbd_send_state() - Sends the drbd state to the peer
1089  * @mdev:       DRBD device.
1090  */
1091 int drbd_send_state(struct drbd_conf *mdev)
1092 {
1093         struct drbd_socket *sock;
1094         struct p_state *p;
1095
1096         sock = &mdev->tconn->data;
1097         p = drbd_prepare_command(mdev, sock);
1098         if (!p)
1099                 return -EIO;
1100         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1101         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1102 }
1103
1104 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1105 {
1106         struct drbd_socket *sock;
1107         struct p_req_state *p;
1108
1109         sock = &mdev->tconn->data;
1110         p = drbd_prepare_command(mdev, sock);
1111         if (!p)
1112                 return -EIO;
1113         p->mask = cpu_to_be32(mask.i);
1114         p->val = cpu_to_be32(val.i);
1115         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1116
1117 }
1118
1119 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1120 {
1121         enum drbd_packet cmd;
1122         struct drbd_socket *sock;
1123         struct p_req_state *p;
1124
1125         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1126         sock = &tconn->data;
1127         p = conn_prepare_command(tconn, sock);
1128         if (!p)
1129                 return -EIO;
1130         p->mask = cpu_to_be32(mask.i);
1131         p->val = cpu_to_be32(val.i);
1132         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1133 }
1134
1135 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1136 {
1137         struct drbd_socket *sock;
1138         struct p_req_state_reply *p;
1139
1140         sock = &mdev->tconn->meta;
1141         p = drbd_prepare_command(mdev, sock);
1142         if (p) {
1143                 p->retcode = cpu_to_be32(retcode);
1144                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1145         }
1146 }
1147
1148 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1149 {
1150         struct drbd_socket *sock;
1151         struct p_req_state_reply *p;
1152         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1153
1154         sock = &tconn->meta;
1155         p = conn_prepare_command(tconn, sock);
1156         if (p) {
1157                 p->retcode = cpu_to_be32(retcode);
1158                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1159         }
1160 }
1161
1162 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1163 {
1164         BUG_ON(code & ~0xf);
1165         p->encoding = (p->encoding & ~0xf) | code;
1166 }
1167
1168 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1169 {
1170         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1171 }
1172
1173 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1174 {
1175         BUG_ON(n & ~0x7);
1176         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1177 }
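
/*
 * Editor's sketch (assumption: mirrors the dcbp_set_*() helpers above):
 * a receiver would pull the same fields back out of p->encoding --
 * code in bits 0-3, pad-bit count in bits 4-6, "first run is of set bits"
 * in bit 7.  The function names here are illustrative only.
 */
static inline enum drbd_bitmap_code example_dcbp_get_code(struct p_compressed_bm *p)
{
	return (enum drbd_bitmap_code)(p->encoding & 0x0f);
}

static inline int example_dcbp_get_start(struct p_compressed_bm *p)
{
	return (p->encoding & 0x80) != 0;
}

static inline int example_dcbp_get_pad_bits(struct p_compressed_bm *p)
{
	return (p->encoding >> 4) & 0x7;
}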
1178
1179 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1180                          struct p_compressed_bm *p,
1181                          unsigned int size,
1182                          struct bm_xfer_ctx *c)
1183 {
1184         struct bitstream bs;
1185         unsigned long plain_bits;
1186         unsigned long tmp;
1187         unsigned long rl;
1188         unsigned len;
1189         unsigned toggle;
1190         int bits, use_rle;
1191
1192         /* may we use this feature? */
1193         rcu_read_lock();
1194         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1195         rcu_read_unlock();
1196         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1197                 return 0;
1198
1199         if (c->bit_offset >= c->bm_bits)
1200                 return 0; /* nothing to do. */
1201
1202         /* use at most this many bytes */
1203         bitstream_init(&bs, p->code, size, 0);
1204         memset(p->code, 0, size);
1205         /* plain bits covered in this code string */
1206         plain_bits = 0;
1207
1208         /* p->encoding & 0x80 stores whether the first run length is set.
1209          * bit offset is implicit.
1210          * start with toggle == 2 to be able to tell the first iteration */
1211         toggle = 2;
1212
1213         /* see how many plain bits we can stuff into one packet
1214          * using RLE and VLI. */
1215         do {
1216                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1217                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1218                 if (tmp == -1UL)
1219                         tmp = c->bm_bits;
1220                 rl = tmp - c->bit_offset;
1221
1222                 if (toggle == 2) { /* first iteration */
1223                         if (rl == 0) {
1224                                 /* the first checked bit was set,
1225                                  * store start value, */
1226                                 dcbp_set_start(p, 1);
1227                                 /* but skip encoding of zero run length */
1228                                 toggle = !toggle;
1229                                 continue;
1230                         }
1231                         dcbp_set_start(p, 0);
1232                 }
1233
1234                 /* paranoia: catch zero runlength.
1235                  * can only happen if bitmap is modified while we scan it. */
1236                 if (rl == 0) {
1237                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1238                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1239                         return -1;
1240                 }
1241
1242                 bits = vli_encode_bits(&bs, rl);
1243                 if (bits == -ENOBUFS) /* buffer full */
1244                         break;
1245                 if (bits <= 0) {
1246                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1247                         return 0;
1248                 }
1249
1250                 toggle = !toggle;
1251                 plain_bits += rl;
1252                 c->bit_offset = tmp;
1253         } while (c->bit_offset < c->bm_bits);
1254
1255         len = bs.cur.b - p->code + !!bs.cur.bit;
1256
1257         if (plain_bits < (len << 3)) {
1258                 /* incompressible with this method.
1259                  * we need to rewind both word and bit position. */
1260                 c->bit_offset -= plain_bits;
1261                 bm_xfer_ctx_bit_to_word_offset(c);
1262                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1263                 return 0;
1264         }
1265
1266         /* RLE + VLI was able to compress it just fine.
1267          * update c->word_offset. */
1268         bm_xfer_ctx_bit_to_word_offset(c);
1269
1270         /* store pad_bits */
1271         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1272
1273         return len;
1274 }
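
/*
 * Editor's worked example (illustration only): for a bitmap fragment
 * 00000111001... the encoder above emits the VLI-coded run lengths
 * 5, 3, 2, ... and clears the start flag, because the first examined bit
 * is clear.  For 111000111... it sets the start flag and emits 3, 3, 3, ...,
 * skipping the (empty) leading run of clear bits.
 */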
1275
1276 /**
1277  * send_bitmap_rle_or_plain
1278  *
1279  * Return 0 when done, 1 when another iteration is needed, and a negative error
1280  * code upon failure.
1281  */
1282 static int
1283 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1284 {
1285         struct drbd_socket *sock = &mdev->tconn->data;
1286         unsigned int header_size = drbd_header_size(mdev->tconn);
1287         struct p_compressed_bm *p = sock->sbuf + header_size;
1288         int len, err;
1289
1290         len = fill_bitmap_rle_bits(mdev, p,
1291                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1292         if (len < 0)
1293                 return -EIO;
1294
1295         if (len) {
1296                 dcbp_set_code(p, RLE_VLI_Bits);
1297                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1298                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1299                                      NULL, 0);
1300                 c->packets[0]++;
1301                 c->bytes[0] += header_size + sizeof(*p) + len;
1302
1303                 if (c->bit_offset >= c->bm_bits)
1304                         len = 0; /* DONE */
1305         } else {
1306                 /* was not compressible.
1307                  * send a buffer full of plain text bits instead. */
1308                 unsigned int data_size;
1309                 unsigned long num_words;
1310                 unsigned long *p = sock->sbuf + header_size;
1311
1312                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1313                 num_words = min_t(size_t, data_size / sizeof(*p),
1314                                   c->bm_words - c->word_offset);
1315                 len = num_words * sizeof(*p);
1316                 if (len)
1317                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1318                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1319                 c->word_offset += num_words;
1320                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1321
1322                 c->packets[1]++;
1323                 c->bytes[1] += header_size + len;
1324
1325                 if (c->bit_offset > c->bm_bits)
1326                         c->bit_offset = c->bm_bits;
1327         }
1328         if (!err) {
1329                 if (len == 0) {
1330                         INFO_bm_xfer_stats(mdev, "send", c);
1331                         return 0;
1332                 } else
1333                         return 1;
1334         }
1335         return -EIO;
1336 }
1337
1338 /* See the comment at receive_bitmap() */
1339 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1340 {
1341         struct bm_xfer_ctx c;
1342         int err;
1343
1344         if (!expect(mdev->bitmap))
1345                 return false;
1346
1347         if (get_ldev(mdev)) {
1348                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1349                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1350                         drbd_bm_set_all(mdev);
1351                         if (drbd_bm_write(mdev)) {
1352                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1353                                  * but otherwise process as per normal - need to tell other
1354                                  * side that a full resync is required! */
1355                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1356                         } else {
1357                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1358                                 drbd_md_sync(mdev);
1359                         }
1360                 }
1361                 put_ldev(mdev);
1362         }
1363
1364         c = (struct bm_xfer_ctx) {
1365                 .bm_bits = drbd_bm_bits(mdev),
1366                 .bm_words = drbd_bm_words(mdev),
1367         };
1368
1369         do {
1370                 err = send_bitmap_rle_or_plain(mdev, &c);
1371         } while (err > 0);
1372
1373         return err == 0;
1374 }
1375
1376 int drbd_send_bitmap(struct drbd_conf *mdev)
1377 {
1378         struct drbd_socket *sock = &mdev->tconn->data;
1379         int err = -1;
1380
1381         mutex_lock(&sock->mutex);
1382         if (sock->socket)
1383                 err = !_drbd_send_bitmap(mdev);
1384         mutex_unlock(&sock->mutex);
1385         return err;
1386 }
1387
1388 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1389 {
1390         struct drbd_socket *sock;
1391         struct p_barrier_ack *p;
1392
1393         if (mdev->state.conn < C_CONNECTED)
1394                 return;
1395
1396         sock = &mdev->tconn->meta;
1397         p = drbd_prepare_command(mdev, sock);
1398         if (!p)
1399                 return;
1400         p->barrier = barrier_nr;
1401         p->set_size = cpu_to_be32(set_size);
1402         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1403 }
1404
1405 /**
1406  * _drbd_send_ack() - Sends an ack packet
1407  * @mdev:       DRBD device.
1408  * @cmd:        Packet command code.
1409  * @sector:     sector, needs to be in big endian byte order
1410  * @blksize:    size in byte, needs to be in big endian byte order
1411  * @block_id:   Id, big endian byte order
1412  */
1413 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1414                           u64 sector, u32 blksize, u64 block_id)
1415 {
1416         struct drbd_socket *sock;
1417         struct p_block_ack *p;
1418
1419         if (mdev->state.conn < C_CONNECTED)
1420                 return -EIO;
1421
1422         sock = &mdev->tconn->meta;
1423         p = drbd_prepare_command(mdev, sock);
1424         if (!p)
1425                 return -EIO;
1426         p->sector = sector;
1427         p->block_id = block_id;
1428         p->blksize = blksize;
1429         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1430         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1431 }
1432
1433 /* dp->sector and dp->block_id already/still in network byte order,
1434  * data_size is payload size according to dp->head,
1435  * and may need to be corrected for digest size. */
1436 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1437                       struct p_data *dp, int data_size)
1438 {
1439         if (mdev->tconn->peer_integrity_tfm)
1440                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1441         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1442                        dp->block_id);
1443 }
1444
1445 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1446                       struct p_block_req *rp)
1447 {
1448         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1449 }
1450
1451 /**
1452  * drbd_send_ack() - Sends an ack packet
1453  * @mdev:       DRBD device
1454  * @cmd:        packet command code
1455  * @peer_req:   peer request
1456  */
1457 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1458                   struct drbd_peer_request *peer_req)
1459 {
1460         return _drbd_send_ack(mdev, cmd,
1461                               cpu_to_be64(peer_req->i.sector),
1462                               cpu_to_be32(peer_req->i.size),
1463                               peer_req->block_id);
1464 }
1465
1466 /* This function misuses the block_id field to signal if the blocks
1467  * are in sync or not. */
1468 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1469                      sector_t sector, int blksize, u64 block_id)
1470 {
1471         return _drbd_send_ack(mdev, cmd,
1472                               cpu_to_be64(sector),
1473                               cpu_to_be32(blksize),
1474                               cpu_to_be64(block_id));
1475 }
1476
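/* Ask the peer for a block: send a request of type @cmd for @size bytes
 * at @sector over the data socket. */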
1477 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1478                        sector_t sector, int size, u64 block_id)
1479 {
1480         struct drbd_socket *sock;
1481         struct p_block_req *p;
1482
1483         sock = &mdev->tconn->data;
1484         p = drbd_prepare_command(mdev, sock);
1485         if (!p)
1486                 return -EIO;
1487         p->sector = cpu_to_be64(sector);
1488         p->block_id = block_id;
1489         p->blksize = cpu_to_be32(size);
1490         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1491 }
1492
1493 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1494                             void *digest, int digest_size, enum drbd_packet cmd)
1495 {
1496         struct drbd_socket *sock;
1497         struct p_block_req *p;
1498
1499         /* FIXME: Put the digest into the preallocated socket buffer.  */
1500
1501         sock = &mdev->tconn->data;
1502         p = drbd_prepare_command(mdev, sock);
1503         if (!p)
1504                 return -EIO;
1505         p->sector = cpu_to_be64(sector);
1506         p->block_id = ID_SYNCER /* unused */;
1507         p->blksize = cpu_to_be32(size);
1508         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1509                                  digest, digest_size);
1510 }
1511
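/* Send an online-verify request (P_OV_REQUEST) for @size bytes at @sector. */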
1512 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1513 {
1514         struct drbd_socket *sock;
1515         struct p_block_req *p;
1516
1517         sock = &mdev->tconn->data;
1518         p = drbd_prepare_command(mdev, sock);
1519         if (!p)
1520                 return -EIO;
1521         p->sector = cpu_to_be64(sector);
1522         p->block_id = ID_SYNCER /* unused */;
1523         p->blksize = cpu_to_be32(size);
1524         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1525 }
1526
1527 /* called on sndtimeo
1528  * returns false if we should retry,
1529  * true if we think the connection is dead
1530  */
1531 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1532 {
1533         int drop_it;
1534         /* long elapsed = (long)(jiffies - mdev->last_received); */
1535
1536         drop_it =   tconn->meta.socket == sock
1537                 || !tconn->asender.task
1538                 || get_t_state(&tconn->asender) != RUNNING
1539                 || tconn->cstate < C_WF_REPORT_PARAMS;
1540
1541         if (drop_it)
1542                 return true;
1543
1544         drop_it = !--tconn->ko_count;
1545         if (!drop_it) {
1546                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1547                          current->comm, current->pid, tconn->ko_count);
1548                 request_ping(tconn);
1549         }
1550
1551         return drop_it; /* && (mdev->state == R_PRIMARY) */
1552 }
1553
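/* Mark the connection as congested once the data socket's send queue
 * exceeds 4/5 of its send buffer size. */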
1554 static void drbd_update_congested(struct drbd_tconn *tconn)
1555 {
1556         struct sock *sk = tconn->data.socket->sk;
1557         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1558                 set_bit(NET_CONGESTED, &tconn->flags);
1559 }
1560
1561 /* The idea of sendpage seems to be to put some kind of reference
1562  * to the page into the skb, and to hand it over to the NIC. In
1563  * this process get_page() gets called.
1564  *
1565  * As soon as the page was really sent over the network put_page()
1566  * gets called by some part of the network layer. [ NIC driver? ]
1567  *
1568  * [ get_page() / put_page() increment/decrement the count. If count
1569  *   reaches 0 the page will be freed. ]
1570  *
1571  * This works nicely with pages from FSs.
1572  * But this means that in protocol A we might signal IO completion too early!
1573  *
1574  * In order not to corrupt data during a resync we must make sure
1575  * that we do not reuse our own buffer pages (EEs) too early, therefore
1576  * we have the net_ee list.
1577  *
1578  * XFS still seems to have problems with this: it submits pages with page_count == 0!
1579  * As a workaround, we disable sendpage on pages
1580  * with page_count == 0 or PageSlab.
1581  */
1582 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1583                               int offset, size_t size, unsigned msg_flags)
1584 {
1585         struct socket *socket;
1586         void *addr;
1587         int err;
1588
1589         socket = mdev->tconn->data.socket;
1590         addr = kmap(page) + offset;
1591         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1592         kunmap(page);
1593         if (!err)
1594                 mdev->send_cnt += size >> 9;
1595         return err;
1596 }
1597
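/* Send a page via the socket's sendpage() (zero copy) path; falls back to
 * _drbd_no_send_page() (kmap + sendmsg) if sendpage is disabled or the page
 * cannot safely be referenced (page_count == 0 or PageSlab). */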
1598 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1599                     int offset, size_t size, unsigned msg_flags)
1600 {
1601         struct socket *socket = mdev->tconn->data.socket;
1602         mm_segment_t oldfs = get_fs();
1603         int len = size;
1604         int err = -EIO;
1605
1606         /* e.g. XFS meta- & log-data is in slab pages, which have a
1607          * page_count of 0 and/or have PageSlab() set.
1608          * We cannot use sendpage for those, as it does get_page()/put_page(),
1609          * which would either trigger a VM_BUG directly, or make
1610          * __page_cache_release() free a page that is actually still referenced
1611          * by someone, leading to some obscure delayed Oops somewhere else. */
1612         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1613                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1614
1615         msg_flags |= MSG_NOSIGNAL;
1616         drbd_update_congested(mdev->tconn);
1617         set_fs(KERNEL_DS);
1618         do {
1619                 int sent;
1620
1621                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1622                 if (sent <= 0) {
1623                         if (sent == -EAGAIN) {
1624                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1625                                         break;
1626                                 continue;
1627                         }
1628                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1629                              __func__, (int)size, len, sent);
1630                         if (sent < 0)
1631                                 err = sent;
1632                         break;
1633                 }
1634                 len    -= sent;
1635                 offset += sent;
1636         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1637         set_fs(oldfs);
1638         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1639
1640         if (len == 0) {
1641                 err = 0;
1642                 mdev->send_cnt += size >> 9;
1643         }
1644         return err;
1645 }
1646
1647 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1648 {
1649         struct bio_vec *bvec;
1650         int i;
1651         /* hint all but last page with MSG_MORE */
1652         __bio_for_each_segment(bvec, bio, i, 0) {
1653                 int err;
1654
1655                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1656                                          bvec->bv_offset, bvec->bv_len,
1657                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1658                 if (err)
1659                         return err;
1660         }
1661         return 0;
1662 }
1663
1664 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1665 {
1666         struct bio_vec *bvec;
1667         int i;
1668         /* hint all but last page with MSG_MORE */
1669         __bio_for_each_segment(bvec, bio, i, 0) {
1670                 int err;
1671
1672                 err = _drbd_send_page(mdev, bvec->bv_page,
1673                                       bvec->bv_offset, bvec->bv_len,
1674                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1675                 if (err)
1676                         return err;
1677         }
1678         return 0;
1679 }
1680
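/* Send all pages of a peer request (EE), hinting MSG_MORE on all but the
 * last page of the chain. */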
1681 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1682                             struct drbd_peer_request *peer_req)
1683 {
1684         struct page *page = peer_req->pages;
1685         unsigned len = peer_req->i.size;
1686         int err;
1687
1688         /* hint all but last page with MSG_MORE */
1689         page_chain_for_each(page) {
1690                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1691
1692                 err = _drbd_send_page(mdev, page, 0, l,
1693                                       page_chain_next(page) ? MSG_MORE : 0);
1694                 if (err)
1695                         return err;
1696                 len -= l;
1697         }
1698         return 0;
1699 }
1700
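/* Translate a bio's rw flags (REQ_SYNC, REQ_FUA, REQ_FLUSH, REQ_DISCARD)
 * into the corresponding DP_* flags sent on the wire.  Peers speaking a
 * protocol version before 95 only understand DP_RW_SYNC. */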
1701 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1702 {
1703         if (mdev->tconn->agreed_pro_version >= 95)
1704                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1705                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1706                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1707                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1708         else
1709                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1710 }
1711
1712 /* Used to send write requests
1713  * R_PRIMARY -> Peer    (P_DATA)
1714  */
1715 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1716 {
1717         struct drbd_socket *sock;
1718         struct p_data *p;
1719         unsigned int dp_flags = 0;
1720         int dgs;
1721         int err;
1722
1723         sock = &mdev->tconn->data;
1724         p = drbd_prepare_command(mdev, sock);
1725         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1726                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1727
1728         if (!p)
1729                 return -EIO;
1730         p->sector = cpu_to_be64(req->i.sector);
1731         p->block_id = (unsigned long)req;
1732         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1733         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1734         if (mdev->state.conn >= C_SYNC_SOURCE &&
1735             mdev->state.conn <= C_PAUSED_SYNC_T)
1736                 dp_flags |= DP_MAY_SET_IN_SYNC;
1737         if (mdev->tconn->agreed_pro_version >= 100) {
1738                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1739                         dp_flags |= DP_SEND_RECEIVE_ACK;
1740                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1741                         dp_flags |= DP_SEND_WRITE_ACK;
1742         }
1743         p->dp_flags = cpu_to_be32(dp_flags);
1744         if (dgs)
1745                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1746         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1747         if (!err) {
1748                 /* For protocol A, we have to memcpy the payload into
1749                  * socket buffers, as we may complete the request right away,
1750                  * as soon as we have handed it over to tcp, at which point the
1751                  * data pages may become invalid.
1752                  *
1753                  * With data integrity enabled, we copy as well, so that even if
1754                  * the bio pages are still being modified, the data on the wire
1755                  * does not change; thus, if the digest checks out ok after
1756                  * sending on this side but does not match on the receiving
1757                  * side, we have certainly detected corruption elsewhere.
1758                  */
1759                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1760                         err = _drbd_send_bio(mdev, req->master_bio);
1761                 else
1762                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1763
1764                 /* double check digest, sometimes buffers have been modified in flight. */
1765                 if (dgs > 0 && dgs <= 64) {
1766                         /* 64 byte, 512 bit, is the largest digest size
1767                          * currently supported in kernel crypto. */
1768                         unsigned char digest[64];
1769                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1770                         if (memcmp(p + 1, digest, dgs)) {
1771                                 dev_warn(DEV,
1772                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1773                                         (unsigned long long)req->i.sector, req->i.size);
1774                         }
1775                 } /* else if (dgs > 64) {
1776                      ... Be noisy about digest too large ...
1777                 } */
1778         }
1779         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1780
1781         return err;
1782 }
1783
1784 /* answer packet, used to send data back for read requests:
1785  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1786  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1787  */
1788 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1789                     struct drbd_peer_request *peer_req)
1790 {
1791         struct drbd_socket *sock;
1792         struct p_data *p;
1793         int err;
1794         int dgs;
1795
1796         sock = &mdev->tconn->data;
1797         p = drbd_prepare_command(mdev, sock);
1798
1799         dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_tfm) ?
1800                 crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1801
1802         if (!p)
1803                 return -EIO;
1804         p->sector = cpu_to_be64(peer_req->i.sector);
1805         p->block_id = peer_req->block_id;
1806         p->seq_num = 0;  /* unused */
1807         if (dgs)
1808                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1809         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1810         if (!err)
1811                 err = _drbd_send_zc_ee(mdev, peer_req);
1812         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1813
1814         return err;
1815 }
1816
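/* Tell the peer that the block described by @req (sector and size) is
 * out of sync (P_OUT_OF_SYNC). */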
1817 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1818 {
1819         struct drbd_socket *sock;
1820         struct p_block_desc *p;
1821
1822         sock = &mdev->tconn->data;
1823         p = drbd_prepare_command(mdev, sock);
1824         if (!p)
1825                 return -EIO;
1826         p->sector = cpu_to_be64(req->i.sector);
1827         p->blksize = cpu_to_be32(req->i.size);
1828         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1829 }
1830
1831 /*
1832   drbd_send distinguishes two cases:
1833
1834   Packets sent via the data socket "sock"
1835   and packets sent via the meta data socket "msock"
1836
1837                     sock                      msock
1838   -----------------+-------------------------+------------------------------
1839   timeout           conf.timeout / 2          conf.timeout / 2
1840   timeout action    send a ping via msock     Abort communication
1841                                               and close all sockets
1842 */
1843
1844 /*
1845  * The caller must already hold the corresponding socket mutex (drbd_socket::mutex)!
1846  */
1847 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1848               void *buf, size_t size, unsigned msg_flags)
1849 {
1850         struct kvec iov;
1851         struct msghdr msg;
1852         int rv, sent = 0;
1853
1854         if (!sock)
1855                 return -EBADR;
1856
1857         /* THINK  if (signal_pending) return ... ? */
1858
1859         iov.iov_base = buf;
1860         iov.iov_len  = size;
1861
1862         msg.msg_name       = NULL;
1863         msg.msg_namelen    = 0;
1864         msg.msg_control    = NULL;
1865         msg.msg_controllen = 0;
1866         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1867
1868         if (sock == tconn->data.socket) {
1869                 rcu_read_lock();
1870                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1871                 rcu_read_unlock();
1872                 drbd_update_congested(tconn);
1873         }
1874         do {
1875                 /* STRANGE
1876                  * tcp_sendmsg does _not_ use its size parameter at all ?
1877                  *
1878                  * -EAGAIN on timeout, -EINTR on signal.
1879                  */
1880 /* THINK
1881  * do we need to block DRBD_SIG if sock == &meta.socket ??
1882  * otherwise wake_asender() might interrupt some send_*Ack !
1883  */
1884                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1885                 if (rv == -EAGAIN) {
1886                         if (we_should_drop_the_connection(tconn, sock))
1887                                 break;
1888                         else
1889                                 continue;
1890                 }
1891                 if (rv == -EINTR) {
1892                         flush_signals(current);
1893                         rv = 0;
1894                 }
1895                 if (rv < 0)
1896                         break;
1897                 sent += rv;
1898                 iov.iov_base += rv;
1899                 iov.iov_len  -= rv;
1900         } while (sent < size);
1901
1902         if (sock == tconn->data.socket)
1903                 clear_bit(NET_CONGESTED, &tconn->flags);
1904
1905         if (rv <= 0) {
1906                 if (rv != -EAGAIN) {
1907                         conn_err(tconn, "%s_sendmsg returned %d\n",
1908                                  sock == tconn->meta.socket ? "msock" : "sock",
1909                                  rv);
1910                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1911                 } else
1912                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1913         }
1914
1915         return sent;
1916 }
1917
1918 /**
1919  * drbd_send_all  -  Send an entire buffer
1920  *
1921  * Returns 0 upon success and a negative error value otherwise.
1922  */
1923 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1924                   size_t size, unsigned msg_flags)
1925 {
1926         int err;
1927
1928         err = drbd_send(tconn, sock, buffer, size, msg_flags);
1929         if (err < 0)
1930                 return err;
1931         if (err != size)
1932                 return -EIO;
1933         return 0;
1934 }
1935
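/* Block device open: writable opens are only allowed in the Primary role;
 * read-only opens on a Secondary are rejected unless the allow_oos module
 * parameter is set. */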
1936 static int drbd_open(struct block_device *bdev, fmode_t mode)
1937 {
1938         struct drbd_conf *mdev = bdev->bd_disk->private_data;
1939         unsigned long flags;
1940         int rv = 0;
1941
1942         mutex_lock(&drbd_main_mutex);
1943         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1944         /* to have a stable mdev->state.role
1945          * and no race with updating open_cnt */
1946
1947         if (mdev->state.role != R_PRIMARY) {
1948                 if (mode & FMODE_WRITE)
1949                         rv = -EROFS;
1950                 else if (!allow_oos)
1951                         rv = -EMEDIUMTYPE;
1952         }
1953
1954         if (!rv)
1955                 mdev->open_cnt++;
1956         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1957         mutex_unlock(&drbd_main_mutex);
1958
1959         return rv;
1960 }
1961
1962 static int drbd_release(struct gendisk *gd, fmode_t mode)
1963 {
1964         struct drbd_conf *mdev = gd->private_data;
1965         mutex_lock(&drbd_main_mutex);
1966         mdev->open_cnt--;
1967         mutex_unlock(&drbd_main_mutex);
1968         return 0;
1969 }
1970
1971 static void drbd_set_defaults(struct drbd_conf *mdev)
1972 {
1973         /* Beware! The actual layout differs
1974          * between big endian and little endian */
1975         mdev->state = (union drbd_dev_state) {
1976                 { .role = R_SECONDARY,
1977                   .peer = R_UNKNOWN,
1978                   .conn = C_STANDALONE,
1979                   .disk = D_DISKLESS,
1980                   .pdsk = D_UNKNOWN,
1981                 } };
1982 }
1983
1984 void drbd_init_set_defaults(struct drbd_conf *mdev)
1985 {
1986         /* the memset(,0,) did most of this.
1987          * note: only assignments, no allocation in here */
1988
1989         drbd_set_defaults(mdev);
1990
1991         atomic_set(&mdev->ap_bio_cnt, 0);
1992         atomic_set(&mdev->ap_pending_cnt, 0);
1993         atomic_set(&mdev->rs_pending_cnt, 0);
1994         atomic_set(&mdev->unacked_cnt, 0);
1995         atomic_set(&mdev->local_cnt, 0);
1996         atomic_set(&mdev->pp_in_use_by_net, 0);
1997         atomic_set(&mdev->rs_sect_in, 0);
1998         atomic_set(&mdev->rs_sect_ev, 0);
1999         atomic_set(&mdev->ap_in_flight, 0);
2000
2001         mutex_init(&mdev->md_io_mutex);
2002         mutex_init(&mdev->own_state_mutex);
2003         mdev->state_mutex = &mdev->own_state_mutex;
2004
2005         spin_lock_init(&mdev->al_lock);
2006         spin_lock_init(&mdev->peer_seq_lock);
2007         spin_lock_init(&mdev->epoch_lock);
2008
2009         INIT_LIST_HEAD(&mdev->active_ee);
2010         INIT_LIST_HEAD(&mdev->sync_ee);
2011         INIT_LIST_HEAD(&mdev->done_ee);
2012         INIT_LIST_HEAD(&mdev->read_ee);
2013         INIT_LIST_HEAD(&mdev->net_ee);
2014         INIT_LIST_HEAD(&mdev->resync_reads);
2015         INIT_LIST_HEAD(&mdev->resync_work.list);
2016         INIT_LIST_HEAD(&mdev->unplug_work.list);
2017         INIT_LIST_HEAD(&mdev->go_diskless.list);
2018         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2019         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2020         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2021
2022         mdev->resync_work.cb  = w_resync_timer;
2023         mdev->unplug_work.cb  = w_send_write_hint;
2024         mdev->go_diskless.cb  = w_go_diskless;
2025         mdev->md_sync_work.cb = w_md_sync;
2026         mdev->bm_io_work.w.cb = w_bitmap_io;
2027         mdev->start_resync_work.cb = w_start_resync;
2028
2029         mdev->resync_work.mdev  = mdev;
2030         mdev->unplug_work.mdev  = mdev;
2031         mdev->go_diskless.mdev  = mdev;
2032         mdev->md_sync_work.mdev = mdev;
2033         mdev->bm_io_work.w.mdev = mdev;
2034         mdev->start_resync_work.mdev = mdev;
2035
2036         init_timer(&mdev->resync_timer);
2037         init_timer(&mdev->md_sync_timer);
2038         init_timer(&mdev->start_resync_timer);
2039         init_timer(&mdev->request_timer);
2040         mdev->resync_timer.function = resync_timer_fn;
2041         mdev->resync_timer.data = (unsigned long) mdev;
2042         mdev->md_sync_timer.function = md_sync_timer_fn;
2043         mdev->md_sync_timer.data = (unsigned long) mdev;
2044         mdev->start_resync_timer.function = start_resync_timer_fn;
2045         mdev->start_resync_timer.data = (unsigned long) mdev;
2046         mdev->request_timer.function = request_timer_fn;
2047         mdev->request_timer.data = (unsigned long) mdev;
2048
2049         init_waitqueue_head(&mdev->misc_wait);
2050         init_waitqueue_head(&mdev->state_wait);
2051         init_waitqueue_head(&mdev->ee_wait);
2052         init_waitqueue_head(&mdev->al_wait);
2053         init_waitqueue_head(&mdev->seq_wait);
2054
2055         mdev->write_ordering = WO_bdev_flush;
2056         mdev->resync_wenr = LC_FREE;
2057         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2058         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2059 }
2060
2061 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2062 {
2063         int i;
2064         if (mdev->tconn->receiver.t_state != NONE)
2065                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2066                                 mdev->tconn->receiver.t_state);
2067
2068         /* no need to lock it, I'm the only thread alive */
2069         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2070                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2071         mdev->al_writ_cnt  =
2072         mdev->bm_writ_cnt  =
2073         mdev->read_cnt     =
2074         mdev->recv_cnt     =
2075         mdev->send_cnt     =
2076         mdev->writ_cnt     =
2077         mdev->p_size       =
2078         mdev->rs_start     =
2079         mdev->rs_total     =
2080         mdev->rs_failed    = 0;
2081         mdev->rs_last_events = 0;
2082         mdev->rs_last_sect_ev = 0;
2083         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2084                 mdev->rs_mark_left[i] = 0;
2085                 mdev->rs_mark_time[i] = 0;
2086         }
2087         D_ASSERT(mdev->tconn->net_conf == NULL);
2088
2089         drbd_set_my_capacity(mdev, 0);
2090         if (mdev->bitmap) {
2091                 /* maybe never allocated. */
2092                 drbd_bm_resize(mdev, 0, 1);
2093                 drbd_bm_cleanup(mdev);
2094         }
2095
2096         drbd_free_bc(mdev->ldev);
2097         mdev->ldev = NULL;
2098
2099         clear_bit(AL_SUSPENDED, &mdev->flags);
2100
2101         D_ASSERT(list_empty(&mdev->active_ee));
2102         D_ASSERT(list_empty(&mdev->sync_ee));
2103         D_ASSERT(list_empty(&mdev->done_ee));
2104         D_ASSERT(list_empty(&mdev->read_ee));
2105         D_ASSERT(list_empty(&mdev->net_ee));
2106         D_ASSERT(list_empty(&mdev->resync_reads));
2107         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2108         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2109         D_ASSERT(list_empty(&mdev->resync_work.list));
2110         D_ASSERT(list_empty(&mdev->unplug_work.list));
2111         D_ASSERT(list_empty(&mdev->go_diskless.list));
2112
2113         drbd_set_defaults(mdev);
2114 }
2115
2116
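/* Tear down everything set up by drbd_create_mempools(): the preallocated
 * page pool, the md-io bioset and page mempool, and the request/EE mempools
 * and slab caches. */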
2117 static void drbd_destroy_mempools(void)
2118 {
2119         struct page *page;
2120
2121         while (drbd_pp_pool) {
2122                 page = drbd_pp_pool;
2123                 drbd_pp_pool = (struct page *)page_private(page);
2124                 __free_page(page);
2125                 drbd_pp_vacant--;
2126         }
2127
2128         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2129
2130         if (drbd_md_io_bio_set)
2131                 bioset_free(drbd_md_io_bio_set);
2132         if (drbd_md_io_page_pool)
2133                 mempool_destroy(drbd_md_io_page_pool);
2134         if (drbd_ee_mempool)
2135                 mempool_destroy(drbd_ee_mempool);
2136         if (drbd_request_mempool)
2137                 mempool_destroy(drbd_request_mempool);
2138         if (drbd_ee_cache)
2139                 kmem_cache_destroy(drbd_ee_cache);
2140         if (drbd_request_cache)
2141                 kmem_cache_destroy(drbd_request_cache);
2142         if (drbd_bm_ext_cache)
2143                 kmem_cache_destroy(drbd_bm_ext_cache);
2144         if (drbd_al_ext_cache)
2145                 kmem_cache_destroy(drbd_al_ext_cache);
2146
2147         drbd_md_io_bio_set   = NULL;
2148         drbd_md_io_page_pool = NULL;
2149         drbd_ee_mempool      = NULL;
2150         drbd_request_mempool = NULL;
2151         drbd_ee_cache        = NULL;
2152         drbd_request_cache   = NULL;
2153         drbd_bm_ext_cache    = NULL;
2154         drbd_al_ext_cache    = NULL;
2155
2156         return;
2157 }
2158
2159 static int drbd_create_mempools(void)
2160 {
2161         struct page *page;
2162         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
2163         int i;
2164
2165         /* prepare our caches and mempools */
2166         drbd_request_mempool = NULL;
2167         drbd_ee_cache        = NULL;
2168         drbd_request_cache   = NULL;
2169         drbd_bm_ext_cache    = NULL;
2170         drbd_al_ext_cache    = NULL;
2171         drbd_pp_pool         = NULL;
2172         drbd_md_io_page_pool = NULL;
2173         drbd_md_io_bio_set   = NULL;
2174
2175         /* caches */
2176         drbd_request_cache = kmem_cache_create(
2177                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2178         if (drbd_request_cache == NULL)
2179                 goto Enomem;
2180
2181         drbd_ee_cache = kmem_cache_create(
2182                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2183         if (drbd_ee_cache == NULL)
2184                 goto Enomem;
2185
2186         drbd_bm_ext_cache = kmem_cache_create(
2187                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2188         if (drbd_bm_ext_cache == NULL)
2189                 goto Enomem;
2190
2191         drbd_al_ext_cache = kmem_cache_create(
2192                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2193         if (drbd_al_ext_cache == NULL)
2194                 goto Enomem;
2195
2196         /* mempools */
2197         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2198         if (drbd_md_io_bio_set == NULL)
2199                 goto Enomem;
2200
2201         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2202         if (drbd_md_io_page_pool == NULL)
2203                 goto Enomem;
2204
2205         drbd_request_mempool = mempool_create(number,
2206                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2207         if (drbd_request_mempool == NULL)
2208                 goto Enomem;
2209
2210         drbd_ee_mempool = mempool_create(number,
2211                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2212         if (drbd_ee_mempool == NULL)
2213                 goto Enomem;
2214
2215         /* drbd's page pool */
2216         spin_lock_init(&drbd_pp_lock);
2217
2218         for (i = 0; i < number; i++) {
2219                 page = alloc_page(GFP_HIGHUSER);
2220                 if (!page)
2221                         goto Enomem;
2222                 set_page_private(page, (unsigned long)drbd_pp_pool);
2223                 drbd_pp_pool = page;
2224         }
2225         drbd_pp_vacant = number;
2226
2227         return 0;
2228
2229 Enomem:
2230         drbd_destroy_mempools(); /* in case we allocated some */
2231         return -ENOMEM;
2232 }
2233
2234 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2235         void *unused)
2236 {
2237         /* just so we have it.  you never know what interesting things we
2238          * might want to do here some day...
2239          */
2240
2241         return NOTIFY_DONE;
2242 }
2243
2244 static struct notifier_block drbd_notifier = {
2245         .notifier_call = drbd_notify_sys,
2246 };
2247
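/* Free any peer requests (EEs) still queued on the per-device lists.
 * All of these lists should be empty by now, so complain if they are not. */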
2248 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2249 {
2250         int rr;
2251
2252         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2253         if (rr)
2254                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2255
2256         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2257         if (rr)
2258                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2259
2260         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2261         if (rr)
2262                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2263
2264         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2265         if (rr)
2266                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2267
2268         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2269         if (rr)
2270                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2271 }
2272
2273 /* caution. no locking. */
2274 void drbd_minor_destroy(struct kref *kref)
2275 {
2276         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2277         struct drbd_tconn *tconn = mdev->tconn;
2278
2279         /* paranoia asserts */
2280         D_ASSERT(mdev->open_cnt == 0);
2281         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2282         /* end paranoia asserts */
2283
2284         /* cleanup stuff that may have been allocated during
2285          * device (re-)configuration or state changes */
2286
2287         if (mdev->this_bdev)
2288                 bdput(mdev->this_bdev);
2289
2290         drbd_free_bc(mdev->ldev);
2291         mdev->ldev = NULL;
2292
2293         drbd_release_all_peer_reqs(mdev);
2294
2295         lc_destroy(mdev->act_log);
2296         lc_destroy(mdev->resync);
2297
2298         kfree(mdev->p_uuid);
2299         /* mdev->p_uuid = NULL; */
2300
2301         kfree(mdev->current_epoch);
2302         if (mdev->bitmap) /* should no longer be there. */
2303                 drbd_bm_cleanup(mdev);
2304         __free_page(mdev->md_io_page);
2305         put_disk(mdev->vdisk);
2306         blk_cleanup_queue(mdev->rq_queue);
2307         kfree(mdev->rs_plan_s);
2308         kfree(mdev);
2309
2310         kref_put(&tconn->kref, &conn_destroy);
2311 }
2312
2313 static void drbd_cleanup(void)
2314 {
2315         unsigned int i;
2316         struct drbd_conf *mdev;
2317         struct drbd_tconn *tconn, *tmp;
2318
2319         unregister_reboot_notifier(&drbd_notifier);
2320
2321         /* First remove proc:
2322          * drbdsetup uses its presence to detect
2323          * whether DRBD is loaded.
2324          * If we got stuck in proc removal
2325          * but had already deregistered netlink,
2326          * some drbdsetup commands might wait forever
2327          * for an answer.
2328          */
2329         if (drbd_proc)
2330                 remove_proc_entry("drbd", NULL);
2331
2332         drbd_genl_unregister();
2333
2334         idr_for_each_entry(&minors, mdev, i) {
2335                 idr_remove(&minors, mdev_to_minor(mdev));
2336                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2337                 del_gendisk(mdev->vdisk);
2338                 /* synchronize_rcu(); No other threads running at this point */
2339                 kref_put(&mdev->kref, &drbd_minor_destroy);
2340         }
2341
2342         /* not _rcu, since there is no other updater anymore; genl is already unregistered */
2343         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2344                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2345                 /* synchronize_rcu(); */
2346                 kref_put(&tconn->kref, &conn_destroy);
2347         }
2348
2349         drbd_destroy_mempools();
2350         unregister_blkdev(DRBD_MAJOR, "drbd");
2351
2352         idr_destroy(&minors);
2353
2354         printk(KERN_INFO "drbd: module cleanup done.\n");
2355 }
2356
2357 /**
2358  * drbd_congested() - Callback for pdflush
2359  * @congested_data:     User data
2360  * @bdi_bits:           Bits pdflush is currently interested in
2361  *
2362  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2363  */
2364 static int drbd_congested(void *congested_data, int bdi_bits)
2365 {
2366         struct drbd_conf *mdev = congested_data;
2367         struct request_queue *q;
2368         char reason = '-';
2369         int r = 0;
2370
2371         if (!may_inc_ap_bio(mdev)) {
2372                 /* DRBD has frozen IO */
2373                 r = bdi_bits;
2374                 reason = 'd';
2375                 goto out;
2376         }
2377
2378         if (get_ldev(mdev)) {
2379                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2380                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2381                 put_ldev(mdev);
2382                 if (r)
2383                         reason = 'b';
2384         }
2385
2386         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2387                 r |= (1 << BDI_async_congested);
2388                 reason = reason == 'b' ? 'a' : 'n';
2389         }
2390
2391 out:
2392         mdev->congestion_reason = reason;
2393         return r;
2394 }
2395
2396 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2397 {
2398         sema_init(&wq->s, 0);
2399         spin_lock_init(&wq->q_lock);
2400         INIT_LIST_HEAD(&wq->q);
2401 }
2402
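/* Look up a connection by its resource name under RCU and take a reference;
 * the caller is responsible for dropping it again with
 * kref_put(&tconn->kref, &conn_destroy). */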
2403 struct drbd_tconn *conn_get_by_name(const char *name)
2404 {
2405         struct drbd_tconn *tconn;
2406
2407         if (!name || !name[0])
2408                 return NULL;
2409
2410         rcu_read_lock();
2411         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2412                 if (!strcmp(tconn->name, name)) {
2413                         kref_get(&tconn->kref);
2414                         goto found;
2415                 }
2416         }
2417         tconn = NULL;
2418 found:
2419         rcu_read_unlock();
2420         return tconn;
2421 }
2422
2423 static int drbd_alloc_socket(struct drbd_socket *socket)
2424 {
2425         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2426         if (!socket->rbuf)
2427                 return -ENOMEM;
2428         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2429         if (!socket->sbuf)
2430                 return -ENOMEM;
2431         return 0;
2432 }
2433
2434 static void drbd_free_socket(struct drbd_socket *socket)
2435 {
2436         free_page((unsigned long) socket->sbuf);
2437         free_page((unsigned long) socket->rbuf);
2438 }
2439
2440 void conn_free_crypto(struct drbd_tconn *tconn)
2441 {
2442         drbd_free_sock(tconn);
2443
2444         crypto_free_hash(tconn->csums_tfm);
2445         crypto_free_hash(tconn->verify_tfm);
2446         crypto_free_hash(tconn->cram_hmac_tfm);
2447         crypto_free_hash(tconn->integrity_tfm);
2448         crypto_free_hash(tconn->peer_integrity_tfm);
2449         kfree(tconn->int_dig_in);
2450         kfree(tconn->int_dig_vv);
2451
2452         tconn->csums_tfm = NULL;
2453         tconn->verify_tfm = NULL;
2454         tconn->cram_hmac_tfm = NULL;
2455         tconn->integrity_tfm = NULL;
2456         tconn->peer_integrity_tfm = NULL;
2457         tconn->int_dig_in = NULL;
2458         tconn->int_dig_vv = NULL;
2459 }
2460
2461 /* caller must be under genl_lock() */
2462 struct drbd_tconn *conn_create(const char *name)
2463 {
2464         struct drbd_tconn *tconn;
2465
2466         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2467         if (!tconn)
2468                 return NULL;
2469
2470         tconn->name = kstrdup(name, GFP_KERNEL);
2471         if (!tconn->name)
2472                 goto fail;
2473
2474         if (drbd_alloc_socket(&tconn->data))
2475                 goto fail;
2476         if (drbd_alloc_socket(&tconn->meta))
2477                 goto fail;
2478
2479         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2480                 goto fail;
2481
2482         if (!tl_init(tconn))
2483                 goto fail;
2484
2485         tconn->cstate = C_STANDALONE;
2486         mutex_init(&tconn->cstate_mutex);
2487         spin_lock_init(&tconn->req_lock);
2488         mutex_init(&tconn->conf_update);
2489         init_waitqueue_head(&tconn->ping_wait);
2490         idr_init(&tconn->volumes);
2491
2492         drbd_init_workqueue(&tconn->data.work);
2493         mutex_init(&tconn->data.mutex);
2494
2495         drbd_init_workqueue(&tconn->meta.work);
2496         mutex_init(&tconn->meta.mutex);
2497
2498         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2499         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2500         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2501
2502         drbd_set_res_opts_defaults(&tconn->res_opts);
2503
2504         kref_init(&tconn->kref);
2505         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2506
2507         return tconn;
2508
2509 fail:
2510         tl_cleanup(tconn);
2511         free_cpumask_var(tconn->cpu_mask);
2512         drbd_free_socket(&tconn->meta);
2513         drbd_free_socket(&tconn->data);
2514         kfree(tconn->name);
2515         kfree(tconn);
2516
2517         return NULL;
2518 }
2519
2520 void conn_destroy(struct kref *kref)
2521 {
2522         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2523
2524         idr_destroy(&tconn->volumes);
2525
2526         free_cpumask_var(tconn->cpu_mask);
2527         drbd_free_socket(&tconn->meta);
2528         drbd_free_socket(&tconn->data);
2529         kfree(tconn->name);
2530         kfree(tconn->int_dig_in);
2531         kfree(tconn->int_dig_vv);
2532         kfree(tconn);
2533 }
2534
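/* Create a new device: allocate the drbd_conf, request queue and gendisk
 * for @minor, register it in the global minors idr and as volume @vnr in
 * @tconn's volume idr, and let it inherit the connection state. */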
2535 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2536 {
2537         struct drbd_conf *mdev;
2538         struct gendisk *disk;
2539         struct request_queue *q;
2540         int vnr_got = vnr;
2541         int minor_got = minor;
2542         enum drbd_ret_code err = ERR_NOMEM;
2543
2544         mdev = minor_to_mdev(minor);
2545         if (mdev)
2546                 return ERR_MINOR_EXISTS;
2547
2548         /* GFP_KERNEL, we are outside of all write-out paths */
2549         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2550         if (!mdev)
2551                 return ERR_NOMEM;
2552
2553         kref_get(&tconn->kref);
2554         mdev->tconn = tconn;
2555
2556         mdev->minor = minor;
2557         mdev->vnr = vnr;
2558
2559         drbd_init_set_defaults(mdev);
2560
2561         q = blk_alloc_queue(GFP_KERNEL);
2562         if (!q)
2563                 goto out_no_q;
2564         mdev->rq_queue = q;
2565         q->queuedata   = mdev;
2566
2567         disk = alloc_disk(1);
2568         if (!disk)
2569                 goto out_no_disk;
2570         mdev->vdisk = disk;
2571
2572         set_disk_ro(disk, true);
2573
2574         disk->queue = q;
2575         disk->major = DRBD_MAJOR;
2576         disk->first_minor = minor;
2577         disk->fops = &drbd_ops;
2578         sprintf(disk->disk_name, "drbd%d", minor);
2579         disk->private_data = mdev;
2580
2581         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2582         /* we have no partitions. we contain only ourselves. */
2583         mdev->this_bdev->bd_contains = mdev->this_bdev;
2584
2585         q->backing_dev_info.congested_fn = drbd_congested;
2586         q->backing_dev_info.congested_data = mdev;
2587
2588         blk_queue_make_request(q, drbd_make_request);
2589         /* Setting max_hw_sectors to the odd value of 8 KiB here
2590            triggers a max_bio_size message upon first attach or connect. */
2591         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2592         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2593         blk_queue_merge_bvec(q, drbd_merge_bvec);
2594         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2595
2596         mdev->md_io_page = alloc_page(GFP_KERNEL);
2597         if (!mdev->md_io_page)
2598                 goto out_no_io_page;
2599
2600         if (drbd_bm_init(mdev))
2601                 goto out_no_bitmap;
2602         mdev->read_requests = RB_ROOT;
2603         mdev->write_requests = RB_ROOT;
2604
2605         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2606         if (!mdev->current_epoch)
2607                 goto out_no_epoch;
2608
2609         INIT_LIST_HEAD(&mdev->current_epoch->list);
2610         mdev->epochs = 1;
2611
2612         if (!idr_pre_get(&minors, GFP_KERNEL))
2613                 goto out_no_minor_idr;
2614         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2615                 goto out_no_minor_idr;
2616         if (minor_got != minor) {
2617                 err = ERR_MINOR_EXISTS;
2618                 drbd_msg_put_info("requested minor exists already");
2619                 goto out_idr_remove_minor;
2620         }
2621
2622         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2623                 goto out_idr_remove_minor;
2624         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2625                 goto out_idr_remove_minor;
2626         if (vnr_got != vnr) {
2627                 err = ERR_INVALID_REQUEST;
2628                 drbd_msg_put_info("requested volume exists already");
2629                 goto out_idr_remove_vol;
2630         }
2631         add_disk(disk);
2632         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2633
2634         /* inherit the connection state */
2635         mdev->state.conn = tconn->cstate;
2636         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2637                 drbd_connected(mdev);
2638
2639         return NO_ERROR;
2640
2641 out_idr_remove_vol:
2642         idr_remove(&tconn->volumes, vnr_got);
2643 out_idr_remove_minor:
2644         idr_remove(&minors, minor_got);
2645         synchronize_rcu();
2646 out_no_minor_idr:
2647         kfree(mdev->current_epoch);
2648 out_no_epoch:
2649         drbd_bm_cleanup(mdev);
2650 out_no_bitmap:
2651         __free_page(mdev->md_io_page);
2652 out_no_io_page:
2653         put_disk(disk);
2654 out_no_disk:
2655         blk_cleanup_queue(q);
2656 out_no_q:
2657         kfree(mdev);
2658         kref_put(&tconn->kref, &conn_destroy);
2659         return err;
2660 }
2661
2662 int __init drbd_init(void)
2663 {
2664         int err;
2665
2666         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2667                 printk(KERN_ERR
2668                        "drbd: invalid minor_count (%d)\n", minor_count);
2669 #ifdef MODULE
2670                 return -EINVAL;
2671 #else
2672                 minor_count = 8;
2673 #endif
2674         }
2675
2676         err = register_blkdev(DRBD_MAJOR, "drbd");
2677         if (err) {
2678                 printk(KERN_ERR
2679                        "drbd: unable to register block device major %d\n",
2680                        DRBD_MAJOR);
2681                 return err;
2682         }
2683
2684         err = drbd_genl_register();
2685         if (err) {
2686                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2687                 goto fail;
2688         }
2689
2690
2691         register_reboot_notifier(&drbd_notifier);
2692
2693         /*
2694          * allocate all necessary structs
2695          */
2696         err = -ENOMEM;
2697
2698         init_waitqueue_head(&drbd_pp_wait);
2699
2700         drbd_proc = NULL; /* play safe for drbd_cleanup */
2701         idr_init(&minors);
2702
2703         err = drbd_create_mempools();
2704         if (err)
2705                 goto fail;
2706
2707         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2708         if (!drbd_proc) {
2709                 printk(KERN_ERR "drbd: unable to register proc file\n");
2710                 goto fail;
2711         }
2712
2713         rwlock_init(&global_state_lock);
2714         INIT_LIST_HEAD(&drbd_tconns);
2715
2716         printk(KERN_INFO "drbd: initialized. "
2717                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2718                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2719         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2720         printk(KERN_INFO "drbd: registered as block device major %d\n",
2721                 DRBD_MAJOR);
2722
2723         return 0; /* Success! */
2724
2725 fail:
2726         drbd_cleanup();
2727         if (err == -ENOMEM)
2728                 /* currently always the case */
2729                 printk(KERN_ERR "drbd: ran out of memory\n");
2730         else
2731                 printk(KERN_ERR "drbd: initialization failure\n");
2732         return err;
2733 }
2734
2735 void drbd_free_bc(struct drbd_backing_dev *ldev)
2736 {
2737         if (ldev == NULL)
2738                 return;
2739
2740         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2741         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2742
2743         kfree(ldev);
2744 }
2745
2746 void drbd_free_sock(struct drbd_tconn *tconn)
2747 {
2748         if (tconn->data.socket) {
2749                 mutex_lock(&tconn->data.mutex);
2750                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2751                 sock_release(tconn->data.socket);
2752                 tconn->data.socket = NULL;
2753                 mutex_unlock(&tconn->data.mutex);
2754         }
2755         if (tconn->meta.socket) {
2756                 mutex_lock(&tconn->meta.mutex);
2757                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2758                 sock_release(tconn->meta.socket);
2759                 tconn->meta.socket = NULL;
2760                 mutex_unlock(&tconn->meta.mutex);
2761         }
2762 }
2763
2764 /* meta data management */
2765
2766 struct meta_data_on_disk {
2767         u64 la_size;           /* last agreed size. */
2768         u64 uuid[UI_SIZE];   /* UUIDs. */
2769         u64 device_uuid;
2770         u64 reserved_u64_1;
2771         u32 flags;             /* MDF */
2772         u32 magic;
2773         u32 md_size_sect;
2774         u32 al_offset;         /* offset to this block */
2775         u32 al_nr_extents;     /* important for restoring the AL */
2776               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2777         u32 bm_offset;         /* offset to the bitmap, from here */
2778         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2779         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2780         u32 reserved_u32[3];
2781
2782 } __packed;
2783
2784 /**
2785  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2786  * @mdev:       DRBD device.
2787  */
2788 void drbd_md_sync(struct drbd_conf *mdev)
2789 {
2790         struct meta_data_on_disk *buffer;
2791         sector_t sector;
2792         int i;
2793
2794         del_timer(&mdev->md_sync_timer);
2795         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2796         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2797                 return;
2798
2799         /* We use here D_FAILED and not D_ATTACHING because we try to write
2800          * metadata even if we detach due to a disk failure! */
2801         if (!get_ldev_if_state(mdev, D_FAILED))
2802                 return;
2803
2804         mutex_lock(&mdev->md_io_mutex);
2805         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2806         memset(buffer, 0, 512);
2807
2808         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2809         for (i = UI_CURRENT; i < UI_SIZE; i++)
2810                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2811         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2812         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
2813
2814         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2815         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2816         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2817         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2818         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2819
2820         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2821         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2822
2823         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2824         sector = mdev->ldev->md.md_offset;
2825
2826         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2827                 /* this was a try anyways ... */
2828                 dev_err(DEV, "meta data update failed!\n");
2829                 drbd_chk_io_error(mdev, 1, true);
2830         }
2831
2832         /* Update mdev->ldev->md.la_size_sect,
2833          * since we updated it on metadata. */
2834         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2835
2836         mutex_unlock(&mdev->md_io_mutex);
2837         put_ldev(mdev);
2838 }
2839
2840 /**
2841  * drbd_md_read() - Reads in the meta data super block
2842  * @mdev:       DRBD device.
2843  * @bdev:       Device from which the meta data should be read in.
2844  *
2845  * Return NO_ERROR on success, and an enum drbd_ret_code in case
2846  * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
2847  */
2848 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2849 {
2850         struct meta_data_on_disk *buffer;
2851         int i, rv = NO_ERROR;
2852
2853         if (!get_ldev_if_state(mdev, D_ATTACHING))
2854                 return ERR_IO_MD_DISK;
2855
2856         mutex_lock(&mdev->md_io_mutex);
2857         buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
2858
2859         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
2860                 /* NOTE: can't do normal error processing here as this is
2861                    called BEFORE disk is attached */
2862                 dev_err(DEV, "Error while reading metadata.\n");
2863                 rv = ERR_IO_MD_DISK;
2864                 goto err;
2865         }
2866
2867         if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
2868                 dev_err(DEV, "Error while reading metadata, magic not found.\n");
2869                 rv = ERR_MD_INVALID;
2870                 goto err;
2871         }
2872         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
2873                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
2874                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
2875                 rv = ERR_MD_INVALID;
2876                 goto err;
2877         }
2878         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
2879                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
2880                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
2881                 rv = ERR_MD_INVALID;
2882                 goto err;
2883         }
2884         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
2885                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
2886                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
2887                 rv = ERR_MD_INVALID;
2888                 goto err;
2889         }
2890
2891         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
2892                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
2893                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
2894                 rv = ERR_MD_INVALID;
2895                 goto err;
2896         }
2897
2898         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
2899         for (i = UI_CURRENT; i < UI_SIZE; i++)
2900                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
2901         bdev->md.flags = be32_to_cpu(buffer->flags);
2902         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
2903
2904         spin_lock_irq(&mdev->tconn->req_lock);
2905         if (mdev->state.conn < C_CONNECTED) {
2906                 int peer;
2907                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
2908                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
2909                 mdev->peer_max_bio_size = peer;
2910         }
2911         spin_unlock_irq(&mdev->tconn->req_lock);
2912
2913         /* This block wants to be removed... */
2914         bdev->disk_conf->al_extents = be32_to_cpu(buffer->al_nr_extents);
2915         if (bdev->disk_conf->al_extents < DRBD_AL_EXTENTS_MIN)
2916                 bdev->disk_conf->al_extents = DRBD_AL_EXTENTS_DEF;
2917
2918  err:
2919         mutex_unlock(&mdev->md_io_mutex);
2920         put_ldev(mdev);
2921
2922         return rv;
2923 }
2924
2925 /**
2926  * drbd_md_mark_dirty() - Mark meta data super block as dirty
2927  * @mdev:       DRBD device.
2928  *
2929  * Call this function if you change anything that should be written to
2930  * the meta-data super block. This function sets MD_DIRTY, and starts a
2931  * timer that ensures that within five seconds you have to call drbd_md_sync().
2932  */
2933 #ifdef DEBUG
2934 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
2935 {
2936         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
2937                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
2938                 mdev->last_md_mark_dirty.line = line;
2939                 mdev->last_md_mark_dirty.func = func;
2940         }
2941 }
2942 #else
2943 void drbd_md_mark_dirty(struct drbd_conf *mdev)
2944 {
2945         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
2946                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
2947 }
2948 #endif
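
/*
 * Illustrative sketch only: the helper below is hypothetical and not part of
 * this driver; it merely mirrors what _drbd_uuid_set() and drbd_md_set_flag()
 * further down already do: mutate the in-core meta data, mark it dirty, and
 * let the md_sync_timer (or an explicit drbd_md_sync()) get the change onto
 * stable storage.
 *
 *	static void example_set_device_uuid(struct drbd_conf *mdev, u64 val)
 *	{
 *		// caller is assumed to hold a local reference (get_ldev)
 *		mdev->ldev->md.device_uuid = val;	// change in-core copy
 *		drbd_md_mark_dirty(mdev);		// arm the write-out timer
 *		// drbd_md_sync(mdev);			// or flush immediately
 *	}
 */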
2949
2950 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
2951 {
2952         int i;
2953
2954         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
2955                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
2956 }
2957
2958 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2959 {
2960         if (idx == UI_CURRENT) {
2961                 if (mdev->state.role == R_PRIMARY)
2962                         val |= 1;
2963                 else
2964                         val &= ~((u64)1);
2965
2966                 drbd_set_ed_uuid(mdev, val);
2967         }
2968
2969         mdev->ldev->md.uuid[idx] = val;
2970         drbd_md_mark_dirty(mdev);
2971 }
2972
2973
2974 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
2975 {
2976         if (mdev->ldev->md.uuid[idx]) {
2977                 drbd_uuid_move_history(mdev);
2978                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
2979         }
2980         _drbd_uuid_set(mdev, idx, val);
2981 }
2982
2983 /**
2984  * drbd_uuid_new_current() - Creates a new current UUID
2985  * @mdev:       DRBD device.
2986  *
2987  * Creates a new current UUID, and rotates the old current UUID into
2988  * the bitmap slot. Causes an incremental resync upon next connect.
2989  */
2990 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
2991 {
2992         u64 val;
2993         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
2994
2995         if (bm_uuid)
2996                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
2997
2998         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
2999
3000         get_random_bytes(&val, sizeof(u64));
3001         _drbd_uuid_set(mdev, UI_CURRENT, val);
3002         drbd_print_uuids(mdev, "new current UUID");
3003         /* get it to stable storage _now_ */
3004         drbd_md_sync(mdev);
3005 }
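
/*
 * Worked illustration (not code) of the slot rotation performed above,
 * assuming the bitmap slot was empty before the call:
 *
 *	before:  UI_CURRENT = C        UI_BITMAP = 0
 *	after :  UI_CURRENT = <random> UI_BITMAP = C      (history unchanged)
 *
 * Bit 0 of the new current UUID encodes the R_PRIMARY role, see
 * _drbd_uuid_set() above.  On the next connect the peer finds C in our
 * bitmap slot and can limit the resync to the blocks marked in the bitmap.
 */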
3006
3007 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3008 {
3009         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3010                 return;
3011
3012         if (val == 0) {
3013                 drbd_uuid_move_history(mdev);
3014                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3015                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3016         } else {
3017                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3018                 if (bm_uuid)
3019                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3020
3021                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3022         }
3023         drbd_md_mark_dirty(mdev);
3024 }
3025
3026 /**
3027  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3028  * @mdev:       DRBD device.
3029  *
3030  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3031  */
3032 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3033 {
3034         int rv = -EIO;
3035
3036         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3037                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3038                 drbd_md_sync(mdev);
3039                 drbd_bm_set_all(mdev);
3040
3041                 rv = drbd_bm_write(mdev);
3042
3043                 if (!rv) {
3044                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3045                         drbd_md_sync(mdev);
3046                 }
3047
3048                 put_ldev(mdev);
3049         }
3050
3051         return rv;
3052 }
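
/*
 * A tentative reading of the flag dance above (not stated in the code
 * itself): MDF_FULL_SYNC is made persistent *before* any bit is set, and
 * only cleared again after drbd_bm_write() returned success, i.e.
 *
 *	set MDF_FULL_SYNC, drbd_md_sync()     -> "full sync pending" on disk
 *	drbd_bm_set_all(), drbd_bm_write()
 *	clear MDF_FULL_SYNC, drbd_md_sync()   -> only if the write-out worked
 *
 * so an interruption in between still leaves the full-sync intent on
 * stable storage.
 */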
3053
3054 /**
3055  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3056  * @mdev:       DRBD device.
3057  *
3058  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3059  */
3060 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3061 {
3062         int rv = -EIO;
3063
3064         drbd_resume_al(mdev);
3065         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3066                 drbd_bm_clear_all(mdev);
3067                 rv = drbd_bm_write(mdev);
3068                 put_ldev(mdev);
3069         }
3070
3071         return rv;
3072 }
3073
3074 static int w_bitmap_io(struct drbd_work *w, int unused)
3075 {
3076         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3077         struct drbd_conf *mdev = w->mdev;
3078         int rv = -EIO;
3079
3080         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3081
3082         if (get_ldev(mdev)) {
3083                 drbd_bm_lock(mdev, work->why, work->flags);
3084                 rv = work->io_fn(mdev);
3085                 drbd_bm_unlock(mdev);
3086                 put_ldev(mdev);
3087         }
3088
3089         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3090         wake_up(&mdev->misc_wait);
3091
3092         if (work->done)
3093                 work->done(mdev, rv);
3094
3095         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3096         work->why = NULL;
3097         work->flags = 0;
3098
3099         return 0;
3100 }
3101
3102 void drbd_ldev_destroy(struct drbd_conf *mdev)
3103 {
3104         lc_destroy(mdev->resync);
3105         mdev->resync = NULL;
3106         lc_destroy(mdev->act_log);
3107         mdev->act_log = NULL;
3108         __no_warn(local,
3109                 drbd_free_bc(mdev->ldev);
3110                 mdev->ldev = NULL;);
3111
3112         clear_bit(GO_DISKLESS, &mdev->flags);
3113 }
3114
3115 static int w_go_diskless(struct drbd_work *w, int unused)
3116 {
3117         struct drbd_conf *mdev = w->mdev;
3118
3119         D_ASSERT(mdev->state.disk == D_FAILED);
3120         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3121          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3122          * the protected members anymore, though, so once put_ldev reaches zero
3123          * again, it will be safe to free them. */
3124         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3125         return 0;
3126 }
3127
3128 void drbd_go_diskless(struct drbd_conf *mdev)
3129 {
3130         D_ASSERT(mdev->state.disk == D_FAILED);
3131         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3132                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3133 }
3134
3135 /**
3136  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3137  * @mdev:       DRBD device.
3138  * @io_fn:      IO callback to be called when bitmap IO is possible
3139  * @done:       callback to be called after the bitmap IO was performed
3140  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      flags, forwarded to drbd_bm_lock()
3141  *
3142  * While IO on the bitmap happens we freeze application IO, thus ensuring
3143  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3144  * called from worker context. It MUST NOT be used while a previous such
3145  * work is still pending!
3146  */
3147 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3148                           int (*io_fn)(struct drbd_conf *),
3149                           void (*done)(struct drbd_conf *, int),
3150                           char *why, enum bm_flag flags)
3151 {
3152         D_ASSERT(current == mdev->tconn->worker.task);
3153
3154         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3155         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3156         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3157         if (mdev->bm_io_work.why)
3158                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3159                         why, mdev->bm_io_work.why);
3160
3161         mdev->bm_io_work.io_fn = io_fn;
3162         mdev->bm_io_work.done = done;
3163         mdev->bm_io_work.why = why;
3164         mdev->bm_io_work.flags = flags;
3165
3166         spin_lock_irq(&mdev->tconn->req_lock);
3167         set_bit(BITMAP_IO, &mdev->flags);
3168         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3169                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3170                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3171         }
3172         spin_unlock_irq(&mdev->tconn->req_lock);
3173 }
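
/*
 * Hedged usage sketch; the done callback below is made up for illustration,
 * only drbd_bmio_set_n_write() and the BM_LOCKED_SET_ALLOWED flag are taken
 * from this file.  From worker context the asynchronous interface is used
 * roughly like this:
 *
 *	static void example_bmio_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap write out failed: %d\n", rv);
 *	}
 *
 *	// in some work callback running on mdev->tconn->worker:
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &example_bmio_done,
 *			     "example: set all bits", BM_LOCKED_SET_ALLOWED);
 *
 * The io_fn then runs later from w_bitmap_io() above, once application IO
 * has drained (ap_bio_cnt == 0) and with the bitmap locked.
 */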
3174
3175 /**
3176  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3177  * @mdev:       DRBD device.
3178  * @io_fn:      IO callback to be called when bitmap IO is possible
3179  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      flags, forwarded to drbd_bm_lock()
3180  *
3181  * Freezes application IO while the actual IO operation runs. This
3182  * function MAY NOT be called from worker context.
3183  */
3184 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3185                 char *why, enum bm_flag flags)
3186 {
3187         int rv;
3188
3189         D_ASSERT(current != mdev->tconn->worker.task);
3190
3191         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3192                 drbd_suspend_io(mdev);
3193
3194         drbd_bm_lock(mdev, why, flags);
3195         rv = io_fn(mdev);
3196         drbd_bm_unlock(mdev);
3197
3198         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3199                 drbd_resume_io(mdev);
3200
3201         return rv;
3202 }
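
/*
 * Hedged sketch of the synchronous counterpart; the flag value is chosen
 * for illustration only, and BM_LOCKED_MASK is assumed to come from
 * drbd_int.h:
 *
 *	// NOT from worker context:
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *				"example: clear all bits", BM_LOCKED_MASK);
 *
 * As the code above shows, drbd_suspend_io()/drbd_resume_io() bracket the
 * operation whenever (flags & BM_LOCKED_SET_ALLOWED) == 0.
 */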
3203
3204 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3205 {
3206         if ((mdev->ldev->md.flags & flag) != flag) {
3207                 drbd_md_mark_dirty(mdev);
3208                 mdev->ldev->md.flags |= flag;
3209         }
3210 }
3211
3212 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3213 {
3214         if ((mdev->ldev->md.flags & flag) != 0) {
3215                 drbd_md_mark_dirty(mdev);
3216                 mdev->ldev->md.flags &= ~flag;
3217         }
3218 }
3219 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3220 {
3221         return (bdev->md.flags & flag) != 0;
3222 }
3223
3224 static void md_sync_timer_fn(unsigned long data)
3225 {
3226         struct drbd_conf *mdev = (struct drbd_conf *) data;
3227
3228         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3229 }
3230
3231 static int w_md_sync(struct drbd_work *w, int unused)
3232 {
3233         struct drbd_conf *mdev = w->mdev;
3234
3235         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3236 #ifdef DEBUG
3237         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3238                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3239 #endif
3240         drbd_md_sync(mdev);
3241         return 0;
3242 }
3243
3244 const char *cmdname(enum drbd_packet cmd)
3245 {
3246         /* THINK may need to become several global tables
3247          * when we want to support more than
3248          * one PRO_VERSION */
3249         static const char *cmdnames[] = {
3250                 [P_DATA]                = "Data",
3251                 [P_DATA_REPLY]          = "DataReply",
3252                 [P_RS_DATA_REPLY]       = "RSDataReply",
3253                 [P_BARRIER]             = "Barrier",
3254                 [P_BITMAP]              = "ReportBitMap",
3255                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3256                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3257                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3258                 [P_DATA_REQUEST]        = "DataRequest",
3259                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3260                 [P_SYNC_PARAM]          = "SyncParam",
3261                 [P_SYNC_PARAM89]        = "SyncParam89",
3262                 [P_PROTOCOL]            = "ReportProtocol",
3263                 [P_UUIDS]               = "ReportUUIDs",
3264                 [P_SIZES]               = "ReportSizes",
3265                 [P_STATE]               = "ReportState",
3266                 [P_SYNC_UUID]           = "ReportSyncUUID",
3267                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3268                 [P_AUTH_RESPONSE]       = "AuthResponse",
3269                 [P_PING]                = "Ping",
3270                 [P_PING_ACK]            = "PingAck",
3271                 [P_RECV_ACK]            = "RecvAck",
3272                 [P_WRITE_ACK]           = "WriteAck",
3273                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3274                 [P_DISCARD_WRITE]        = "DiscardWrite",
3275                 [P_NEG_ACK]             = "NegAck",
3276                 [P_NEG_DREPLY]          = "NegDReply",
3277                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3278                 [P_BARRIER_ACK]         = "BarrierAck",
3279                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3280                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3281                 [P_OV_REQUEST]          = "OVRequest",
3282                 [P_OV_REPLY]            = "OVReply",
3283                 [P_OV_RESULT]           = "OVResult",
3284                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3285                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3286                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3287                 [P_DELAY_PROBE]         = "DelayProbe",
3288                 [P_OUT_OF_SYNC]         = "OutOfSync",
3289                 [P_RETRY_WRITE]         = "RetryWrite",
3290                 [P_RS_CANCEL]           = "RSCancel",
3291                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3292                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3294                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3295
3296                 /* enum drbd_packet, but not commands - obsoleted flags:
3297                  *      P_MAY_IGNORE
3298                  *      P_MAX_OPT_CMD
3299                  */
3300         };
3301
3302         /* too big for the array: 0xfffX */
3303         if (cmd == P_INITIAL_META)
3304                 return "InitialMeta";
3305         if (cmd == P_INITIAL_DATA)
3306                 return "InitialData";
3307         if (cmd == P_CONNECTION_FEATURES)
3308                 return "ConnectionFeatures";
3309         if (cmd >= ARRAY_SIZE(cmdnames))
3310                 return "Unknown";
3311         return cmdnames[cmd];
3312 }
3313
3314 /**
3315  * drbd_wait_misc  -  wait for a request to make progress
3316  * @mdev:       device associated with the request
3317  * @i:          the struct drbd_interval embedded in struct drbd_request or
3318  *              struct drbd_peer_request
3319  */
3320 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3321 {
3322         struct net_conf *nc;
3323         DEFINE_WAIT(wait);
3324         long timeout;
3325
3326         rcu_read_lock();
3327         nc = rcu_dereference(mdev->tconn->net_conf);
3328         if (!nc) {
3329                 rcu_read_unlock();
3330                 return -ETIMEDOUT;
3331         }
3332         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3333         rcu_read_unlock();
3334
3335         /* Ask the completion path to wake mdev->misc_wait on progress. */
3336         i->waiting = true;
3337         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3338         spin_unlock_irq(&mdev->tconn->req_lock);
3339         timeout = schedule_timeout(timeout);
3340         finish_wait(&mdev->misc_wait, &wait);
3341         spin_lock_irq(&mdev->tconn->req_lock);
3342         if (!timeout || mdev->state.conn < C_CONNECTED)
3343                 return -ETIMEDOUT;
3344         if (signal_pending(current))
3345                 return -ERESTARTSYS;
3346         return 0;
3347 }
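
/*
 * Worked example for the timeout computed above, assuming (as elsewhere in
 * DRBD) that nc->timeout is configured in units of 0.1 seconds: with
 * timeout = 60 and ko_count = 7 the wait is bounded by
 *
 *	60 * HZ / 10 * 7  =  6 s * 7  =  42 seconds,
 *
 * while ko_count = 0 removes the bound (MAX_SCHEDULE_TIMEOUT).
 */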
3348
3349 #ifdef CONFIG_DRBD_FAULT_INJECTION
3350 /* Fault insertion support including random number generator shamelessly
3351  * stolen from kernel/rcutorture.c */
3352 struct fault_random_state {
3353         unsigned long state;
3354         unsigned long count;
3355 };
3356
3357 #define FAULT_RANDOM_MULT 39916801  /* prime */
3358 #define FAULT_RANDOM_ADD        479001701 /* prime */
3359 #define FAULT_RANDOM_REFRESH 10000
3360
3361 /*
3362  * Crude but fast random-number generator.  Uses a linear congruential
3363  * generator, with occasional help from get_random_bytes().
3364  */
3365 static unsigned long
3366 _drbd_fault_random(struct fault_random_state *rsp)
3367 {
3368         long refresh;
3369
3370         if (!rsp->count--) {
3371                 get_random_bytes(&refresh, sizeof(refresh));
3372                 rsp->state += refresh;
3373                 rsp->count = FAULT_RANDOM_REFRESH;
3374         }
3375         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3376         return swahw32(rsp->state);
3377 }
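
/*
 * In formula form (a restatement of the function above, nothing new):
 * apart from the reseed every FAULT_RANDOM_REFRESH calls, the generator
 * iterates
 *
 *	state = state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD
 *
 * modulo 2^BITS_PER_LONG, and returns the new state with its 16-bit
 * halfwords swapped (swahw32), moving the faster-changing low bits into
 * the upper half of the result.
 */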
3378
3379 static char *
3380 _drbd_fault_str(unsigned int type) {
3381         static char *_faults[] = {
3382                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3383                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3384                 [DRBD_FAULT_RS_WR] = "Resync write",
3385                 [DRBD_FAULT_RS_RD] = "Resync read",
3386                 [DRBD_FAULT_DT_WR] = "Data write",
3387                 [DRBD_FAULT_DT_RD] = "Data read",
3388                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3389                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3390                 [DRBD_FAULT_AL_EE] = "EE allocation",
3391                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3392         };
3393
3394         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3395 }
3396
3397 unsigned int
3398 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3399 {
3400         static struct fault_random_state rrs = {0, 0};
3401
3402         unsigned int ret = (
3403                 (fault_devs == 0 ||
3404                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3405                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3406
3407         if (ret) {
3408                 fault_count++;
3409
3410                 if (__ratelimit(&drbd_ratelimit_state))
3411                         dev_warn(DEV, "***Simulating %s failure\n",
3412                                 _drbd_fault_str(type));
3413         }
3414
3415         return ret;
3416 }
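
/*
 * Worked example for the module parameters used above (values purely for
 * illustration): with fault_rate = 10, ((random % 100) + 1) <= 10 holds
 * with a probability of about 10%, so roughly every tenth candidate IO is
 * failed.  fault_devs = 0 means "all devices"; otherwise bit i selects
 * minor i, e.g. fault_devs = 0x5 restricts the faults to minors 0 and 2.
 */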
3417 #endif
3418
3419 const char *drbd_buildtag(void)
3420 {
3421         /* DRBD built from external sources carries a reference to the
3422            git hash of the source code here. */
3423
3424         static char buildtag[38] = "\0uilt-in"; /* leading NUL: replaced below ("built-in" or srcversion) */
3425
3426         if (buildtag[0] == 0) {
3427 #ifdef CONFIG_MODULES
3428                 if (THIS_MODULE != NULL)
3429                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3430                 else
3431 #endif
3432                         buildtag[0] = 'b';
3433         }
3434
3435         return buildtag;
3436 }
3437
3438 module_init(drbd_init)
3439 module_exit(drbd_cleanup)
3440
3441 EXPORT_SYMBOL(drbd_conn_str);
3442 EXPORT_SYMBOL(drbd_role_str);
3443 EXPORT_SYMBOL(drbd_disk_str);
3444 EXPORT_SYMBOL(drbd_set_st_err_str);