4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
46 * drbd_md_io_complete (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * bm_async_io_complete (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
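*
* In practice that means these handlers must not sleep: they take the
* req_lock only via spin_lock_irqsave()/spin_unlock_irqrestore() and defer
* the real work to the worker (sender_work) or the asender.
*/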
59 /* About the global_state_lock
Each state transition on a device holds a read lock. In case we have
to evaluate the resync-after dependencies, we grab a write lock, because
62 we need stable states on all devices for that. */
63 rwlock_t global_state_lock;
65 /* used for synchronous meta data and bitmap IO
66 * submitted by drbd_md_sync_page_io()
68 void drbd_md_io_complete(struct bio *bio, int error)
70 struct drbd_device *device;
72 device = bio->bi_private;
73 device->md_io.error = error;
75 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
* to time out on the lower-level device, and eventually detach from it.
77 * If this io completion runs after that timeout expired, this
78 * drbd_md_put_buffer() may allow us to finally try and re-attach.
79 * During normal operation, this only puts that extra reference
81 * Make sure we first drop the reference, and only then signal
82 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83 * next drbd_md_sync_page_io(), that we trigger the
84 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
86 drbd_md_put_buffer(device);
87 device->md_io.done = 1;
88 wake_up(&device->misc_wait);
90 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
94 /* reads on behalf of the partner,
95 * "submitted" by the receiver
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
99 unsigned long flags = 0;
100 struct drbd_peer_device *peer_device = peer_req->peer_device;
101 struct drbd_device *device = peer_device->device;
103 spin_lock_irqsave(&device->resource->req_lock, flags);
104 device->read_cnt += peer_req->i.size >> 9;
105 list_del(&peer_req->w.list);
106 if (list_empty(&device->read_ee))
107 wake_up(&device->ee_wait);
108 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110 spin_unlock_irqrestore(&device->resource->req_lock, flags);
112 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
116 /* writes on behalf of the partner, or resync writes,
117 * "submitted" by the receiver, final stage. */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
120 unsigned long flags = 0;
121 struct drbd_peer_device *peer_device = peer_req->peer_device;
122 struct drbd_device *device = peer_device->device;
123 struct drbd_interval i;
126 int do_al_complete_io;
128 /* after we moved peer_req to done_ee,
129 * we may no longer access it,
130 * it may be freed/reused already!
131 * (as soon as we release the req_lock) */
133 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134 block_id = peer_req->block_id;
136 spin_lock_irqsave(&device->resource->req_lock, flags);
137 device->writ_cnt += peer_req->i.size >> 9;
138 list_move_tail(&peer_req->w.list, &device->done_ee);
141 * Do not remove from the write_requests tree here: we did not send the
142 * Ack yet and did not wake possibly waiting conflicting requests.
* Removal from the tree happens in "drbd_process_done_ee" within the
* appropriate dw.cb (e_end_block/e_end_resync_block) or in
* _drbd_clear_done_ee.
148 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
150 /* FIXME do we want to detach for failed REQ_DISCARD?
151 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
152 if (peer_req->flags & EE_WAS_ERROR)
153 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
154 spin_unlock_irqrestore(&device->resource->req_lock, flags);
156 if (block_id == ID_SYNCER)
157 drbd_rs_complete_io(device, i.sector);
160 wake_up(&device->ee_wait);
162 if (do_al_complete_io)
163 drbd_al_complete_io(device, &i);
165 wake_asender(peer_device->connection);
169 /* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
172 void drbd_peer_request_endio(struct bio *bio, int error)
174 struct drbd_peer_request *peer_req = bio->bi_private;
175 struct drbd_device *device = peer_req->peer_device->device;
176 int uptodate = bio_flagged(bio, BIO_UPTODATE);
177 int is_write = bio_data_dir(bio) == WRITE;
178 int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180 if (error && __ratelimit(&drbd_ratelimit_state))
181 drbd_warn(device, "%s: error=%d s=%llus\n",
is_write ? (is_discard ? "discard" : "write")
: "read", error,
(unsigned long long)peer_req->i.sector);
185 if (!error && !uptodate) {
186 if (__ratelimit(&drbd_ratelimit_state))
187 drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
188 is_write ? "write" : "read",
189 (unsigned long long)peer_req->i.sector);
190 /* strange behavior of some lower level drivers...
191 * fail the request by clearing the uptodate flag,
192 * but do not return any error?! */
197 set_bit(__EE_WAS_ERROR, &peer_req->flags);
199 bio_put(bio); /* no need for the bio anymore */
200 if (atomic_dec_and_test(&peer_req->pending_bios)) {
202 drbd_endio_write_sec_final(peer_req);
204 drbd_endio_read_sec_final(peer_req);
208 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
210 void drbd_request_endio(struct bio *bio, int error)
213 struct drbd_request *req = bio->bi_private;
214 struct drbd_device *device = req->device;
215 struct bio_and_error m;
216 enum drbd_req_event what;
217 int uptodate = bio_flagged(bio, BIO_UPTODATE);
219 if (!error && !uptodate) {
220 drbd_warn(device, "p %s: setting error to -EIO\n",
221 bio_data_dir(bio) == WRITE ? "write" : "read");
222 /* strange behavior of some lower level drivers...
223 * fail the request by clearing the uptodate flag,
224 * but do not return any error?! */
229 /* If this request was aborted locally before,
230 * but now was completed "successfully",
231 * chances are that this caused arbitrary data corruption.
233 * "aborting" requests, or force-detaching the disk, is intended for
* completely blocked/hung local backing devices which no longer
235 * complete requests at all, not even do error completions. In this
236 * situation, usually a hard-reset and failover is the only way out.
238 * By "aborting", basically faking a local error-completion,
* we allow for a more graceful switchover by cleanly migrating services.
240 * Still the affected node has to be rebooted "soon".
242 * By completing these requests, we allow the upper layers to re-use
243 * the associated data pages.
245 * If later the local backing device "recovers", and now DMAs some data
246 * from disk into the original request pages, in the best case it will
247 * just put random data into unused pages; but typically it will corrupt
248 * meanwhile completely unrelated data, causing all sorts of damage.
250 * Which means delayed successful completion,
251 * especially for READ requests,
252 * is a reason to panic().
254 * We assume that a delayed *error* completion is OK,
255 * though we still will complain noisily about it.
257 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
258 if (__ratelimit(&drbd_ratelimit_state))
259 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
262 panic("possible random memory corruption caused by delayed completion of aborted local request\n");
265 /* to avoid recursion in __req_mod */
266 if (unlikely(error)) {
267 if (bio->bi_rw & REQ_DISCARD)
268 what = (error == -EOPNOTSUPP)
269 ? DISCARD_COMPLETED_NOTSUPP
270 : DISCARD_COMPLETED_WITH_ERROR;
272 what = (bio_data_dir(bio) == WRITE)
273 ? WRITE_COMPLETED_WITH_ERROR
274 : (bio_rw(bio) == READ)
275 ? READ_COMPLETED_WITH_ERROR
276 : READ_AHEAD_COMPLETED_WITH_ERROR;
280 bio_put(req->private_bio);
281 req->private_bio = ERR_PTR(error);
283 /* not req_mod(), we need irqsave here! */
284 spin_lock_irqsave(&device->resource->req_lock, flags);
285 __req_mod(req, what, &m);
286 spin_unlock_irqrestore(&device->resource->req_lock, flags);
290 complete_master_bio(device, &m);
293 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
295 struct hash_desc desc;
296 struct scatterlist sg;
297 struct page *page = peer_req->pages;
304 sg_init_table(&sg, 1);
305 crypto_hash_init(&desc);
307 while ((tmp = page_chain_next(page))) {
308 /* all but the last page will be fully used */
309 sg_set_page(&sg, page, PAGE_SIZE, 0);
310 crypto_hash_update(&desc, &sg, sg.length);
313 /* and now the last, possibly only partially used page */
314 len = peer_req->i.size & (PAGE_SIZE - 1);
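/* If i.size is an exact multiple of PAGE_SIZE, len is 0 and the last
 * page is fully used; the "?:" shorthand below then falls back to
 * PAGE_SIZE. */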
315 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
316 crypto_hash_update(&desc, &sg, sg.length);
317 crypto_hash_final(&desc, digest);
320 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
322 struct hash_desc desc;
323 struct scatterlist sg;
struct bio_vec bvec;
struct bvec_iter iter;
330 sg_init_table(&sg, 1);
331 crypto_hash_init(&desc);
333 bio_for_each_segment(bvec, bio, iter) {
334 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
335 crypto_hash_update(&desc, &sg, sg.length);
337 crypto_hash_final(&desc, digest);
340 /* MAYBE merge common code with w_e_end_ov_req */
341 static int w_e_send_csum(struct drbd_work *w, int cancel)
343 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
344 struct drbd_peer_device *peer_device = peer_req->peer_device;
345 struct drbd_device *device = peer_device->device;
350 if (unlikely(cancel))
353 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
356 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
357 digest = kmalloc(digest_size, GFP_NOIO);
359 sector_t sector = peer_req->i.sector;
360 unsigned int size = peer_req->i.size;
361 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
362 /* Free peer_req and pages before send.
363 * In case we block on congestion, we could otherwise run into
364 * some distributed deadlock, if the other side blocks on
365 * congestion as well, because our receiver blocks in
366 * drbd_alloc_pages due to pp_in_use > max_buffers. */
367 drbd_free_peer_req(device, peer_req);
369 inc_rs_pending(device);
370 err = drbd_send_drequest_csum(peer_device, sector, size,
375 drbd_err(device, "kmalloc() of digest failed.\n");
381 drbd_free_peer_req(device, peer_req);
384 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
388 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
390 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
392 struct drbd_device *device = peer_device->device;
393 struct drbd_peer_request *peer_req;
395 if (!get_ldev(device))
398 if (drbd_rs_should_slow_down(device, sector))
401 /* GFP_TRY, because if there is no memory available right now, this may
402 * be rescheduled for later. It is "only" background resync, after all. */
403 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
404 size, true /* has real payload */, GFP_TRY);
408 peer_req->w.cb = w_e_send_csum;
409 spin_lock_irq(&device->resource->req_lock);
410 list_add_tail(&peer_req->w.list, &device->read_ee);
411 spin_unlock_irq(&device->resource->req_lock);
413 atomic_add(size >> 9, &device->rs_sect_ev);
414 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
417 /* If it failed because of ENOMEM, retry should help. If it failed
418 * because bio_add_page failed (probably broken lower level driver),
419 * retry may or may not help.
420 * If it does not, you may need to force disconnect. */
421 spin_lock_irq(&device->resource->req_lock);
422 list_del(&peer_req->w.list);
423 spin_unlock_irq(&device->resource->req_lock);
425 drbd_free_peer_req(device, peer_req);
431 int w_resync_timer(struct drbd_work *w, int cancel)
433 struct drbd_device *device =
434 container_of(w, struct drbd_device, resync_work);
436 switch (device->state.conn) {
438 make_ov_request(device, cancel);
441 make_resync_request(device, cancel);
448 void resync_timer_fn(unsigned long data)
450 struct drbd_device *device = (struct drbd_device *) data;
452 drbd_queue_work_if_unqueued(
453 &first_peer_device(device)->connection->sender_work,
454 &device->resync_work);
457 static void fifo_set(struct fifo_buffer *fb, int value)
461 for (i = 0; i < fb->size; i++)
462 fb->values[i] = value;
465 static int fifo_push(struct fifo_buffer *fb, int value)
469 ov = fb->values[fb->head_index];
470 fb->values[fb->head_index++] = value;
472 if (fb->head_index >= fb->size)
478 static void fifo_add_val(struct fifo_buffer *fb, int value)
482 for (i = 0; i < fb->size; i++)
483 fb->values[i] += value;
486 struct fifo_buffer *fifo_alloc(int fifo_size)
488 struct fifo_buffer *fb;
490 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
495 fb->size = fifo_size;
501 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
503 struct disk_conf *dc;
504 unsigned int want; /* The number of sectors we want in-flight */
505 int req_sect; /* Number of sectors to request in this turn */
506 int correction; /* Number of sectors more we need in-flight */
507 int cps; /* correction per invocation of drbd_rs_controller() */
508 int steps; /* Number of time steps to plan ahead */
511 struct fifo_buffer *plan;
513 dc = rcu_dereference(device->ldev->disk_conf);
514 plan = rcu_dereference(device->rs_plan_s);
516 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
518 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
519 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
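/* resync_rate is presumably configured in KiB/s; "* 2" converts KiB to
 * 512-byte sectors and SLEEP_TIME/HZ is the controller tick length in
 * seconds, so this seeds the plan with "sectors per tick" times the
 * plan depth. */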
520 } else { /* normal path */
521 want = dc->c_fill_target ? dc->c_fill_target :
522 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
525 correction = want - device->rs_in_flight - plan->total;
528 cps = correction / steps;
529 fifo_add_val(plan, cps);
530 plan->total += cps * steps;
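/* Illustrative numbers (not from the original source): with steps = 10,
 * want = 1000 sectors, rs_in_flight = 600 and plan->total = 100, the
 * correction is 300 sectors; cps = 30 is added to each of the ten
 * planned slots and plan->total grows by the full 300. */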
532 /* What we do in this step */
533 curr_corr = fifo_push(plan, 0);
534 plan->total -= curr_corr;
536 req_sect = sect_in + curr_corr;
540 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
541 if (req_sect > max_sect)
545 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
546 sect_in, device->rs_in_flight, want, correction,
547 steps, cps, device->rs_planed, curr_corr, req_sect);
553 static int drbd_rs_number_requests(struct drbd_device *device)
555 unsigned int sect_in; /* Number of sectors that came in since the last turn */
558 sect_in = atomic_xchg(&device->rs_sect_in, 0);
559 device->rs_in_flight -= sect_in;
562 mxb = drbd_get_max_buffers(device) / 2;
563 if (rcu_dereference(device->rs_plan_s)->size) {
564 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
565 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
567 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
568 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
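/* Sketch of the fixed-rate branch, assuming a 100 ms SLEEP_TIME tick and
 * resync_rate in KiB/s: resync_rate = 4000 gives
 * number = 4000 / (4 * 10) = 100 requests of BM_BLOCK_SIZE (4 KiB) per tick. */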
572 /* Don't have more than "max-buffers"/2 in-flight.
573 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
574 * potentially causing a distributed deadlock on congestion during
575 * online-verify or (checksum-based) resync, if max-buffers,
576 * socket buffer sizes and resync rate settings are mis-configured. */
578 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
579 * mxb (as used here, and in drbd_alloc_pages on the peer) is
580 * "number of pages" (typically also 4k),
581 * but "rs_in_flight" is in "sectors" (512 Byte). */
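/* Worked example of the unit conversion: BM_BLOCK_SIZE is 4096 = 8 * 512,
 * so rs_in_flight/8 turns "sectors in flight" into the same 4k units as
 * "number" and (approximately) mxb, which is what the clamp below compares. */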
582 if (mxb - device->rs_in_flight/8 < number)
583 number = mxb - device->rs_in_flight/8;
588 static int make_resync_request(struct drbd_device *const device, int cancel)
590 struct drbd_peer_device *const peer_device = first_peer_device(device);
591 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
594 const sector_t capacity = drbd_get_capacity(device->this_bdev);
596 int number, rollback_i, size;
597 int align, requeue = 0;
600 if (unlikely(cancel))
603 if (device->rs_total == 0) {
605 drbd_resync_finished(device);
609 if (!get_ldev(device)) {
/* Since we only need to access device->resync a
611 get_ldev_if_state(device,D_FAILED) would be sufficient, but
612 to continue resync with a broken disk makes no sense at
614 drbd_err(device, "Disk broke down during resync!\n");
618 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
619 number = drbd_rs_number_requests(device);
623 for (i = 0; i < number; i++) {
624 /* Stop generating RS requests when half of the send buffer is filled,
625 * but notify TCP that we'd like to have more space. */
626 mutex_lock(&connection->data.mutex);
627 if (connection->data.socket) {
628 struct sock *sk = connection->data.socket->sk;
629 int queued = sk->sk_wmem_queued;
630 int sndbuf = sk->sk_sndbuf;
631 if (queued > sndbuf / 2) {
634 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
638 mutex_unlock(&connection->data.mutex);
643 size = BM_BLOCK_SIZE;
644 bit = drbd_bm_find_next(device, device->bm_resync_fo);
646 if (bit == DRBD_END_OF_BITMAP) {
647 device->bm_resync_fo = drbd_bm_bits(device);
652 sector = BM_BIT_TO_SECT(bit);
654 if (drbd_rs_should_slow_down(device, sector) ||
655 drbd_try_rs_begin_io(device, sector)) {
656 device->bm_resync_fo = bit;
659 device->bm_resync_fo = bit + 1;
661 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
662 drbd_rs_complete_io(device, sector);
666 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
667 /* try to find some adjacent bits.
668 * we stop if we have already the maximum req size.
670 * Additionally always align bigger requests, in order to
671 * be prepared for all stripe sizes of software RAIDs.
676 if (size + BM_BLOCK_SIZE > max_bio_size)
679 /* Be always aligned */
680 if (sector & ((1<<(align+3))-1))
683 /* do not cross extent boundaries */
684 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
686 /* now, is it actually dirty, after all?
687 * caution, drbd_bm_test_bit is tri-state for some
688 * obscure reason; ( b == 0 ) would get the out-of-band
689 * only accidentally right because of the "oddly sized"
690 * adjustment below */
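/* "Tri-state" presumably means drbd_bm_test_bit() may also return -1
 * (e.g. for a bit beyond the end of the bitmap); only an explicit
 * "== 1" extends the request, while "!= 0" would merge such bits too. */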
691 if (drbd_bm_test_bit(device, bit+1) != 1)
694 size += BM_BLOCK_SIZE;
695 if ((BM_BLOCK_SIZE << align) <= size)
699 /* if we merged some,
700 * reset the offset to start the next drbd_bm_find_next from */
701 if (size > BM_BLOCK_SIZE)
702 device->bm_resync_fo = bit + 1;
705 /* adjust very last sectors, in case we are oddly sized */
706 if (sector + (size>>9) > capacity)
707 size = (capacity-sector)<<9;
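/* Example of the trimming above: if the device ends 2 KiB past the start
 * of the last bitmap block, the final request shrinks from 4096 to
 * (capacity - sector) << 9 = 2048 bytes. */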
709 if (device->use_csums) {
710 switch (read_for_csum(peer_device, sector, size)) {
711 case -EIO: /* Disk failure */
714 case -EAGAIN: /* allocation failed, or ldev busy */
715 drbd_rs_complete_io(device, sector);
716 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
728 inc_rs_pending(device);
729 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
730 sector, size, ID_SYNCER);
732 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
733 dec_rs_pending(device);
740 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
741 /* last syncer _request_ was sent,
742 * but the P_RS_DATA_REPLY not yet received. sync will end (and
743 * next sync group will resume), as soon as we receive the last
744 * resync data block, and the last bit is cleared.
745 * until then resync "work" is "inactive" ...
752 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
753 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
758 static int make_ov_request(struct drbd_device *device, int cancel)
762 const sector_t capacity = drbd_get_capacity(device->this_bdev);
763 bool stop_sector_reached = false;
765 if (unlikely(cancel))
768 number = drbd_rs_number_requests(device);
770 sector = device->ov_position;
771 for (i = 0; i < number; i++) {
772 if (sector >= capacity)
775 /* We check for "finished" only in the reply path:
776 * w_e_end_ov_reply().
777 * We need to send at least one request out. */
778 stop_sector_reached = i > 0
779 && verify_can_do_stop_sector(device)
780 && sector >= device->ov_stop_sector;
781 if (stop_sector_reached)
784 size = BM_BLOCK_SIZE;
786 if (drbd_rs_should_slow_down(device, sector) ||
787 drbd_try_rs_begin_io(device, sector)) {
788 device->ov_position = sector;
792 if (sector + (size>>9) > capacity)
793 size = (capacity-sector)<<9;
795 inc_rs_pending(device);
796 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
797 dec_rs_pending(device);
800 sector += BM_SECT_PER_BIT;
802 device->ov_position = sector;
805 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
806 if (i == 0 || !stop_sector_reached)
807 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
811 int w_ov_finished(struct drbd_work *w, int cancel)
813 struct drbd_device_work *dw =
814 container_of(w, struct drbd_device_work, w);
815 struct drbd_device *device = dw->device;
817 ov_out_of_sync_print(device);
818 drbd_resync_finished(device);
823 static int w_resync_finished(struct drbd_work *w, int cancel)
825 struct drbd_device_work *dw =
826 container_of(w, struct drbd_device_work, w);
827 struct drbd_device *device = dw->device;
830 drbd_resync_finished(device);
835 static void ping_peer(struct drbd_device *device)
837 struct drbd_connection *connection = first_peer_device(device)->connection;
839 clear_bit(GOT_PING_ACK, &connection->flags);
840 request_ping(connection);
841 wait_event(connection->ping_wait,
842 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
845 int drbd_resync_finished(struct drbd_device *device)
847 unsigned long db, dt, dbdt;
849 union drbd_state os, ns;
850 struct drbd_device_work *dw;
851 char *khelper_cmd = NULL;
/* Remove all elements from the resync LRU. Since future actions
* might set bits in the (main) bitmap, the entries in the
* resync LRU would otherwise be wrong. */
857 if (drbd_rs_del_all(device)) {
/* In case this is not possible now, most probably because
* there are P_RS_DATA_REPLY packets lingering on the worker's
* queue (or the read operations for those packets have not
* finished yet), retry in 100ms. */
863 schedule_timeout_interruptible(HZ / 10);
864 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
866 dw->w.cb = w_resync_finished;
868 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
872 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
875 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
879 db = device->rs_total;
/* adjust for verify start and stop sectors, respectively the reached position */
881 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
882 db -= device->ov_left;
884 dbdt = Bit2KB(db/dt);
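/* The line above in numbers: db = 250000 bits (each covering 4 KiB,
 * roughly 1 GB resynced) over dt = 100 seconds gives
 * dbdt = Bit2KB(2500) = 10000 KiB/s. */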
885 device->rs_paused /= HZ;
887 if (!get_ldev(device))
892 spin_lock_irq(&device->resource->req_lock);
893 os = drbd_read_state(device);
895 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
897 /* This protects us against multiple calls (that can happen in the presence
898 of application IO), and against connectivity loss just before we arrive here. */
899 if (os.conn <= C_CONNECTED)
903 ns.conn = C_CONNECTED;
905 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
906 verify_done ? "Online verify" : "Resync",
907 dt + device->rs_paused, device->rs_paused, dbdt);
909 n_oos = drbd_bm_total_weight(device);
911 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
915 khelper_cmd = "out-of-sync";
918 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
920 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
921 khelper_cmd = "after-resync-target";
923 if (device->use_csums && device->rs_total) {
924 const unsigned long s = device->rs_same_csum;
925 const unsigned long t = device->rs_total;
928 (t < 100000) ? ((s*100)/t) : (s/(t/100));
929 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
930 "transferred %luK total %luK\n",
932 Bit2KB(device->rs_same_csum),
933 Bit2KB(device->rs_total - device->rs_same_csum),
934 Bit2KB(device->rs_total));
938 if (device->rs_failed) {
939 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
942 ns.disk = D_INCONSISTENT;
943 ns.pdsk = D_UP_TO_DATE;
945 ns.disk = D_UP_TO_DATE;
946 ns.pdsk = D_INCONSISTENT;
949 ns.disk = D_UP_TO_DATE;
950 ns.pdsk = D_UP_TO_DATE;
952 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
953 if (device->p_uuid) {
955 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
956 _drbd_uuid_set(device, i, device->p_uuid[i]);
957 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
958 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
960 drbd_err(device, "device->p_uuid is NULL! BUG\n");
964 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
965 /* for verify runs, we don't update uuids here,
966 * so there would be nothing to report. */
967 drbd_uuid_set_bm(device, 0UL);
968 drbd_print_uuids(device, "updated UUIDs");
969 if (device->p_uuid) {
970 /* Now the two UUID sets are equal, update what we
971 * know of the peer. */
973 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
974 device->p_uuid[i] = device->ldev->md.uuid[i];
979 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
981 spin_unlock_irq(&device->resource->req_lock);
984 device->rs_total = 0;
985 device->rs_failed = 0;
986 device->rs_paused = 0;
988 /* reset start sector, if we reached end of device */
989 if (verify_done && device->ov_left == 0)
990 device->ov_start_sector = 0;
992 drbd_md_sync(device);
995 drbd_khelper(device, khelper_cmd);
1001 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1003 if (drbd_peer_req_has_active_page(peer_req)) {
1004 /* This might happen if sendpage() has not finished */
1005 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1006 atomic_add(i, &device->pp_in_use_by_net);
1007 atomic_sub(i, &device->pp_in_use);
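/* Accounting note: the pages may still be referenced by the TCP stack
 * (sendpage), so they move from pp_in_use, which throttles
 * drbd_alloc_pages(), to pp_in_use_by_net until the net_ee entry is
 * reaped. */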
1008 spin_lock_irq(&device->resource->req_lock);
1009 list_add_tail(&peer_req->w.list, &device->net_ee);
1010 spin_unlock_irq(&device->resource->req_lock);
1011 wake_up(&drbd_pp_wait);
1013 drbd_free_peer_req(device, peer_req);
* w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
* @device:	DRBD device.
* @cancel:	The connection will be closed anyway.
1022 int w_e_end_data_req(struct drbd_work *w, int cancel)
1024 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1025 struct drbd_peer_device *peer_device = peer_req->peer_device;
1026 struct drbd_device *device = peer_device->device;
1029 if (unlikely(cancel)) {
1030 drbd_free_peer_req(device, peer_req);
1031 dec_unacked(device);
1035 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1036 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1038 if (__ratelimit(&drbd_ratelimit_state))
1039 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1040 (unsigned long long)peer_req->i.sector);
1042 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1045 dec_unacked(device);
1047 move_to_net_ee_or_free(device, peer_req);
1050 drbd_err(device, "drbd_send_block() failed\n");
1055 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
* @cancel:	The connection will be closed anyway.
1059 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1061 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1062 struct drbd_peer_device *peer_device = peer_req->peer_device;
1063 struct drbd_device *device = peer_device->device;
1066 if (unlikely(cancel)) {
1067 drbd_free_peer_req(device, peer_req);
1068 dec_unacked(device);
1072 if (get_ldev_if_state(device, D_FAILED)) {
1073 drbd_rs_complete_io(device, peer_req->i.sector);
1077 if (device->state.conn == C_AHEAD) {
1078 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1079 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1081 inc_rs_pending(device);
1082 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1084 if (__ratelimit(&drbd_ratelimit_state))
1085 drbd_err(device, "Not sending RSDataReply, "
1086 "partner DISKLESS!\n");
1090 if (__ratelimit(&drbd_ratelimit_state))
1091 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1092 (unsigned long long)peer_req->i.sector);
1094 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1096 /* update resync data with failure */
1097 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1100 dec_unacked(device);
1102 move_to_net_ee_or_free(device, peer_req);
1105 drbd_err(device, "drbd_send_block() failed\n");
1109 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1111 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1112 struct drbd_peer_device *peer_device = peer_req->peer_device;
1113 struct drbd_device *device = peer_device->device;
1114 struct digest_info *di;
1116 void *digest = NULL;
1119 if (unlikely(cancel)) {
1120 drbd_free_peer_req(device, peer_req);
1121 dec_unacked(device);
1125 if (get_ldev(device)) {
1126 drbd_rs_complete_io(device, peer_req->i.sector);
1130 di = peer_req->digest;
1132 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1133 /* quick hack to try to avoid a race against reconfiguration.
1134 * a real fix would be much more involved,
1135 * introducing more locking mechanisms */
1136 if (peer_device->connection->csums_tfm) {
1137 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1138 D_ASSERT(device, digest_size == di->digest_size);
1139 digest = kmalloc(digest_size, GFP_NOIO);
1142 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1143 eq = !memcmp(digest, di->digest, digest_size);
1148 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1149 /* rs_same_csums unit is BM_BLOCK_SIZE */
1150 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1151 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1153 inc_rs_pending(device);
1154 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1155 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1157 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1160 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1161 if (__ratelimit(&drbd_ratelimit_state))
1162 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1165 dec_unacked(device);
1166 move_to_net_ee_or_free(device, peer_req);
1169 drbd_err(device, "drbd_send_block/ack() failed\n");
1173 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1175 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1176 struct drbd_peer_device *peer_device = peer_req->peer_device;
1177 struct drbd_device *device = peer_device->device;
1178 sector_t sector = peer_req->i.sector;
1179 unsigned int size = peer_req->i.size;
1184 if (unlikely(cancel))
1187 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1188 digest = kmalloc(digest_size, GFP_NOIO);
1190 err = 1; /* terminate the connection in case the allocation failed */
1194 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1195 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1197 memset(digest, 0, digest_size);
1199 /* Free e and pages before send.
1200 * In case we block on congestion, we could otherwise run into
1201 * some distributed deadlock, if the other side blocks on
1202 * congestion as well, because our receiver blocks in
1203 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1204 drbd_free_peer_req(device, peer_req);
1206 inc_rs_pending(device);
1207 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1209 dec_rs_pending(device);
1214 drbd_free_peer_req(device, peer_req);
1215 dec_unacked(device);
1219 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1221 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1222 device->ov_last_oos_size += size>>9;
1224 device->ov_last_oos_start = sector;
1225 device->ov_last_oos_size = size>>9;
1227 drbd_set_out_of_sync(device, sector, size);
1230 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1232 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1233 struct drbd_peer_device *peer_device = peer_req->peer_device;
1234 struct drbd_device *device = peer_device->device;
1235 struct digest_info *di;
1237 sector_t sector = peer_req->i.sector;
1238 unsigned int size = peer_req->i.size;
1241 bool stop_sector_reached = false;
1243 if (unlikely(cancel)) {
1244 drbd_free_peer_req(device, peer_req);
1245 dec_unacked(device);
1249 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1250 * the resync lru has been cleaned up already */
1251 if (get_ldev(device)) {
1252 drbd_rs_complete_io(device, peer_req->i.sector);
1256 di = peer_req->digest;
1258 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1259 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1260 digest = kmalloc(digest_size, GFP_NOIO);
1262 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1264 D_ASSERT(device, digest_size == di->digest_size);
1265 eq = !memcmp(digest, di->digest, digest_size);
1270 /* Free peer_req and pages before send.
1271 * In case we block on congestion, we could otherwise run into
1272 * some distributed deadlock, if the other side blocks on
1273 * congestion as well, because our receiver blocks in
1274 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275 drbd_free_peer_req(device, peer_req);
1277 drbd_ov_out_of_sync_found(device, sector, size);
1279 ov_out_of_sync_print(device);
1281 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1282 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1284 dec_unacked(device);
1288 /* let's advance progress step marks only for every other megabyte */
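/* ov_left is counted in 4 KiB bitmap bits, so bit 0x200 flips every
 * 512 blocks, i.e. every 2 MiB of remaining verify work; that is where
 * the "every other megabyte" granularity above comes from. */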
1289 if ((device->ov_left & 0x200) == 0x200)
1290 drbd_advance_rs_marks(device, device->ov_left);
1292 stop_sector_reached = verify_can_do_stop_sector(device) &&
1293 (sector + (size>>9)) >= device->ov_stop_sector;
1295 if (device->ov_left == 0 || stop_sector_reached) {
1296 ov_out_of_sync_print(device);
1297 drbd_resync_finished(device);
1304 * We need to track the number of pending barrier acks,
1305 * and to be able to wait for them.
1306 * See also comment in drbd_adm_attach before drbd_suspend_io.
1308 static int drbd_send_barrier(struct drbd_connection *connection)
1310 struct p_barrier *p;
1311 struct drbd_socket *sock;
1313 sock = &connection->data;
1314 p = conn_prepare_command(connection, sock);
1317 p->barrier = connection->send.current_epoch_nr;
1319 connection->send.current_epoch_writes = 0;
1321 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1324 int w_send_write_hint(struct drbd_work *w, int cancel)
1326 struct drbd_device *device =
1327 container_of(w, struct drbd_device, unplug_work);
1328 struct drbd_socket *sock;
1332 sock = &first_peer_device(device)->connection->data;
1333 if (!drbd_prepare_command(first_peer_device(device), sock))
1335 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1338 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1340 if (!connection->send.seen_any_write_yet) {
1341 connection->send.seen_any_write_yet = true;
1342 connection->send.current_epoch_nr = epoch;
1343 connection->send.current_epoch_writes = 0;
1347 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1349 /* re-init if first write on this connection */
1350 if (!connection->send.seen_any_write_yet)
1352 if (connection->send.current_epoch_nr != epoch) {
1353 if (connection->send.current_epoch_writes)
1354 drbd_send_barrier(connection);
1355 connection->send.current_epoch_nr = epoch;
1359 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1361 struct drbd_request *req = container_of(w, struct drbd_request, w);
1362 struct drbd_device *device = req->device;
1363 struct drbd_peer_device *const peer_device = first_peer_device(device);
1364 struct drbd_connection *const connection = peer_device->connection;
1367 if (unlikely(cancel)) {
1368 req_mod(req, SEND_CANCELED);
1372 /* this time, no connection->send.current_epoch_writes++;
1373 * If it was sent, it was the closing barrier for the last
1374 * replicated epoch, before we went into AHEAD mode.
1375 * No more barriers will be sent, until we leave AHEAD mode again. */
1376 maybe_send_barrier(connection, req->epoch);
1378 err = drbd_send_out_of_sync(peer_device, req);
1379 req_mod(req, OOS_HANDED_TO_NETWORK);
1385 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
* @cancel:	The connection will be closed anyway.
1389 int w_send_dblock(struct drbd_work *w, int cancel)
1391 struct drbd_request *req = container_of(w, struct drbd_request, w);
1392 struct drbd_device *device = req->device;
1393 struct drbd_peer_device *const peer_device = first_peer_device(device);
1394 struct drbd_connection *connection = peer_device->connection;
1397 if (unlikely(cancel)) {
1398 req_mod(req, SEND_CANCELED);
1402 re_init_if_first_write(connection, req->epoch);
1403 maybe_send_barrier(connection, req->epoch);
1404 connection->send.current_epoch_writes++;
1406 err = drbd_send_dblock(peer_device, req);
1407 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1413 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
* @cancel:	The connection will be closed anyway.
1417 int w_send_read_req(struct drbd_work *w, int cancel)
1419 struct drbd_request *req = container_of(w, struct drbd_request, w);
1420 struct drbd_device *device = req->device;
1421 struct drbd_peer_device *const peer_device = first_peer_device(device);
1422 struct drbd_connection *connection = peer_device->connection;
1425 if (unlikely(cancel)) {
1426 req_mod(req, SEND_CANCELED);
/* Even read requests may close a write epoch,
* if one is still open. */
1432 maybe_send_barrier(connection, req->epoch);
1434 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1435 (unsigned long)req);
1437 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1442 int w_restart_disk_io(struct drbd_work *w, int cancel)
1444 struct drbd_request *req = container_of(w, struct drbd_request, w);
1445 struct drbd_device *device = req->device;
1447 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1448 drbd_al_begin_io(device, &req->i);
1450 drbd_req_make_private_bio(req, req->master_bio);
1451 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1452 generic_make_request(req->private_bio);
1457 static int _drbd_may_sync_now(struct drbd_device *device)
1459 struct drbd_device *odev = device;
1463 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1466 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1468 if (resync_after == -1)
1470 odev = minor_to_device(resync_after);
1473 if ((odev->state.conn >= C_SYNC_SOURCE &&
1474 odev->state.conn <= C_PAUSED_SYNC_T) ||
1475 odev->state.aftr_isp || odev->state.peer_isp ||
1476 odev->state.user_isp)
1482 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1483 * @device: DRBD device.
1485 * Called from process context only (admin command and after_state_ch).
1487 static int _drbd_pause_after(struct drbd_device *device)
1489 struct drbd_device *odev;
1493 idr_for_each_entry(&drbd_devices, odev, i) {
1494 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1496 if (!_drbd_may_sync_now(odev))
1497 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1498 != SS_NOTHING_TO_DO);
1506 * _drbd_resume_next() - Resume resync on all devices that may resync now
1507 * @device: DRBD device.
1509 * Called from process context only (admin command and worker).
1511 static int _drbd_resume_next(struct drbd_device *device)
1513 struct drbd_device *odev;
1517 idr_for_each_entry(&drbd_devices, odev, i) {
1518 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1520 if (odev->state.aftr_isp) {
1521 if (_drbd_may_sync_now(odev))
1522 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1524 != SS_NOTHING_TO_DO) ;
1531 void resume_next_sg(struct drbd_device *device)
1533 write_lock_irq(&global_state_lock);
1534 _drbd_resume_next(device);
1535 write_unlock_irq(&global_state_lock);
1538 void suspend_other_sg(struct drbd_device *device)
1540 write_lock_irq(&global_state_lock);
1541 _drbd_pause_after(device);
1542 write_unlock_irq(&global_state_lock);
1545 /* caller must hold global_state_lock */
1546 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1548 struct drbd_device *odev;
1553 if (o_minor < -1 || o_minor > MINORMASK)
1554 return ERR_RESYNC_AFTER;
1556 /* check for loops */
1557 odev = minor_to_device(o_minor);
1560 return ERR_RESYNC_AFTER_CYCLE;
1562 /* You are free to depend on diskless, non-existing,
1563 * or not yet/no longer existing minors.
1564 * We only reject dependency loops.
1565 * We cannot follow the dependency chain beyond a detached or
1568 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1572 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1574 /* dependency chain ends here, no cycles. */
1575 if (resync_after == -1)
1578 /* follow the dependency chain */
1579 odev = minor_to_device(resync_after);
1583 /* caller must hold global_state_lock */
1584 void drbd_resync_after_changed(struct drbd_device *device)
1589 changes = _drbd_pause_after(device);
1590 changes |= _drbd_resume_next(device);
1594 void drbd_rs_controller_reset(struct drbd_device *device)
1596 struct fifo_buffer *plan;
1598 atomic_set(&device->rs_sect_in, 0);
1599 atomic_set(&device->rs_sect_ev, 0);
1600 device->rs_in_flight = 0;
1602 /* Updating the RCU protected object in place is necessary since
1603 this function gets called from atomic context.
It is valid since all other updates also lead to a completely
1607 plan = rcu_dereference(device->rs_plan_s);
1613 void start_resync_timer_fn(unsigned long data)
1615 struct drbd_device *device = (struct drbd_device *) data;
1616 drbd_device_post_work(device, RS_START);
1619 static void do_start_resync(struct drbd_device *device)
1621 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1622 drbd_warn(device, "postponing start_resync ...\n");
1623 device->start_resync_timer.expires = jiffies + HZ/10;
1624 add_timer(&device->start_resync_timer);
1628 drbd_start_resync(device, C_SYNC_SOURCE);
1629 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1632 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1634 bool csums_after_crash_only;
1636 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1638 return connection->agreed_pro_version >= 89 && /* supported? */
1639 connection->csums_tfm && /* configured? */
1640 (csums_after_crash_only == 0 /* use for each resync? */
1641 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1645 * drbd_start_resync() - Start the resync process
1646 * @device: DRBD device.
1647 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1649 * This function might bring you directly into one of the
1650 * C_PAUSED_SYNC_* states.
1652 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1654 struct drbd_peer_device *peer_device = first_peer_device(device);
1655 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1656 union drbd_state ns;
1659 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1660 drbd_err(device, "Resync already running!\n");
1664 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1665 if (side == C_SYNC_TARGET) {
1666 /* Since application IO was locked out during C_WF_BITMAP_T and
1667 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
we check whether we are about to make the data inconsistent. */
1669 r = drbd_khelper(device, "before-resync-target");
1670 r = (r >> 8) & 0xff;
1672 drbd_info(device, "before-resync-target handler returned %d, "
1673 "dropping connection.\n", r);
1674 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1677 } else /* C_SYNC_SOURCE */ {
1678 r = drbd_khelper(device, "before-resync-source");
1679 r = (r >> 8) & 0xff;
1682 drbd_info(device, "before-resync-source handler returned %d, "
1683 "ignoring. Old userland tools?", r);
1685 drbd_info(device, "before-resync-source handler returned %d, "
1686 "dropping connection.\n", r);
1687 conn_request_state(connection,
1688 NS(conn, C_DISCONNECTING), CS_HARD);
1695 if (current == connection->worker.task) {
1696 /* The worker should not sleep waiting for state_mutex,
1697 that can take long */
1698 if (!mutex_trylock(device->state_mutex)) {
1699 set_bit(B_RS_H_DONE, &device->flags);
1700 device->start_resync_timer.expires = jiffies + HZ/5;
1701 add_timer(&device->start_resync_timer);
1705 mutex_lock(device->state_mutex);
1707 clear_bit(B_RS_H_DONE, &device->flags);
1709 /* req_lock: serialize with drbd_send_and_submit() and others
1710 * global_state_lock: for stable sync-after dependencies */
1711 spin_lock_irq(&device->resource->req_lock);
1712 write_lock(&global_state_lock);
1713 /* Did some connection breakage or IO error race with us? */
1714 if (device->state.conn < C_CONNECTED
1715 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1716 write_unlock(&global_state_lock);
1717 spin_unlock_irq(&device->resource->req_lock);
1718 mutex_unlock(device->state_mutex);
1722 ns = drbd_read_state(device);
1724 ns.aftr_isp = !_drbd_may_sync_now(device);
1728 if (side == C_SYNC_TARGET)
1729 ns.disk = D_INCONSISTENT;
1730 else /* side == C_SYNC_SOURCE */
1731 ns.pdsk = D_INCONSISTENT;
1733 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1734 ns = drbd_read_state(device);
1736 if (ns.conn < C_CONNECTED)
1737 r = SS_UNKNOWN_ERROR;
1739 if (r == SS_SUCCESS) {
1740 unsigned long tw = drbd_bm_total_weight(device);
1741 unsigned long now = jiffies;
1744 device->rs_failed = 0;
1745 device->rs_paused = 0;
1746 device->rs_same_csum = 0;
1747 device->rs_last_events = 0;
1748 device->rs_last_sect_ev = 0;
1749 device->rs_total = tw;
1750 device->rs_start = now;
1751 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1752 device->rs_mark_left[i] = tw;
1753 device->rs_mark_time[i] = now;
1755 _drbd_pause_after(device);
1756 /* Forget potentially stale cached per resync extent bit-counts.
1757 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1758 * disabled, and know the disk state is ok. */
1759 spin_lock(&device->al_lock);
1760 lc_reset(device->resync);
1761 device->resync_locked = 0;
1762 device->resync_wenr = LC_FREE;
1763 spin_unlock(&device->al_lock);
1765 write_unlock(&global_state_lock);
1766 spin_unlock_irq(&device->resource->req_lock);
1768 if (r == SS_SUCCESS) {
1769 wake_up(&device->al_wait); /* for lc_reset() above */
1770 /* reset rs_last_bcast when a resync or verify is started,
1771 * to deal with potential jiffies wrap. */
1772 device->rs_last_bcast = jiffies - HZ;
1774 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1775 drbd_conn_str(ns.conn),
1776 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1777 (unsigned long) device->rs_total);
1778 if (side == C_SYNC_TARGET) {
1779 device->bm_resync_fo = 0;
1780 device->use_csums = use_checksum_based_resync(connection, device);
1782 device->use_csums = 0;
1785 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1786 * with w_send_oos, or the sync target will get confused as to
* how many bits to resync. We cannot always do that, because for an
1788 * empty resync and protocol < 95, we need to do it here, as we call
1789 * drbd_resync_finished from here in that case.
1790 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1791 * and from after_state_ch otherwise. */
1792 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1793 drbd_gen_and_send_sync_uuid(peer_device);
1795 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1796 /* This still has a race (about when exactly the peers
1797 * detect connection loss) that can lead to a full sync
1798 * on next handshake. In 8.3.9 we fixed this with explicit
1799 * resync-finished notifications, but the fix
1800 * introduces a protocol change. Sleeping for some
1801 * time longer than the ping interval + timeout on the
1802 * SyncSource, to give the SyncTarget the chance to
1803 * detect connection loss, then waiting for a ping
1804 * response (implicit in drbd_resync_finished) reduces
1805 * the race considerably, but does not solve it. */
1806 if (side == C_SYNC_SOURCE) {
1807 struct net_conf *nc;
1811 nc = rcu_dereference(connection->net_conf);
1812 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1814 schedule_timeout_interruptible(timeo);
1816 drbd_resync_finished(device);
1819 drbd_rs_controller_reset(device);
1820 /* ns.conn may already be != device->state.conn,
1821 * we may have been paused in between, or become paused until
1822 * the timer triggers.
1823 * No matter, that is handled in resync_timer_fn() */
1824 if (ns.conn == C_SYNC_TARGET)
1825 mod_timer(&device->resync_timer, jiffies);
1827 drbd_md_sync(device);
1830 mutex_unlock(device->state_mutex);
1833 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1835 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1836 device->rs_last_bcast = jiffies;
1838 if (!get_ldev(device))
1841 drbd_bm_write_lazy(device, 0);
1842 if (resync_done && is_sync_state(device->state.conn))
1843 drbd_resync_finished(device);
1845 drbd_bcast_event(device, &sib);
1846 /* update timestamp, in case it took a while to write out stuff */
1847 device->rs_last_bcast = jiffies;
1851 static void drbd_ldev_destroy(struct drbd_device *device)
1853 lc_destroy(device->resync);
1854 device->resync = NULL;
1855 lc_destroy(device->act_log);
1856 device->act_log = NULL;
1858 drbd_free_ldev(device->ldev);
device->ldev = NULL;
1860 clear_bit(GOING_DISKLESS, &device->flags);
1861 wake_up(&device->misc_wait);
1864 static void go_diskless(struct drbd_device *device)
1866 D_ASSERT(device, device->state.disk == D_FAILED);
1867 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1868 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1869 * the protected members anymore, though, so once put_ldev reaches zero
1870 * again, it will be safe to free them. */
1872 /* Try to write changed bitmap pages, read errors may have just
1873 * set some bits outside the area covered by the activity log.
1875 * If we have an IO error during the bitmap writeout,
1876 * we will want a full sync next time, just in case.
1877 * (Do we want a specific meta data flag for this?)
1879 * If that does not make it to stable storage either,
1880 * we cannot do anything about that anymore.
1882 * We still need to check if both bitmap and ldev are present, we may
1883 * end up here after a failed attach, before ldev was even assigned.
1885 if (device->bitmap && device->ldev) {
/* An interrupted resync or similar is allowed to recount bits
1888 * Any modifications would not be expected anymore, though.
1890 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1891 "detach", BM_LOCKED_TEST_ALLOWED)) {
1892 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1893 drbd_md_set_flag(device, MDF_FULL_SYNC);
1894 drbd_md_sync(device);
1899 drbd_force_state(device, NS(disk, D_DISKLESS));
1902 static int do_md_sync(struct drbd_device *device)
1904 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1905 drbd_md_sync(device);
1909 #define WORK_PENDING(work_bit, todo) (todo & (1UL << work_bit))
1910 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1912 if (WORK_PENDING(MD_SYNC, todo))
1914 if (WORK_PENDING(RS_DONE, todo) ||
1915 WORK_PENDING(RS_PROGRESS, todo))
1916 update_on_disk_bitmap(device, WORK_PENDING(RS_DONE, todo));
1917 if (WORK_PENDING(GO_DISKLESS, todo))
1918 go_diskless(device);
1919 if (WORK_PENDING(DESTROY_DISK, todo))
1920 drbd_ldev_destroy(device);
1921 if (WORK_PENDING(RS_START, todo))
1922 do_start_resync(device);
1925 #define DRBD_DEVICE_WORK_MASK \
1926 ((1UL << GO_DISKLESS) \
1927 |(1UL << DESTROY_DISK) \
1929 |(1UL << RS_START) \
1930 |(1UL << RS_PROGRESS) \
1934 static unsigned long get_work_bits(unsigned long *flags)
1936 unsigned long old, new;
1939 new = old & ~DRBD_DEVICE_WORK_MASK;
1940 } while (cmpxchg(flags, old, new) != old);
1941 return old & DRBD_DEVICE_WORK_MASK;
1944 static void do_unqueued_work(struct drbd_connection *connection)
1946 struct drbd_peer_device *peer_device;
1950 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1951 struct drbd_device *device = peer_device->device;
1952 unsigned long todo = get_work_bits(&device->flags);
1956 kref_get(&device->kref);
1958 do_device_work(device, todo);
1959 kref_put(&device->kref, drbd_destroy_device);
1965 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1967 spin_lock_irq(&queue->q_lock);
1968 list_splice_tail_init(&queue->q, work_list);
1969 spin_unlock_irq(&queue->q_lock);
1970 return !list_empty(work_list);
1973 static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1975 spin_lock_irq(&queue->q_lock);
1976 if (!list_empty(&queue->q))
1977 list_move(queue->q.next, work_list);
1978 spin_unlock_irq(&queue->q_lock);
1979 return !list_empty(work_list);
1982 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1985 struct net_conf *nc;
1988 dequeue_work_item(&connection->sender_work, work_list);
1989 if (!list_empty(work_list))
1992 /* Still nothing to do?
1993 * Maybe we still need to close the current epoch,
1994 * even if no new requests are queued yet.
1996 * Also, poke TCP, just in case.
1997 * Then wait for new work (or signal). */
1999 nc = rcu_dereference(connection->net_conf);
2000 uncork = nc ? nc->tcp_cork : 0;
2003 mutex_lock(&connection->data.mutex);
2004 if (connection->data.socket)
2005 drbd_tcp_uncork(connection->data.socket);
2006 mutex_unlock(&connection->data.mutex);
2011 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2012 spin_lock_irq(&connection->resource->req_lock);
2013 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2014 /* dequeue single item only,
2015 * we still use drbd_queue_work_front() in some places */
2016 if (!list_empty(&connection->sender_work.q))
2017 list_splice_tail_init(&connection->sender_work.q, work_list);
2018 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2019 if (!list_empty(work_list) || signal_pending(current)) {
2020 spin_unlock_irq(&connection->resource->req_lock);
2024 /* We found nothing new to do, no to-be-communicated request,
2025 * no other work item. We may still need to close the last
2026 * epoch. Next incoming request epoch will be connection ->
2027 * current transfer log epoch number. If that is different
2028 * from the epoch of the last request we communicated, it is
2029 * safe to send the epoch separating barrier now.
2032 atomic_read(&connection->current_tle_nr) !=
2033 connection->send.current_epoch_nr;
2034 spin_unlock_irq(&connection->resource->req_lock);
2037 maybe_send_barrier(connection,
2038 connection->send.current_epoch_nr + 1);
2040 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2043 /* drbd_send() may have called flush_signals() */
2044 if (get_t_state(&connection->worker) != RUNNING)
/* may be woken up for other things than new work, too,
2049 * e.g. if the current epoch got closed.
2050 * In which case we send the barrier above. */
2052 finish_wait(&connection->sender_work.q_wait, &wait);
2054 /* someone may have changed the config while we have been waiting above. */
2056 nc = rcu_dereference(connection->net_conf);
2057 cork = nc ? nc->tcp_cork : 0;
2059 mutex_lock(&connection->data.mutex);
2060 if (connection->data.socket) {
2062 drbd_tcp_cork(connection->data.socket);
2064 drbd_tcp_uncork(connection->data.socket);
2066 mutex_unlock(&connection->data.mutex);
2069 int drbd_worker(struct drbd_thread *thi)
2071 struct drbd_connection *connection = thi->connection;
2072 struct drbd_work *w = NULL;
2073 struct drbd_peer_device *peer_device;
2074 LIST_HEAD(work_list);
2077 while (get_t_state(thi) == RUNNING) {
2078 drbd_thread_current_set_cpu(thi);
2080 if (list_empty(&work_list))
2081 wait_for_work(connection, &work_list);
2083 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2084 do_unqueued_work(connection);
2086 if (signal_pending(current)) {
2087 flush_signals(current);
2088 if (get_t_state(thi) == RUNNING) {
2089 drbd_warn(connection, "Worker got an unexpected signal\n");
2095 if (get_t_state(thi) != RUNNING)
2098 while (!list_empty(&work_list)) {
2099 w = list_first_entry(&work_list, struct drbd_work, list);
2100 list_del_init(&w->list);
2101 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2103 if (connection->cstate >= C_WF_REPORT_PARAMS)
2104 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2109 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags))
2110 do_unqueued_work(connection);
2111 while (!list_empty(&work_list)) {
2112 w = list_first_entry(&work_list, struct drbd_work, list);
2113 list_del_init(&w->list);
2116 dequeue_work_batch(&connection->sender_work, &work_list);
2117 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2120 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2121 struct drbd_device *device = peer_device->device;
2122 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2123 kref_get(&device->kref);
2125 drbd_device_cleanup(device);
2126 kref_put(&device->kref, drbd_destroy_device);